Move journal truncation from SQLite::commit() to SQLite::prepare().

* Drops the per-handle _journalSize member and the initializeJournalSize() helper that seeded it.
* prepare() now picks its journal table first, then deletes up to 10 rows with id below
  (_sharedData.commitCount - _maxJournalSize) from that table, before the commit lock is taken.
* DoubleDetachTest sends "Detach" instead of "detach" and re-attaches the follower at the end.

Note: leading whitespace and blank-line positions below are reconstructed to agree with the hunk line counts.

diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp
index 891f3430c..f31888cd6 100644
--- a/sqlitecluster/SQLite.cpp
+++ b/sqlitecluster/SQLite.cpp
@@ -164,28 +164,6 @@ vector<string> SQLite::initializeJournal(sqlite3* db, int minJournalTables) {
     return journalNames;
 }
 
-uint64_t SQLite::initializeJournalSize(sqlite3* db, const vector<string>& journalNames) {
-    // We keep track of the number of rows in the journal, so that we can delete old entries when we're over our size
-    // limit.
-    // We want the min of all journal tables.
-    string minQuery = _getJournalQuery(journalNames, {"SELECT MIN(id) AS id FROM"}, true);
-    minQuery = "SELECT MIN(id) AS id FROM (" + minQuery + ")";
-
-    // And the max.
-    string maxQuery = _getJournalQuery(journalNames, {"SELECT MAX(id) AS id FROM"}, true);
-    maxQuery = "SELECT MAX(id) AS id FROM (" + maxQuery + ")";
-
-    // Look up the min and max values in the database.
-    SQResult result;
-    SASSERT(!SQuery(db, "getting commit min", minQuery, result));
-    uint64_t min = SToUInt64(result[0][0]);
-    SASSERT(!SQuery(db, "getting commit max", maxQuery, result));
-    uint64_t max = SToUInt64(result[0][0]);
-
-    // And save the difference as the size of the journal.
-    return max - min;
-}
-
 void SQLite::commonConstructorInitialization(bool hctree) {
     // Perform sanity checks.
     SASSERT(!_filename.empty());
@@ -229,7 +207,6 @@ SQLite::SQLite(const string& filename, int cacheSize, int maxJournalSize,
     _db(initializeDB(_filename, mmapSizeGB, hctree)),
     _journalNames(initializeJournal(_db, minJournalTables)),
     _sharedData(initializeSharedData(_db, _filename, _journalNames, hctree)),
-    _journalSize(initializeJournalSize(_db, _journalNames)),
     _cacheSize(cacheSize),
     _mmapSizeGB(mmapSizeGB)
 {
@@ -242,7 +219,6 @@ SQLite::SQLite(const SQLite& from) :
     _db(initializeDB(_filename, from._mmapSizeGB, false)), // Create a *new* DB handle from the same filename, don't copy the existing handle.
     _journalNames(from._journalNames),
     _sharedData(from._sharedData),
-    _journalSize(from._journalSize),
     _cacheSize(from._cacheSize),
     _mmapSizeGB(from._mmapSizeGB)
 {
@@ -666,6 +642,36 @@ bool SQLite::_writeIdempotent(const string& query, SQResult& result, bool always
 bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) {
     SASSERT(_insideTransaction);
 
+    // Pick a journal for this transaction.
+    const int64_t journalID = _sharedData.nextJournalCount++;
+    _journalName = _journalNames[journalID % _journalNames.size()];
+
+    // Look up the oldest commit in our chosen journal, and compute the oldest commit we intend to keep.
+    SQResult journalLookupResult;
+    SASSERT(!SQuery(_db, "getting commit min", "SELECT MIN(id) FROM " + _journalName, journalLookupResult));
+    uint64_t minJournalEntry = journalLookupResult.size() ? SToUInt64(journalLookupResult[0][0]) : 0;
+
+    // Note that this can change before we hold the lock on _sharedData.commitLock, but it doesn't matter yet, as we're only
+    // using it to truncate the journal. We'll reset this value once we acquire that lock.
+    uint64_t commitCount = _sharedData.commitCount;
+
+    // If the commitCount is less than the max journal size, keep everything. Otherwise, keep everything from
+    // commitCount - _maxJournalSize forward. We can't subtract unconditionally, because the unsigned result would wrap
+    // around whenever commitCount is smaller than _maxJournalSize.
+    uint64_t oldestCommitToKeep = commitCount < _maxJournalSize ? 0 : commitCount - _maxJournalSize;
+
+    // We limit deletions to a relatively small number to avoid making this extremely slow for some transactions in the case
+    // where this journal in particular has accumulated a large backlog.
+    static const size_t deleteLimit = 10;
+    if (minJournalEntry < oldestCommitToKeep) {
+        auto startUS = STimeNow();
+        string query = "DELETE FROM " + _journalName + " WHERE id < " + SQ(oldestCommitToKeep) + " LIMIT " + SQ(deleteLimit);
+        SASSERT(!SQuery(_db, "Deleting oldest journal rows", query));
+        size_t deletedCount = sqlite3_changes(_db);
+        SINFO("Removed " << deletedCount << " rows from journal " << _journalName << ", oldestToKeep: " << oldestCommitToKeep << ", count:"
+              << commitCount << ", limit: " << _maxJournalSize << ", in " << (STimeNow() - startUS) << "us.");
+    }
+
     // We lock this here, so that we can guarantee the order in which commits show up in the database.
     if (!_mutexLocked) {
         auto start = STimeNow();
@@ -681,15 +687,13 @@ bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) {
     // We pass the journal number selected to the handler so that a caller can utilize the
     // same method bedrock does for accessing 1 table per thread, in order to attempt to
     // reduce conflicts on tables that are written to on every command
-    const int64_t journalID = _sharedData.nextJournalCount++;
-    _journalName = _journalNames[journalID % _journalNames.size()];
     if (_shouldNotifyPluginsOnPrepare) {
         (*_onPrepareHandler)(*this, journalID);
     }
 
     // Now that we've locked anybody else from committing, look up the state of the database. We don't need to lock the
     // SharedData object to get these values as we know it can't currently change.
-    uint64_t commitCount = _sharedData.commitCount;
+    commitCount = _sharedData.commitCount;
 
     // Queue up the journal entry
     string lastCommittedHash = getCommittedHash(); // This is why we need the lock.
@@ -739,28 +743,6 @@ int SQLite::commit(const string& description, function<void()>* preCheckpointCal
     SASSERT(!_uncommittedHash.empty()); // Must prepare first
     int result = 0;
 
-    // Do we need to truncate as we go?
-    uint64_t newJournalSize = _journalSize + 1;
-    if (newJournalSize > _maxJournalSize) {
-        // Delete the oldest entry
-        uint64_t before = STimeNow();
-        string query = "DELETE FROM " + _journalName + " "
-                       "WHERE id < (SELECT MAX(id) FROM " + _journalName + ") - " + SQ(_maxJournalSize) + " "
-                       "LIMIT 10";
-        SASSERT(!SQuery(_db, "Deleting oldest journal rows", query));
-
-        // Figure out the new journal size.
-        SQResult result;
-        SASSERT(!SQuery(_db, "getting commit min", "SELECT MIN(id) AS id FROM " + _journalName, result));
-        uint64_t min = SToUInt64(result[0][0]);
-        SASSERT(!SQuery(_db, "getting commit max", "SELECT MAX(id) AS id FROM " + _journalName, result));
-        uint64_t max = SToUInt64(result[0][0]);
-        newJournalSize = max - min;
-
-        // Log timing info.
-        _writeElapsed += STimeNow() - before;
-    }
-
     // Make sure one is ready to commit
     SDEBUG("Committing transaction");
 
@@ -799,7 +781,6 @@ int SQLite::commit(const string& description, function<void()>* preCheckpointCal
         }
         _commitElapsed += STimeNow() - before;
 
-        _journalSize = newJournalSize;
         _sharedData.incrementCommit(_uncommittedHash);
         _insideTransaction = false;
         _uncommittedHash.clear();
diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h
index d19f858be..99c649d67 100644
--- a/sqlitecluster/SQLite.h
+++ b/sqlitecluster/SQLite.h
@@ -354,7 +354,6 @@ class SQLite {
     static SharedData& initializeSharedData(sqlite3* db, const string& filename, const vector<string>& journalNames, bool hctree);
     static sqlite3* initializeDB(const string& filename, int64_t mmapSizeGB, bool hctree);
     static vector<string> initializeJournal(sqlite3* db, int minJournalTables);
-    static uint64_t initializeJournalSize(sqlite3* db, const vector<string>& journalNames);
     void commonConstructorInitialization(bool hctree = false);
 
     // The filename of this DB, canonicalized to its full path on disk.
@@ -375,9 +374,6 @@ class SQLite {
     // The name of the journal table that this particular DB handle with write to.
     string _journalName;
 
-    // The current size of the journal, in rows. TODO: Why isn't this in SharedData?
-    uint64_t _journalSize;
-
     // True when we have a transaction in progress.
     bool _insideTransaction = false;
 
diff --git a/test/clustertest/tests/DoubleDetachTest.cpp b/test/clustertest/tests/DoubleDetachTest.cpp
index f7d5b8b8a..db1edf34f 100644
--- a/test/clustertest/tests/DoubleDetachTest.cpp
+++ b/test/clustertest/tests/DoubleDetachTest.cpp
@@ -26,13 +26,17 @@ struct DoubleDetachTest : tpunit::TestFixture {
         BedrockTester& follower = tester->getTester(1);
 
         // Detach
-        SData detachCommand("detach");
+        SData detachCommand("Detach");
         follower.executeWaitVerifyContent(detachCommand, "203 DETACHING", true);
 
         // Wait for it to detach
         sleep(3);
 
         follower.executeWaitVerifyContent(detachCommand, "400 Already detached", true);
+
+        // Re-attach to make shutdown clean.
+        SData attachCommand("Attach");
+        follower.executeWaitVerifyContent(attachCommand, "204 ATTACHING", true);
     }
 
 } __DoubleDetachTest;