Skip to content

Commit

Permalink
Merge pull request #1783 from Expensify/tyler-timeout-subscribe-query
Browse files Browse the repository at this point in the history
Time out SUBSCRIBE query instead of forking.
  • Loading branch information
tylerkaraszewski authored Jun 24, 2024
2 parents 92aaa14 + e3a1370 commit 2c1102a
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
9 changes: 7 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,19 @@ string SQLite::getCommittedHash() {
return _sharedData.lastCommittedHash.load();
}

bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) {
int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) {
// Look up all the queries within that range
SASSERTWARN(SWITHIN(1, fromIndex, toIndex));
string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) +
(toIndex ? " AND id <= " + SQ(toIndex) : "")});
SDEBUG("Getting commits #" << fromIndex << "-" << toIndex);
query = "SELECT hash, query FROM (" + query + ") ORDER BY id";
return !SQuery(_db, "getting commits", query, result);
if (timeoutLimitUS) {
setTimeout(timeoutLimitUS);
}
int queryResult = SQuery(_db, "getting commits", query, result);
clearTimeout();
return queryResult;
}

int64_t SQLite::getLastInsertRowID() {
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class SQLite {
static bool getCommit(sqlite3* db, const vector<string> journalNames, uint64_t index, string& query, string& hash);

// Looks up a range of commits.
bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result);
int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0);

// Set a time limit for this transaction, in US from the current time.
void setTimeout(uint64_t timeLimitUS);
Expand Down
26 changes: 20 additions & 6 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing

// We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the
// maximum time limit that will cause this node to be disconnected from the cluster.
_queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2);
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand Down Expand Up @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
}
}

void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand Down Expand Up @@ -2132,12 +2135,23 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Figure out how much to send it
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
if (!sendAll)
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);
} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
if (!db.getCommits(fromIndex, toIndex, result))
STHROW("error getting commits");
if ((uint64_t)result.size() != toIndex - fromIndex + 1)
}
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
STHROW("synchronization query timeout");
} else {
STHROW("error getting commits");
}
}

if ((uint64_t)result.size() != toIndex - fromIndex + 1) {
STHROW("mismatched commit count");
}

// Wrap everything into one huge message
PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit);
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down

0 comments on commit 2c1102a

Please sign in to comment.