Skip to content

Commit

Permalink
Merge pull request #2029 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
rafecolton authored Dec 17, 2024
2 parents 903da12 + 5af420e commit 0d43b6c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 55 deletions.
81 changes: 31 additions & 50 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,28 +164,6 @@ vector<string> SQLite::initializeJournal(sqlite3* db, int minJournalTables) {
return journalNames;
}

uint64_t SQLite::initializeJournalSize(sqlite3* db, const vector<string>& journalNames) {
    // Determine how many rows the journal currently holds, so that old entries can be
    // pruned later once the configured size limit is exceeded. The journal may be split
    // across several tables, so the size is computed as the difference between the
    // overall highest and lowest journal ids across all of them.

    // Build a UNION of per-table MIN/MAX lookups, then aggregate over the union to get
    // the global lower and upper bounds.
    const string innerMin = _getJournalQuery(journalNames, {"SELECT MIN(id) AS id FROM"}, true);
    const string innerMax = _getJournalQuery(journalNames, {"SELECT MAX(id) AS id FROM"}, true);
    const string minQuery = "SELECT MIN(id) AS id FROM (" + innerMin + ")";
    const string maxQuery = "SELECT MAX(id) AS id FROM (" + innerMax + ")";

    // Run both lookups against the database; SASSERT aborts on any query failure.
    SQResult queryResult;
    SASSERT(!SQuery(db, "getting commit min", minQuery, queryResult));
    const uint64_t lowestID = SToUInt64(queryResult[0][0]);
    SASSERT(!SQuery(db, "getting commit max", maxQuery, queryResult));
    const uint64_t highestID = SToUInt64(queryResult[0][0]);

    // The row-count estimate is simply the span between the two bounds.
    return highestID - lowestID;
}

void SQLite::commonConstructorInitialization(bool hctree) {
// Perform sanity checks.
SASSERT(!_filename.empty());
Expand Down Expand Up @@ -229,7 +207,6 @@ SQLite::SQLite(const string& filename, int cacheSize, int maxJournalSize,
_db(initializeDB(_filename, mmapSizeGB, hctree)),
_journalNames(initializeJournal(_db, minJournalTables)),
_sharedData(initializeSharedData(_db, _filename, _journalNames, hctree)),
_journalSize(initializeJournalSize(_db, _journalNames)),
_cacheSize(cacheSize),
_mmapSizeGB(mmapSizeGB)
{
Expand All @@ -242,7 +219,6 @@ SQLite::SQLite(const SQLite& from) :
_db(initializeDB(_filename, from._mmapSizeGB, false)), // Create a *new* DB handle from the same filename, don't copy the existing handle.
_journalNames(from._journalNames),
_sharedData(from._sharedData),
_journalSize(from._journalSize),
_cacheSize(from._cacheSize),
_mmapSizeGB(from._mmapSizeGB)
{
Expand Down Expand Up @@ -666,6 +642,36 @@ bool SQLite::_writeIdempotent(const string& query, SQResult& result, bool always
bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) {
SASSERT(_insideTransaction);

// Pick a journal for this transaction.
const int64_t journalID = _sharedData.nextJournalCount++;
_journalName = _journalNames[journalID % _journalNames.size()];

// Look up the oldest commit in our chosen journal, and compute the oldest commit we intend to keep.
SQResult journalLookupResult;
SASSERT(!SQuery(_db, "getting commit min", "SELECT MIN(id) FROM " + _journalName, journalLookupResult));
uint64_t minJournalEntry = journalLookupResult.size() ? SToUInt64(journalLookupResult[0][0]) : 0;

// Note that this can change before we hold the lock on _sharedData.commitLock, but it doesn't matter yet, as we're only
// using it to truncate the journal. We'll reset this value once we acquire that lock.
uint64_t commitCount = _sharedData.commitCount;

// If the commitCount is less than the max journal size, keep everything. Otherwise, keep everything from
// commitCount - _maxJournalSize forward. We can't just do the last subtraction part because it overflows our unsigned
// int.
uint64_t oldestCommitToKeep = commitCount < _maxJournalSize ? 0 : commitCount - _maxJournalSize;

// We limit deletions to a relatively small number to avoid making this extremely slow for some transactions in the case
// where this journal in particular has accumulated a large backlog.
static const size_t deleteLimit = 10;
if (minJournalEntry < oldestCommitToKeep) {
auto startUS = STimeNow();
string query = "DELETE FROM " + _journalName + " WHERE id < " + SQ(oldestCommitToKeep) + " LIMIT " + SQ(deleteLimit);
SASSERT(!SQuery(_db, "Deleting oldest journal rows", query));
size_t deletedCount = sqlite3_changes(_db);
SINFO("Removed " << deletedCount << " rows from journal " << _journalName << ", oldestToKeep: " << oldestCommitToKeep << ", count:"
<< commitCount << ", limit: " << _maxJournalSize << ", in " << (STimeNow() - startUS) << "us.");
}

// We lock this here, so that we can guarantee the order in which commits show up in the database.
if (!_mutexLocked) {
auto start = STimeNow();
Expand All @@ -681,15 +687,13 @@ bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) {
// We pass the journal number selected to the handler so that a caller can utilize the
// same method bedrock does for accessing 1 table per thread, in order to attempt to
// reduce conflicts on tables that are written to on every command
const int64_t journalID = _sharedData.nextJournalCount++;
_journalName = _journalNames[journalID % _journalNames.size()];
if (_shouldNotifyPluginsOnPrepare) {
(*_onPrepareHandler)(*this, journalID);
}

// Now that we've locked anybody else from committing, look up the state of the database. We don't need to lock the
// SharedData object to get these values as we know it can't currently change.
uint64_t commitCount = _sharedData.commitCount;
commitCount = _sharedData.commitCount;

// Queue up the journal entry
string lastCommittedHash = getCommittedHash(); // This is why we need the lock.
Expand Down Expand Up @@ -739,28 +743,6 @@ int SQLite::commit(const string& description, function<void()>* preCheckpointCal
SASSERT(!_uncommittedHash.empty()); // Must prepare first
int result = 0;

// Do we need to truncate as we go?
uint64_t newJournalSize = _journalSize + 1;
if (newJournalSize > _maxJournalSize) {
// Delete the oldest entry
uint64_t before = STimeNow();
string query = "DELETE FROM " + _journalName + " "
"WHERE id < (SELECT MAX(id) FROM " + _journalName + ") - " + SQ(_maxJournalSize) + " "
"LIMIT 10";
SASSERT(!SQuery(_db, "Deleting oldest journal rows", query));

// Figure out the new journal size.
SQResult result;
SASSERT(!SQuery(_db, "getting commit min", "SELECT MIN(id) AS id FROM " + _journalName, result));
uint64_t min = SToUInt64(result[0][0]);
SASSERT(!SQuery(_db, "getting commit max", "SELECT MAX(id) AS id FROM " + _journalName, result));
uint64_t max = SToUInt64(result[0][0]);
newJournalSize = max - min;

// Log timing info.
_writeElapsed += STimeNow() - before;
}

// Make sure one is ready to commit
SDEBUG("Committing transaction");

Expand Down Expand Up @@ -799,7 +781,6 @@ int SQLite::commit(const string& description, function<void()>* preCheckpointCal
}

_commitElapsed += STimeNow() - before;
_journalSize = newJournalSize;
_sharedData.incrementCommit(_uncommittedHash);
_insideTransaction = false;
_uncommittedHash.clear();
Expand Down
4 changes: 0 additions & 4 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ class SQLite {
static SharedData& initializeSharedData(sqlite3* db, const string& filename, const vector<string>& journalNames, bool hctree);
static sqlite3* initializeDB(const string& filename, int64_t mmapSizeGB, bool hctree);
static vector<string> initializeJournal(sqlite3* db, int minJournalTables);
static uint64_t initializeJournalSize(sqlite3* db, const vector<string>& journalNames);
void commonConstructorInitialization(bool hctree = false);

// The filename of this DB, canonicalized to its full path on disk.
Expand All @@ -375,9 +374,6 @@ class SQLite {
// The name of the journal table that this particular DB handle with write to.
string _journalName;

// The current size of the journal, in rows. TODO: Why isn't this in SharedData?
uint64_t _journalSize;

// True when we have a transaction in progress.
bool _insideTransaction = false;

Expand Down
6 changes: 5 additions & 1 deletion test/clustertest/tests/DoubleDetachTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ struct DoubleDetachTest : tpunit::TestFixture {
BedrockTester& follower = tester->getTester(1);

// Detach
SData detachCommand("detach");
SData detachCommand("Detach");
follower.executeWaitVerifyContent(detachCommand, "203 DETACHING", true);

// Wait for it to detach
sleep(3);

follower.executeWaitVerifyContent(detachCommand, "400 Already detached", true);

// Re-attach to make shutdown clean.
SData attachCommand("Attach");
follower.executeWaitVerifyContent(attachCommand, "204 ATTACHING", true);
}

} __DoubleDetachTest;

0 comments on commit 0d43b6c

Please sign in to comment.