Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerkaraszewski committed Jun 24, 2024
1 parent ed58715 commit e3a1370
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
4 changes: 2 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,10 +882,10 @@ int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, u
SDEBUG("Getting commits #" << fromIndex << "-" << toIndex);
query = "SELECT hash, query FROM (" + query + ") ORDER BY id";
if (timeoutLimitUS) {
_timeoutLimit = STimeNow() + timeoutLimitUS;
setTimeout(timeoutLimitUS);
}
int queryResult = SQuery(_db, "getting commits", query, result);
_timeoutLimit = 0;
clearTimeout();
return queryResult;
}

Expand Down
15 changes: 6 additions & 9 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing

// We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the
// maximum time limit that will cause this node to be disconnected from the cluster.
_queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2);
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand Down Expand Up @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
}
}

void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand Down Expand Up @@ -2132,18 +2135,12 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Figure out how much to send it
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
uint64_t timeoutLimitUS = 0;
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);

// We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread.
// For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take.
// This is really not the correct encapsulation for this, but we can improve that later.
timeoutLimitUS = 10'000'000;
} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
}
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS);
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
STHROW("synchronization query timeout");
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down

0 comments on commit e3a1370

Please sign in to comment.