From 6b90c73cea835cabd9fba6beefab9a437f22cb8c Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 May 2024 13:56:57 -0700 Subject: [PATCH 1/3] Undo change to discard last replicated commands --- sqlitecluster/SQLiteNode.cpp | 15 ++++++--------- sqlitecluster/SQLiteSequentialNotifier.cpp | 9 +-------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 0247380cd..393b971ca 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -207,7 +207,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn // the DB was at when the transaction began on leader). bool quorum = !SStartsWith(command["ID"], "ASYNC"); uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calcU64("dbCountAtStart") : currentCount; - SDEBUG("Thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")"); + SINFO("BEGIN_TRANSACTION replicate thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")"); while (true) { SQLiteSequentialNotifier::RESULT result = _localCommitNotifier.waitFor(waitForCount, false); if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) { @@ -223,6 +223,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?"); } } + SINFO("Finished waiting for commit count " << waitForCount << ", beginning replicate write."); try { int result = -1; @@ -231,7 +232,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn if (commitAttemptCount > 1) { SINFO("Commit attempt number " << commitAttemptCount << " for concurrent replication."); } - SDEBUG("BEGIN for commit " << newCount); + SINFO("BEGIN for commit " << newCount); bool uniqueContraintsError = false; try { auto start = chrono::steady_clock::now(); @@ -267,7 +268,9 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn // don't send LEADER the approval for this until inside of `prepare`. This potentially makes us // wait while holding the commit lock for non-concurrent transactions, but I guess nobody else with // a commit after us will be able to commit, either. + SINFO("Waiting on leader to say it has committed transaction " << command.calcU64("NewCount")); SQLiteSequentialNotifier::RESULT waitResult = _leaderCommitNotifier.waitFor(command.calcU64("NewCount"), true); + SINFO("Leader reported committing transaction " << command.calcU64("NewCount") << ", committing."); if (uniqueContraintsError) { SINFO("Got unique constraints error in replication, restarting."); --_concurrentReplicateTransactions; @@ -300,6 +303,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn --_concurrentReplicateTransactions; goSearchingOnExit = true; } else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) { + SINFO("Notifying threads that leader has committed transaction " << command.calcU64("CommitCount")); _leaderCommitNotifier.notifyThrough(command.calcU64("CommitCount")); } } @@ -1930,15 +1934,8 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) { _replicationThreadsShouldExit = true; uint64_t cancelAfter = _leaderCommitNotifier.getValue(); SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter); - /* _localCommitNotifier.cancel(cancelAfter); _leaderCommitNotifier.cancel(cancelAfter); - */ - - // Hack. Some bug means that occasionally something doesn't get notified, and we wait forever in a loop. - // This needs to be fixed, but this probably un-breaks it in exchange for potentially losing in-flight transactions. - _localCommitNotifier.cancel(_db.getCommitCount()); - _leaderCommitNotifier.cancel(_db.getCommitCount()); // Polling wait for threads to quit. This could use a notification model such as with a condition_variable, // which would probably be "better" but introduces yet more state variables for a state that we're rarely diff --git a/sqlitecluster/SQLiteSequentialNotifier.cpp b/sqlitecluster/SQLiteSequentialNotifier.cpp index 2e6f533de..9414cd36b 100644 --- a/sqlitecluster/SQLiteSequentialNotifier.cpp +++ b/sqlitecluster/SQLiteSequentialNotifier.cpp @@ -19,7 +19,6 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu } } - size_t cancelAttempts = 0; while (true) { unique_lock lock(state->waitingThreadMutex); if (_globalResult == RESULT::CANCELED) { @@ -30,12 +29,7 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu return state->result; } // If there's no result yet, log that we're waiting for it. - if (cancelAttempts > 10) { - SWARN("Not waiting anymore for " << value << ", just canceling"); - return RESULT::CANCELED; - } else { - SINFO("Canceled after " << _cancelAfter << ", but waiting for " << value << " so not returning yet."); - } + SINFO("Canceled after " << _cancelAfter << ", but waiting for " << value << " so not returning yet."); } else { // Canceled and we're not before the cancellation cutoff. return RESULT::CANCELED; @@ -62,7 +56,6 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu // It's possible that we hit the timeout here after `cancel()` has set the global value, but before we received the notification. // This isn't a problem, and we can jump back to the top of the loop and check again. If there's some problem, we'll see it there. SINFO("Hit 1s timeout while global cancel " << (_globalResult == RESULT::CANCELED) << " or " << " specific cancel " << (state->result == RESULT::CANCELED)); - cancelAttempts++; continue; } } From 27cd78690062731d6daef433408f3f1f2b5dc738 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 May 2024 14:08:12 -0700 Subject: [PATCH 2/3] one more log line --- sqlitecluster/SQLiteNode.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 393b971ca..9f9486610 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2432,7 +2432,9 @@ int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uin // Let the commit handler notify any other waiting threads that our commit is complete before it starts a checkpoint. function notifyIfCommitted = [&]() { - _localCommitNotifier.notifyThrough(db.getCommitCount()); + auto commitCount = db.getCommitCount(); + SINFO("Notifying waiting threads that we've locally committed " << commitCount); + _localCommitNotifier.notifyThrough(commitCount); }; int result = db.commit(stateName(_state), ¬ifyIfCommitted); From db260c8f327518389c8733d53e692c30b5faf3fe Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 13 May 2024 14:28:43 -0700 Subject: [PATCH 3/3] Remove some loglines --- sqlitecluster/SQLiteSequentialNotifier.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/sqlitecluster/SQLiteSequentialNotifier.cpp b/sqlitecluster/SQLiteSequentialNotifier.cpp index 9414cd36b..11944bc79 100644 --- a/sqlitecluster/SQLiteSequentialNotifier.cpp +++ b/sqlitecluster/SQLiteSequentialNotifier.cpp @@ -75,7 +75,6 @@ void SQLiteSequentialNotifier::notifyThrough(uint64_t value) { for (auto valueThreadMapPtr : {&_valueToPendingThreadMap, &_valueToPendingThreadMapNoCurrentTransaction}) { auto& valueThreadMap = *valueThreadMapPtr; auto lastToDelete = valueThreadMap.begin(); - SINFO("Notifying " << valueThreadMap.size() << " waiting threads for value: " << value); for (auto it = valueThreadMap.begin(); it != valueThreadMap.end(); it++) { if (it->first > value) { // If we've passed our value, there's nothing else to erase, so we can stop. @@ -100,7 +99,6 @@ void SQLiteSequentialNotifier::notifyThrough(uint64_t value) { // // I think it's reasonable to assume this is the intention for multimap as well, and in my testing, that was the // case. - SINFO("Deleting from thread map through value: " << lastToDelete->first); valueThreadMap.erase(valueThreadMap.begin(), lastToDelete); } }