diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 72191b5d8..a4650c7cc 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -882,10 +882,10 @@ int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, u SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; if (timeoutLimitUS) { - _timeoutLimit = STimeNow() + timeoutLimitUS; + setTimeout(timeoutLimitUS); } int queryResult = SQuery(_db, "getting commits", query, result); - _timeoutLimit = 0; + clearTimeout(); return queryResult; } diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 33c8a9e49..c49499d61 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + + // We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the + // maximum time limit that will cause this node to be disconnected from the cluster. + _queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2); _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance } } -void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2132,18 +2135,12 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; - uint64_t timeoutLimitUS = 0; if (sendAll) { SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); - - // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. - // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. - // This is really not the correct encapsulation for this, but we can improve that later. - timeoutLimitUS = 10'000'000; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } - int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); + int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS); if (resultCode) { if (resultCode == SQLITE_INTERRUPT) { STHROW("synchronization query timeout"); diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 8212ae18a..83e049884 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const;