Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update expensify_prod branch #1723

Merged
merged 4 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
// the DB was at when the transaction began on leader).
bool quorum = !SStartsWith(command["ID"], "ASYNC");
uint64_t waitForCount = SStartsWith(command["ID"], "ASYNC") ? command.calcU64("dbCountAtStart") : currentCount;
SDEBUG("Thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")");
SINFO("BEGIN_TRANSACTION replicate thread for commit " << newCount << " waiting on DB count " << waitForCount << " (" << (quorum ? "QUORUM" : "ASYNC") << ")");
while (true) {
SQLiteSequentialNotifier::RESULT result = _localCommitNotifier.waitFor(waitForCount, false);
if (result == SQLiteSequentialNotifier::RESULT::UNKNOWN) {
Expand All @@ -223,6 +223,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
SERROR("Got unhandled SQLiteSequentialNotifier::RESULT value, did someone update the enum without updating this block?");
}
}
SINFO("Finished waiting for commit count " << waitForCount << ", beginning replicate write.");

try {
int result = -1;
Expand All @@ -231,7 +232,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
if (commitAttemptCount > 1) {
SINFO("Commit attempt number " << commitAttemptCount << " for concurrent replication.");
}
SDEBUG("BEGIN for commit " << newCount);
SINFO("BEGIN for commit " << newCount);
bool uniqueContraintsError = false;
try {
auto start = chrono::steady_clock::now();
Expand Down Expand Up @@ -267,7 +268,9 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
// don't send LEADER the approval for this until inside of `prepare`. This potentially makes us
// wait while holding the commit lock for non-concurrent transactions, but I guess nobody else with
// a commit after us will be able to commit, either.
SINFO("Waiting on leader to say it has committed transaction " << command.calcU64("NewCount"));
SQLiteSequentialNotifier::RESULT waitResult = _leaderCommitNotifier.waitFor(command.calcU64("NewCount"), true);
SINFO("Leader reported committing transaction " << command.calcU64("NewCount") << ", committing.");
if (uniqueContraintsError) {
SINFO("Got unique constraints error in replication, restarting.");
--_concurrentReplicateTransactions;
Expand Down Expand Up @@ -300,6 +303,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
--_concurrentReplicateTransactions;
goSearchingOnExit = true;
} else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
SINFO("Notifying threads that leader has committed transaction " << command.calcU64("CommitCount"));
_leaderCommitNotifier.notifyThrough(command.calcU64("CommitCount"));
}
}
Expand Down Expand Up @@ -1930,15 +1934,8 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) {
_replicationThreadsShouldExit = true;
uint64_t cancelAfter = _leaderCommitNotifier.getValue();
SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter);
/*
_localCommitNotifier.cancel(cancelAfter);
_leaderCommitNotifier.cancel(cancelAfter);
*/

// Hack. Some bug means that occasionally something doesn't get notified, and we wait forever in a loop.
// This needs to be fixed, but this probably un-breaks it in exchange for potentially losing in-flight transactions.
_localCommitNotifier.cancel(_db.getCommitCount());
_leaderCommitNotifier.cancel(_db.getCommitCount());

// Polling wait for threads to quit. This could use a notification model such as with a condition_variable,
// which would probably be "better" but introduces yet more state variables for a state that we're rarely
Expand Down Expand Up @@ -2435,7 +2432,9 @@ int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uin

// Let the commit handler notify any other waiting threads that our commit is complete before it starts a checkpoint.
function<void()> notifyIfCommitted = [&]() {
_localCommitNotifier.notifyThrough(db.getCommitCount());
auto commitCount = db.getCommitCount();
SINFO("Notifying waiting threads that we've locally committed " << commitCount);
_localCommitNotifier.notifyThrough(commitCount);
};

int result = db.commit(stateName(_state), &notifyIfCommitted);
Expand Down
11 changes: 1 addition & 10 deletions sqlitecluster/SQLiteSequentialNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu
}
}

size_t cancelAttempts = 0;
while (true) {
unique_lock<mutex> lock(state->waitingThreadMutex);
if (_globalResult == RESULT::CANCELED) {
Expand All @@ -30,12 +29,7 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu
return state->result;
}
// If there's no result yet, log that we're waiting for it.
if (cancelAttempts > 10) {
SWARN("Not waiting anymore for " << value << ", just canceling");
return RESULT::CANCELED;
} else {
SINFO("Canceled after " << _cancelAfter << ", but waiting for " << value << " so not returning yet.");
}
SINFO("Canceled after " << _cancelAfter << ", but waiting for " << value << " so not returning yet.");
} else {
// Canceled and we're not before the cancellation cutoff.
return RESULT::CANCELED;
Expand All @@ -62,7 +56,6 @@ SQLiteSequentialNotifier::RESULT SQLiteSequentialNotifier::waitFor(uint64_t valu
// It's possible that we hit the timeout here after `cancel()` has set the global value, but before we received the notification.
// This isn't a problem, and we can jump back to the top of the loop and check again. If there's some problem, we'll see it there.
SINFO("Hit 1s timeout while global cancel " << (_globalResult == RESULT::CANCELED) << " or " << " specific cancel " << (state->result == RESULT::CANCELED));
cancelAttempts++;
continue;
}
}
Expand All @@ -82,7 +75,6 @@ void SQLiteSequentialNotifier::notifyThrough(uint64_t value) {
for (auto valueThreadMapPtr : {&_valueToPendingThreadMap, &_valueToPendingThreadMapNoCurrentTransaction}) {
auto& valueThreadMap = *valueThreadMapPtr;
auto lastToDelete = valueThreadMap.begin();
SINFO("Notifying " << valueThreadMap.size() << " waiting threads for value: " << value);
for (auto it = valueThreadMap.begin(); it != valueThreadMap.end(); it++) {
if (it->first > value) {
// If we've passed our value, there's nothing else to erase, so we can stop.
Expand All @@ -107,7 +99,6 @@ void SQLiteSequentialNotifier::notifyThrough(uint64_t value) {
//
// I think it's reasonable to assume this is the intention for multimap as well, and in my testing, that was the
// case.
SINFO("Deleting from thread map through value: " << lastToDelete->first);
valueThreadMap.erase(valueThreadMap.begin(), lastToDelete);
}
}
Expand Down