Skip to content

Commit

Permalink
Merge pull request #340 from Infomaniak/KDESKTOP-1286-Fix-in-Executor…
Browse files Browse the repository at this point in the history
…Worker-UploadSession-constructor

Kdesktop 1286 fix in executor worker upload session constructor
  • Loading branch information
ClementKunz authored Oct 8, 2024
2 parents fe8493b + d665012 commit 05ba603
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 56 deletions.
8 changes: 4 additions & 4 deletions src/libcommon/utility/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ std::string toString(const UploadSessionType e) {
switch (e) {
case UploadSessionType::Unknown:
return "Unknown";
case UploadSessionType::Standard:
return "Standard";
case UploadSessionType::LogUpload:
return "LogUpload";
case UploadSessionType::Drive:
return "Drive";
case UploadSessionType::Log:
return "Log";
default:
return noConversionStr;
}
Expand Down
2 changes: 1 addition & 1 deletion src/libcommon/utility/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ enum class SyncStatus {
};
std::string toString(SyncStatus e);

enum class UploadSessionType { Unknown, Standard, LogUpload };
enum class UploadSessionType { Unknown, Drive, Log };
std::string toString(UploadSessionType e);

enum class SyncNodeType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
namespace KDC {

AbstractUploadSessionJob::AbstractUploadSessionJob(UploadSessionType uploadType, int driveDbId) :
AbstractTokenNetworkJob(uploadType == UploadSessionType::Standard ? ApiType::Drive : ApiType::Desktop, 0, 0, driveDbId, 0) {}
AbstractTokenNetworkJob(uploadType == UploadSessionType::Drive ? ApiType::Drive : ApiType::Desktop, 0, 0, driveDbId, 0) {}

AbstractUploadSessionJob::AbstractUploadSessionJob(UploadSessionType uploadType, int driveDbId, const SyncPath &filepath,
const std::string &sessionToken) :
AbstractTokenNetworkJob(uploadType == UploadSessionType::Standard ? ApiType::Drive : ApiType::Desktop, 0, 0, driveDbId, 0),
AbstractTokenNetworkJob(uploadType == UploadSessionType::Drive ? ApiType::Drive : ApiType::Desktop, 0, 0, driveDbId, 0),
_sessionToken(sessionToken), _filePath(filepath) {}
} // namespace KDC
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
#include "utility/utility.h"

namespace KDC {
DriveUploadSession::DriveUploadSession(int driveDbId, std::shared_ptr<SyncDb> syncDb, const SyncPath &filepath,
const NodeId &fileId, SyncTime modtime, bool liteSyncActivated,
uint64_t nbParalleleThread /*= 1*/) :
DriveUploadSession(driveDbId, syncDb, filepath, SyncName(), fileId, modtime, liteSyncActivated, nbParalleleThread) {
_fileId = fileId;
}

DriveUploadSession::DriveUploadSession(int driveDbId, std::shared_ptr<SyncDb> syncDb, const SyncPath &filepath,
const SyncName &filename, const NodeId &remoteParentDirId, SyncTime modtime,
bool liteSyncActivated, uint64_t nbParalleleThread /*= 1*/) :
AbstractUploadSession(filepath, filename, nbParalleleThread),
_driveDbId(driveDbId), _syncDb(syncDb), _modtimeIn(modtime), _remoteParentDirId(remoteParentDirId) {
_uploadSessionType = UploadSessionType::Standard;
_uploadSessionType = UploadSessionType::Drive;
}

DriveUploadSession::~DriveUploadSession() {
Expand All @@ -40,23 +46,28 @@ bool DriveUploadSession::runJobInit() {
}

std::shared_ptr<UploadSessionStartJob> DriveUploadSession::createStartJob() {
return std::make_shared<UploadSessionStartJob>(UploadSessionType::Standard, _driveDbId, getFileName(), getFileSize(),
_remoteParentDirId, getTotalChunks());
if (_fileId.empty()) {
return std::make_shared<UploadSessionStartJob>(UploadSessionType::Drive, _driveDbId, getFileName(), getFileSize(),
_remoteParentDirId, getTotalChunks());
} else {
return std::make_shared<UploadSessionStartJob>(UploadSessionType::Drive, _driveDbId, _fileId, getFileSize(),
getTotalChunks());
}
}

std::shared_ptr<UploadSessionChunkJob> DriveUploadSession::createChunkJob(const std::string &chunckContent, uint64_t chunkNb,
std::streamsize actualChunkSize) {
return std::make_shared<UploadSessionChunkJob>(UploadSessionType::Standard, _driveDbId, getFilePath(), getSessionToken(),
return std::make_shared<UploadSessionChunkJob>(UploadSessionType::Drive, _driveDbId, getFilePath(), getSessionToken(),
chunckContent, chunkNb, actualChunkSize, jobId());
}

std::shared_ptr<UploadSessionFinishJob> DriveUploadSession::createFinishJob() {
return std::make_shared<UploadSessionFinishJob>(UploadSessionType::Standard, _driveDbId, getFilePath(), getSessionToken(),
return std::make_shared<UploadSessionFinishJob>(UploadSessionType::Drive, _driveDbId, getFilePath(), getSessionToken(),
getTotalChunkHash(), getTotalChunks(), _modtimeIn);
}

std::shared_ptr<UploadSessionCancelJob> DriveUploadSession::createCancelJob() {
return std::make_shared<UploadSessionCancelJob>(UploadSessionType::Standard, _driveDbId, getFilePath(), getSessionToken());
return std::make_shared<UploadSessionCancelJob>(UploadSessionType::Drive, _driveDbId, getFilePath(), getSessionToken());
}

bool DriveUploadSession::handleStartJobResult(const std::shared_ptr<UploadSessionStartJob> &StartJob, std::string uploadToken) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ namespace KDC {

class DriveUploadSession : public AbstractUploadSession {
public:
// Using file ID, for file edition only.
DriveUploadSession(int driveDbId, std::shared_ptr<SyncDb> syncDb, const SyncPath &filepath, const NodeId &fileId,
SyncTime modtime, bool liteSyncActivated,
uint64_t nbParalleleThread = 1);

// Using file name and parent ID, for file creation only.
DriveUploadSession(int driveDbId, std::shared_ptr<SyncDb> syncDb, const SyncPath &filepath, const SyncName &filename,
const NodeId &remoteParentDirId, SyncTime modtime, bool liteSyncActivated,
uint64_t nbParalleleThread = 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,33 @@ namespace KDC {

LogUploadSession::LogUploadSession(const SyncPath &filepath, uint64_t nbParalleleThread /*= 1*/) :
AbstractUploadSession(filepath, filepath.filename(), nbParalleleThread) {
_uploadSessionType = UploadSessionType::LogUpload;
_uploadSessionType = UploadSessionType::Log;
}

bool LogUploadSession::runJobInit() {
return true;
}

std::shared_ptr<UploadSessionStartJob> LogUploadSession::createStartJob() {
return std::make_shared<UploadSessionStartJob>(UploadSessionType::LogUpload, getFileName(), getFileSize(), getTotalChunks());
return std::make_shared<UploadSessionStartJob>(UploadSessionType::Log, getFileName(), getFileSize(), getTotalChunks());
}

std::shared_ptr<UploadSessionChunkJob> LogUploadSession::createChunkJob(const std::string &chunckContent, uint64_t chunkNb,
std::streamsize actualChunkSize) {
return std::make_shared<UploadSessionChunkJob>(UploadSessionType::LogUpload, getFilePath(), getSessionToken(), chunckContent,
return std::make_shared<UploadSessionChunkJob>(UploadSessionType::Log, getFilePath(), getSessionToken(), chunckContent,
chunkNb, actualChunkSize, jobId());
}

std::shared_ptr<UploadSessionFinishJob> LogUploadSession::createFinishJob() {
SyncTime modtimeIn =
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()).time_since_epoch().count();

return std::make_shared<UploadSessionFinishJob>(UploadSessionType::LogUpload, getFilePath(), getSessionToken(),
return std::make_shared<UploadSessionFinishJob>(UploadSessionType::Log, getFilePath(), getSessionToken(),
getTotalChunkHash(), getTotalChunks(), modtimeIn);
}

std::shared_ptr<UploadSessionCancelJob> LogUploadSession::createCancelJob() {
return std::make_shared<UploadSessionCancelJob>(UploadSessionType::LogUpload, getSessionToken());
return std::make_shared<UploadSessionCancelJob>(UploadSessionType::Log, getSessionToken());
}

bool LogUploadSession::handleStartJobResult(const std::shared_ptr<UploadSessionStartJob> &StartJob, std::string uploadToken) {
Expand All @@ -60,7 +60,7 @@ bool LogUploadSession::handleStartJobResult(const std::shared_ptr<UploadSessionS
}

if (const std::string logUploadToken = std::get<std::string>(appStateValue); !logUploadToken.empty()) {
UploadSessionCancelJob cancelJob(UploadSessionType::LogUpload, logUploadToken);
UploadSessionCancelJob cancelJob(UploadSessionType::Log, logUploadToken);
if (const ExitCode exitCode = cancelJob.runSynchronously(); exitCode != ExitCode::Ok) {
LOG_WARN(getLogger(), "Error in UploadSessionCancelJob::runSynchronously : " << exitCode);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ void UploadSessionStartJob::setData(bool &canceled) {
auto timestamp = duration_cast<seconds>(time_point_cast<seconds>(system_clock::now()).time_since_epoch());

switch (_uploadType) {
case UploadSessionType::Standard:
case UploadSessionType::Drive:
if (_fileId.empty()) {
json.set("file_name", _filename);
json.set("directory_id", _remoteParentDirId);
} else {
json.set("file_id", _fileId);
}
break;
case UploadSessionType::LogUpload:
case UploadSessionType::Log:
json.set("last_modified_at", timestamp.count());
json.set("file_name", _filename);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace KDC {

class UploadSessionStartJob : public AbstractUploadSessionJob {
public:
// Using file name and parent ID, for create or edit
// Using file name and parent ID, for create only
UploadSessionStartJob(UploadSessionType uploadType, int driveDbId, const SyncName &filename, uint64_t size,
const NodeId &remoteParentDirId, uint64_t totalChunks);
// Using file ID, for edit only
Expand Down
29 changes: 14 additions & 15 deletions src/libsyncengine/propagation/executor/executorworker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,13 +1000,24 @@ bool ExecutorWorker::generateEditJob(SyncOpPtr syncOp, std::shared_ptr<AbstractJ
return false;
}

if (!syncOp->correspondingNode()->id()) {
// Should not happen
LOGW_SYNCPAL_WARN(_logger, L"Edit operation with empty corresponding node id for "
<< Utility::formatSyncPath(absoluteLocalFilePath).c_str());
_executorExitCode = ExitCode::DataError;
_executorExitCause = ExitCause::Unknown;
SentryHandler::instance()->captureMessage(SentryLevel::Warning, "ExecutorWorker::generateEditJob",
"Edit operation with empty corresponding node id");

return false;
}

if (filesize > useUploadSessionThreshold) {
try {
int uploadSessionParallelJobs = ParametersCache::instance()->parameters().uploadSessionParallelJobs();
job = std::make_shared<DriveUploadSession>(
_syncPal->driveDbId(), _syncPal->_syncDb, absoluteLocalFilePath, syncOp->affectedNode()->name(),
syncOp->correspondingNode()->parentNode()->id() ? *syncOp->correspondingNode()->parentNode()->id()
: std::string(),
_syncPal->driveDbId(), _syncPal->_syncDb, absoluteLocalFilePath,
syncOp->correspondingNode()->id() ? *syncOp->correspondingNode()->id() : std::string(),
syncOp->affectedNode()->lastmodified() ? *syncOp->affectedNode()->lastmodified() : 0,
isLiteSyncActivated(), uploadSessionParallelJobs);
} catch (std::exception const &e) {
Expand All @@ -1017,18 +1028,6 @@ bool ExecutorWorker::generateEditJob(SyncOpPtr syncOp, std::shared_ptr<AbstractJ
return false;
};
} else {
if (!syncOp->correspondingNode()->id()) {
// Should not happen
LOGW_SYNCPAL_WARN(_logger, L"Edit operation with empty corresponding node id for "
<< Utility::formatSyncPath(absoluteLocalFilePath).c_str());
_executorExitCode = ExitCode::DataError;
_executorExitCause = ExitCause::Unknown;
SentryHandler::instance()->captureMessage(SentryLevel::Warning, "ExecutorWorker::generateEditJob",
"Edit operation with empty corresponding node id");

return false;
}

try {
job = std::make_shared<UploadJob>(
_syncPal->driveDbId(), absoluteLocalFilePath,
Expand Down
2 changes: 1 addition & 1 deletion src/libsyncengine/syncpal/syncpal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ ExitCode SyncPal::cleanOldUploadSessionTokens() {

for (auto &uploadSessionToken: uploadSessionTokenList) {
try {
auto job = std::make_shared<UploadSessionCancelJob>(UploadSessionType::Standard, driveDbId(), "",
auto job = std::make_shared<UploadSessionCancelJob>(UploadSessionType::Drive, driveDbId(), "",
uploadSessionToken.token());
ExitCode exitCode = job->runSynchronously();
if (exitCode != ExitCode::Ok) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/appserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ AppServer::AppServer(int &argc, char **argv) :
}

if (const auto logUploadToken = std::get<std::string>(appStateValue); !logUploadToken.empty()) {
UploadSessionCancelJob cancelJob(UploadSessionType::LogUpload, logUploadToken);
UploadSessionCancelJob cancelJob(UploadSessionType::Log, logUploadToken);
if (const ExitCode exitCode = cancelJob.runSynchronously(); exitCode != ExitCode::Ok) {
LOG_WARN(_logger, "Error in UploadSessionCancelJob::runSynchronously : " << exitCode);
} else {
Expand Down
Loading

0 comments on commit 05ba603

Please sign in to comment.