Skip to content

Commit

Permalink
Merge pull request #70 from Infomaniak/KDESKTOP-777-Windows-Lite-Sync…
Browse files Browse the repository at this point in the history
…-fetch-issue

[KDESKTOP-777] Fix fetch issue
  • Loading branch information
ClementKunz authored Apr 23, 2024
2 parents 2158b6a + 2aa2c32 commit 12437c7
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 70 deletions.
142 changes: 79 additions & 63 deletions extensions/windows/cfapi/Vfs/cloudprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ bool CloudProvider::start(wchar_t *namespaceCLSID, DWORD *namespaceCLSIDSize) {
}

// The client folder (syncroot) must be indexed in order for states to properly display
TRACE_DEBUG(L"Calling Utilities::addFolderToSearchIndexer : path = %ls", _providerInfo->folderPath());
TRACE_DEBUG(L"Calling Utilities::addFolderToSearchIndexer: path = %ls", _providerInfo->folderPath());
if (!addFolderToSearchIndexer(_providerInfo->folderPath())) {
TRACE_ERROR(L"Error in Utilities::addFolderToSearchIndexer!");
return false;
Expand Down Expand Up @@ -126,47 +126,47 @@ bool CloudProvider::dehydrate(const wchar_t *path) {

winrt::handle fileHandle(CreateFile(path, WRITE_DAC, 0, nullptr, OPEN_EXISTING, FILE_FLAG_OPEN_NO_RECALL, nullptr));
if (fileHandle.get() == INVALID_HANDLE_VALUE) {
TRACE_ERROR(L"Error in CreateFile : %ls", Utilities::getLastErrorMessage().c_str());
TRACE_ERROR(L"Error in CreateFile: %ls", Utilities::getLastErrorMessage().c_str());
return false;
}

LARGE_INTEGER offset = {};
LARGE_INTEGER length;
length.QuadPart = MAXLONGLONG;
try {
TRACE_DEBUG(L"Dehydrating placeholder : path = %ls", path);
TRACE_DEBUG(L"Dehydrating placeholder: path = %ls", path);
winrt::check_hresult(CfDehydratePlaceholder(fileHandle.get(), offset, length, CF_DEHYDRATE_FLAG_NONE, NULL));
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"WinRT error catched : %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
TRACE_ERROR(L"WinRT error catched: %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
if (ex.code() == HRESULT_FROM_WIN32(ERROR_CLOUD_FILE_IN_USE)) {
// Workaround to fix a MS Photos issue that leads to locking pictures
CF_PLACEHOLDER_STANDARD_INFO info;
DWORD retLength = 0;
try {
TRACE_DEBUG(L"Get placeholder info : path = %ls", path);
TRACE_DEBUG(L"Get placeholder info: path = %ls", path);
winrt::check_hresult(
CfGetPlaceholderInfo(fileHandle.get(), CF_PLACEHOLDER_INFO_STANDARD, &info, sizeof(info), &retLength));
} catch (winrt::hresult_error const &ex) {
if (ex.code() != HRESULT_FROM_WIN32(ERROR_MORE_DATA)) {
TRACE_ERROR(L"WinRT error catched : %08x - %s", static_cast<HRESULT>(winrt::to_hresult()),
TRACE_ERROR(L"WinRT error catched: %08x - %s", static_cast<HRESULT>(winrt::to_hresult()),
ex.message().c_str());
res = false;
}
}

if (res) {
try {
TRACE_DEBUG(L"Reverting placeholder : path = %ls", path);
TRACE_DEBUG(L"Reverting placeholder: path = %ls", path);
winrt::check_hresult(CfRevertPlaceholder(fileHandle.get(), CF_REVERT_FLAG_NONE, NULL));

TRACE_DEBUG(L"Converting to placeholder : path = %ls", path);
TRACE_DEBUG(L"Converting to placeholder: path = %ls", path);
winrt::check_hresult(CfConvertToPlaceholder(fileHandle.get(), info.FileIdentity, info.FileIdentityLength,
CF_CONVERT_FLAG_MARK_IN_SYNC, nullptr, NULL));

TRACE_DEBUG(L"Dehydrating placeholder : path = %ls", path);
TRACE_DEBUG(L"Dehydrating placeholder: path = %ls", path);
winrt::check_hresult(CfDehydratePlaceholder(fileHandle.get(), offset, length, CF_DEHYDRATE_FLAG_NONE, NULL));
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"WinRT error catched : %08x - %s", static_cast<HRESULT>(winrt::to_hresult()),
TRACE_ERROR(L"WinRT error catched: %08x - %s", static_cast<HRESULT>(winrt::to_hresult()),
ex.message().c_str());
res = false;
}
Expand All @@ -186,13 +186,13 @@ bool CloudProvider::hydrate(const wchar_t *path) {
}

if (_providerInfo->_fetchMap.find(path) != _providerInfo->_fetchMap.end()) {
TRACE_DEBUG(L"Fetch already in progress : path = %ls", path);
TRACE_DEBUG(L"Fetch already in progress: path = %ls", path);
return true;
}

winrt::handle fileHandle(CreateFile(path, WRITE_DAC, 0, nullptr, OPEN_EXISTING, FILE_FLAG_OPEN_NO_RECALL, nullptr));
if (fileHandle.get() == INVALID_HANDLE_VALUE) {
TRACE_ERROR(L"Error in CreateFile : %ls", Utilities::getLastErrorMessage().c_str());
TRACE_ERROR(L"Error in CreateFile: %ls", Utilities::getLastErrorMessage().c_str());
return false;
}

Expand All @@ -202,17 +202,17 @@ bool CloudProvider::hydrate(const wchar_t *path) {
bool res = true;

try {
TRACE_DEBUG(L"Hydrating placeholder : path = %ls", path);
TRACE_DEBUG(L"Hydrating placeholder: path = %ls", path);
winrt::check_hresult(CfHydratePlaceholder(fileHandle.get(), offset, length, CF_HYDRATE_FLAG_NONE, NULL));
} catch (winrt::hresult_error const &ex) {
std::lock_guard<std::mutex> lk(_providerInfo->_fetchMapMutex);
const auto &fetchInfoIt = _providerInfo->_fetchMap.find(path);
if (fetchInfoIt == _providerInfo->_fetchMap.end() || fetchInfoIt->second.getCancel()) {
// Hydration has been cancelled
TRACE_DEBUG(L"Hydration has been cancelled : path = %ls", path);
TRACE_DEBUG(L"Hydration has been cancelled: path = %ls", path);
res = true;
} else {
TRACE_ERROR(L"WinRT error catched : %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
TRACE_ERROR(L"WinRT error catched: %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
if (ex.code() == 0x800701aa) {
// Timeout
res = false;
Expand All @@ -239,28 +239,28 @@ bool CloudProvider::updateTransfer(const wchar_t *filePath, const wchar_t *fromF

std::unique_lock<std::mutex> lck(_providerInfo->_fetchMapMutex);
if (_providerInfo->_fetchMap.find(filePath) == _providerInfo->_fetchMap.end()) {
TRACE_DEBUG(L"No fetch in progress : path = %ls", filePath);
TRACE_DEBUG(L"No fetch in progress: path = %ls", filePath);
return true;
}

FetchInfo &fetchInfo = _providerInfo->_fetchMap[filePath];
if (fetchInfo.getUpdating()) {
TRACE_DEBUG(L"Update already in progress : path = %ls", filePath);
TRACE_DEBUG(L"Update already in progress: path = %ls", filePath);
return true;
}

if (completed == 0 || completed > fetchInfo._length.QuadPart) {
TRACE_ERROR(L"Invalid completed size : path = %ls", filePath);
TRACE_ERROR(L"Invalid completed size: path = %ls", filePath);
return false;
} else if (completed == fetchInfo._offset.QuadPart) {
TRACE_DEBUG(L"Nothing to do : path = %ls", filePath);
TRACE_DEBUG(L"Nothing to do: path = %ls", filePath);
return true;
} else if (completed > fetchInfo._offset.QuadPart && completed < fetchInfo._offset.QuadPart + CHUNKBASESIZE &&
completed != fetchInfo._length.QuadPart) {
TRACE_DEBUG(L"Not enough to transfer : path = %ls, completed = %lld", filePath, completed);
TRACE_DEBUG(L"Not enough to transfer: path = %ls, completed = %lld", filePath, completed);
return true;
} else if (completed < fetchInfo._offset.QuadPart) {
TRACE_DEBUG(L"Restart transfer : path = %ls", filePath);
TRACE_DEBUG(L"Restart transfer: path = %ls", filePath);
fetchInfo._offset.QuadPart = 0;
}

Expand Down Expand Up @@ -311,11 +311,11 @@ bool CloudProvider::updateTransfer(const wchar_t *filePath, const wchar_t *fromF
Utilities::traceFileDates(filePath);

try {
TRACE_DEBUG(L"Execute transfer : offset = %lld, size = %lld", opParams.TransferData.Offset.QuadPart,
TRACE_DEBUG(L"Execute transfer: offset = %lld, size = %lld", opParams.TransferData.Offset.QuadPart,
opParams.TransferData.Length.QuadPart);
winrt::check_hresult(CfExecute(&opInfo, &opParams));
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"Error catched : hr %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
TRACE_ERROR(L"Error catched: hr %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
res = false;
break;
}
Expand All @@ -338,13 +338,13 @@ bool CloudProvider::updateTransfer(const wchar_t *filePath, const wchar_t *fromF
fromFilePathStream.close();
delete[] transferData;
} else {
TRACE_ERROR(L"Unable to open temporary file : path = %ls", fromFilePath);
TRACE_ERROR(L"Unable to open temporary file: path = %ls", fromFilePath);
res = false;
}

if (fetchInfo.getCancel()) {
// Update canceled, notify
TRACE_DEBUG(L"Update canceled : path = %ls", fromFilePath);
TRACE_DEBUG(L"Update canceled: path = %ls", fromFilePath);
*canceled = true;
_providerInfo->_cancelFetchCV.notify_all();
} else {
Expand All @@ -371,44 +371,27 @@ bool CloudProvider::cancelTransfer(ProviderInfo *providerInfo, const wchar_t *fi
}

if (wcsncmp(filePath, providerInfo->folderPath(), wcslen(providerInfo->folderPath()))) {
TRACE_DEBUG(L"File not synchronized : path = %lls", filePath);
TRACE_DEBUG(L"File not synchronized: path = %lls", filePath);
return true;
}

std::unique_lock<std::mutex> lck(providerInfo->_fetchMapMutex);
if (providerInfo->_fetchMap.find(filePath) == providerInfo->_fetchMap.end()) {
TRACE_DEBUG(L"No fetch in progress : path = %lls", filePath);
TRACE_DEBUG(L"No fetch in progress: path = %lls", filePath);
return true;
}

FetchInfo &fetchInfo = providerInfo->_fetchMap[filePath];
if (fetchInfo.getUpdating()) {
// If updating, set cancel indicator and wait for notification
TRACE_DEBUG(L"Cancel updating : path = %lls", filePath);
TRACE_DEBUG(L"Cancel updating: path = %lls", filePath);
fetchInfo.setCancel();
providerInfo->_cancelFetchCV.wait(lck);
}

if (updateStatus) {
// Update transfer status
CF_OPERATION_INFO opInfo = {0};
CF_OPERATION_PARAMETERS opParams = {0};

opInfo.StructSize = sizeof(opInfo);
opInfo.Type = CF_OPERATION_TYPE_TRANSFER_DATA;
opInfo.ConnectionKey = fetchInfo._connectionKey;
opInfo.TransferKey = fetchInfo._transferKey;
opParams.ParamSize = CF_SIZE_OF_OP_PARAM(TransferData);
opParams.TransferData.CompletionStatus = STATUS_UNSUCCESSFUL;
opParams.TransferData.Buffer = nullptr;
opParams.TransferData.Offset = {0}; // Cancel entire transfer
opParams.TransferData.Length = fetchInfo._length;

try {
TRACE_DEBUG(L"Cancel transfer : path = %lls", filePath);
winrt::check_hresult(CfExecute(&opInfo, &opParams));
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"Error catched : hr %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
if (!cancelFetchData(fetchInfo._connectionKey, fetchInfo._transferKey, fetchInfo._length)) {
TRACE_ERROR(L"Error in cancelFetchData: path = %ls", filePath);
return false;
}
}
Expand All @@ -417,14 +400,14 @@ bool CloudProvider::cancelTransfer(ProviderInfo *providerInfo, const wchar_t *fi
lck.unlock();

// Reset pin state
TRACE_DEBUG(L"Set pin state to UNPINNED : path = %lls", filePath);
TRACE_DEBUG(L"Set pin state to UNPINNED: path = %lls", filePath);
if (!Placeholders::setPinState(filePath, CF_PIN_STATE_UNPINNED)) {
TRACE_ERROR(L"Error in setPinState");
return false;
}

// Dehydrate file
TRACE_DEBUG(L"Dehydrate : path = %lls", filePath);
TRACE_DEBUG(L"Dehydrate: path = %lls", filePath);
if (!dehydrate(filePath)) {
TRACE_ERROR(L"Error in dehydrate");
return false;
Expand All @@ -440,23 +423,26 @@ void CALLBACK CloudProvider::onFetchData(_In_ CONST CF_CALLBACK_INFO *callbackIn
return;
}

if (callbackParameters->FetchData.Flags & CF_CALLBACK_FETCH_DATA_FLAG_NONE) {
return;
}

ProviderInfo *providerInfo = (ProviderInfo *)callbackInfo->CallbackContext;
if (providerInfo) {
std::filesystem::path fullPath =
std::filesystem::path(callbackInfo->VolumeDosName) / std::filesystem::path(callbackInfo->NormalizedPath);

if (wcsncmp(fullPath.wstring().c_str(), providerInfo->folderPath(), wcslen(providerInfo->folderPath()))) {
TRACE_DEBUG(L"File not synchronized : %ls", fullPath.wstring().c_str());
TRACE_DEBUG(L"File not synchronized: %ls", fullPath.wstring().c_str());
return;
}

if (callbackParameters->FetchData.Flags & CF_CALLBACK_FETCH_DATA_FLAG_NONE) {
if (!cancelFetchData(callbackInfo->ConnectionKey, callbackInfo->TransferKey, callbackParameters->FetchData.RequiredFileOffset)) {
TRACE_ERROR(L"Error in cancelFetchData: path = %ls", fullPath.wstring().c_str());
}
return;
}

std::unique_lock<std::mutex> lck(providerInfo->_fetchMapMutex);
if (providerInfo->_fetchMap.find(fullPath.wstring()) != providerInfo->_fetchMap.end()) {
TRACE_DEBUG(L"Fetch already in progress : path = %ls", fullPath.wstring().c_str());
TRACE_DEBUG(L"Fetch already in progress: path = %ls", fullPath.wstring().c_str());
return;
}

Expand All @@ -473,14 +459,17 @@ void CALLBACK CloudProvider::onFetchData(_In_ CONST CF_CALLBACK_INFO *callbackIn
lck.unlock();

if (callbackInfo->ProcessInfo->ProcessId == Utilities::s_processId) {
TRACE_DEBUG(L"Hydration asked by app : path = %lls", fullPath.wstring().c_str());
TRACE_DEBUG(L"Hydration asked by app: path = %lls", fullPath.wstring().c_str());
} else {
TRACE_DEBUG(L"Hydration asked : processId = %ld, path = %lls", callbackInfo->ProcessInfo->ProcessId,
TRACE_DEBUG(L"Hydration asked: processId = %ld, path = %lls", callbackInfo->ProcessInfo->ProcessId,
fullPath.wstring().c_str());
}

if (!PipeClient::getInstance().sendMessageWithoutAnswer(L"MAKE_AVAILABLE_LOCALLY_DIRECT", fullPath.wstring())) {
TRACE_ERROR(L"Error in Utilities::writeMessage!");
if (!cancelFetchData(callbackInfo->ConnectionKey, callbackInfo->TransferKey, callbackParameters->FetchData.RequiredFileOffset)) {
TRACE_ERROR(L"Error in cancelFetchData: path = %ls", fullPath.wstring().c_str());
}
}
} else {
TRACE_ERROR(L"Empty CallbackContext");
Expand All @@ -505,7 +494,7 @@ void CALLBACK CloudProvider::onCancelFetchData(_In_ CONST CF_CALLBACK_INFO *call
std::filesystem::path(callbackInfo->VolumeDosName) / std::filesystem::path(callbackInfo->NormalizedPath);

if (!cancelTransfer(providerInfo, fullPath.wstring().c_str(), false)) {
TRACE_ERROR(L"Error in cancelTransfer : path = %lls", fullPath.wstring().c_str());
TRACE_ERROR(L"Error in cancelTransfer: path = %lls", fullPath.wstring().c_str());
return;
}
}
Expand All @@ -524,9 +513,9 @@ void CloudProvider::onNotifyDehydrate(_In_ CONST CF_CALLBACK_INFO *callbackInfo,
std::filesystem::path(callbackInfo->VolumeDosName) / std::filesystem::path(callbackInfo->NormalizedPath);

if (callbackInfo->ProcessInfo->ProcessId == Utilities::s_processId) {
TRACE_DEBUG(L"Dehydration asked by app : path = %lls", fullPath.wstring().c_str());
TRACE_DEBUG(L"Dehydration asked by app: path = %lls", fullPath.wstring().c_str());
} else {
TRACE_DEBUG(L"Dehydration asked : processId = %ld, path = %lls", callbackInfo->ProcessInfo->ProcessId,
TRACE_DEBUG(L"Dehydration asked: processId = %ld, path = %lls", callbackInfo->ProcessInfo->ProcessId,
fullPath.wstring().c_str());
}

Expand Down Expand Up @@ -596,7 +585,7 @@ bool CloudProvider::connectSyncRootTransferCallbacks() {
CF_CONNECT_FLAG_REQUIRE_PROCESS_INFO | CF_CONNECT_FLAG_REQUIRE_FULL_FILE_PATH,
&_transferCallbackConnectionKey));
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"WinRT error catched : hr %08x - %s!", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
TRACE_ERROR(L"WinRT error catched: hr %08x - %s!", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
return false;
}

Expand All @@ -608,7 +597,7 @@ bool CloudProvider::disconnectSyncRootTransferCallbacks() {
try {
winrt::check_hresult(CfDisconnectSyncRoot(_transferCallbackConnectionKey));
} catch (...) {
TRACE_ERROR(L"WinRT error catched : hr %08x", static_cast<HRESULT>(winrt::to_hresult()));
TRACE_ERROR(L"WinRT error catched: hr %08x", static_cast<HRESULT>(winrt::to_hresult()));
return false;
}

Expand Down Expand Up @@ -638,7 +627,34 @@ bool CloudProvider::addFolderToSearchIndexer(const PCWSTR folder) {
winrt::check_hresult(searchCrawlScopeManager->AddDefaultScopeRule(url.data(), TRUE, FOLLOW_FLAGS::FF_INDEXCOMPLEXURLS));
winrt::check_hresult(searchCrawlScopeManager->SaveAll());
} catch (winrt::hresult_error const &ex) {
TRACE_ERROR(L"WinRT error catched : hr %08x - %s!", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
TRACE_ERROR(L"WinRT error catched: hr %08x - %s!", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
return false;
}

return true;
}

bool CloudProvider::cancelFetchData(CF_CONNECTION_KEY connectionKey, CF_TRANSFER_KEY transferKey, LARGE_INTEGER requiredFileOffset)
{
// Update transfer status
CF_OPERATION_INFO opInfo = { 0 };
CF_OPERATION_PARAMETERS opParams = { 0 };

opInfo.StructSize = sizeof(opInfo);
opInfo.Type = CF_OPERATION_TYPE_TRANSFER_DATA;
opInfo.ConnectionKey = connectionKey;
opInfo.TransferKey = transferKey;
opParams.ParamSize = CF_SIZE_OF_OP_PARAM(TransferData);
opParams.TransferData.CompletionStatus = STATUS_UNSUCCESSFUL;
opParams.TransferData.Buffer = nullptr;
opParams.TransferData.Offset = { 0 }; // Cancel entire transfer
opParams.TransferData.Length = requiredFileOffset;

try {
winrt::check_hresult(CfExecute(&opInfo, &opParams));
}
catch (winrt::hresult_error const& ex) {
TRACE_ERROR(L"Error catched: hr %08x - %s", static_cast<HRESULT>(winrt::to_hresult()), ex.message().c_str());
return false;
}

Expand Down
2 changes: 2 additions & 0 deletions extensions/windows/cfapi/Vfs/cloudprovider.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,6 @@ class CloudProvider {
std::wstring _synRootID;

static bool addFolderToSearchIndexer(const PCWSTR folder);
static bool cancelFetchData(CF_CONNECTION_KEY connectionKey, CF_TRANSFER_KEY transferKey, LARGE_INTEGER requiredFileOffset);

};
Loading

0 comments on commit 12437c7

Please sign in to comment.