diff --git a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp index ddb1cab14e..9b92795206 100644 --- a/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp +++ b/bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp @@ -81,6 +81,13 @@ class InternalSigner : public concord::crypto::ISigner { std::string getPrivKey() const override { return ""; } }; +// Scaling command may break state transfer itself (if, for example, we scale from n1 to n2 < n1 replicas). For that we +// need a client like mechanism which work asynchronously to the state itself. However, a committer replica, may end +// state transfer and get the scale command in the state, before it was caught by CRE. In this case, the command is +// handled by the reconfiguration state transfer callback (see concordbft/kvbc/include/st_reconfiguraion_sm.hpp). The +// two mechanisms are synchronized via the configurations list. Note that we are unable to synchronize them based on +// epoch number, because CRE is (at the moment) unaware to epochs. Epochs are shared via reserved pages which are +// getting updated at the end of state transfer. class ScalingReplicaHandler : public IStateHandler { public: ScalingReplicaHandler() {} @@ -100,7 +107,7 @@ class ScalingReplicaHandler : public IStateHandler { std::stringstream stream; stream << configurations_file.rdbuf(); std::string configs = stream.str(); - return (configs.empty()) || (configs.find(command.config_descriptor) != std::string::npos); + return (configs.empty()) || (configs.find(command.config_descriptor) == std::string::npos); } } return false; diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp index 956bb75281..9a56af89da 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp @@ -74,31 +74,13 @@ void ReplicaForStateTransfer::start() { stateTransfer->setReconfigurationEngine(cre_); stateTransfer->addOnTransferringCompleteCallback( [this](std::uint64_t) { - // TODO - The next lines up to comment 'YYY' do not belong here (CRE) - consider refactor or move outside if (!config_.isReadOnly) { - // At this point, we, if are not going to have another blocks in state transfer. So, we can safely stop CRE. - // if there is a reconfiguration state change that prevents us from starting another state transfer (i.e. - // scaling) then CRE probably won't work as well. - // 1. First, make sure we handled the most recent available updates. - concord::client::reconfiguration::PollBasedStateClient *pbc = - (concord::client::reconfiguration::PollBasedStateClient *)(cre_->getStateClient()); - bool succ = false; - while (!succ) { - auto latestHandledUpdate = cre_->getLatestKnownUpdateBlock(); - auto latestReconfUpdates = pbc->getStateUpdate(succ); - if (!succ) { - LOG_WARN(GL, "unable to get the latest reconfiguration updates"); - } - for (const auto &update : latestReconfUpdates) { - if (update.blockid > latestHandledUpdate) { - succ = false; - break; - } // else if (!isGettingBlocks) - } - } // while (!succ) { + // CRE may not read all the relevant updates. This may be an issue only in a committer node, which do not run + // state transfer continually. If this is the case, we need to execute the the relevant reconfiguration + // commands, based on the state we gained during state transfer. In any such case, it is the developer + // responsibility to synchronize between the two mechanisms. For example, see the AddRemoveWithWedgeCommand + // handler in concordbft/kvbc/include/st_reconfiguraion_sm.hpp LOG_INFO(GL, "halting cre"); - // 2. Now we can safely halt cre. We know for sure that there are no update in the state transffered - // blocks that haven't been handled yet cre_->halt(); } }, diff --git a/kvbc/src/st_reconfiguration_sm.cpp b/kvbc/src/st_reconfiguration_sm.cpp index 7ed514ad99..e38d98cd86 100644 --- a/kvbc/src/st_reconfiguration_sm.cpp +++ b/kvbc/src/st_reconfiguration_sm.cpp @@ -128,8 +128,33 @@ bool StReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedg uint64_t bft_seq_num, uint64_t current_cp_num, uint64_t bid) { - return handleWedgeCommands( - command, bid, current_cp_num, bft_seq_num, command.bft_support, true, command.restart, command.restart); + // This callback should work together with the asyncCRE scaling handler. If the scale command has broken state + // transfer itself, we won't even get to that point, and the CRE is expected to handle this case. However, if we did + // manage to complete state transfer, CRE is halted and we need to execute the command based on the state we gained + // during state transfer. In order not to execute the command twice, we do check that this configuration was not + // already executed by reading the local configuration list. + std::ofstream configurations_file; + configurations_file.open(bftEngine::ReplicaConfig::instance().configurationViewFilePath + "/" + + concord::reconfiguration::configurationsFileName + "." + + std::to_string(bftEngine::ReplicaConfig::instance().replicaId), + std::ios_base::app); + if (configurations_file.good()) { + std::stringstream stream; + stream << configurations_file.rdbuf(); + std::string configs = stream.str(); + if (configs.find(command.config_descriptor) != std::string::npos) { + LOG_INFO(GL, "the scale command was already executed by async CRE, we won't execute it again"); + return false; + } + } + bool succ = true; + concord::messages::ReconfigurationResponse response; + for (auto &h : orig_reconf_handlers_) { + // If it was written to the blockchain, it means that this is a valid request. + // We do need to execute every relevant reconfiguration handler to complete the scale command. + succ &= h->handle(command, bft_seq_num, UINT32_MAX, std::nullopt, response); + } + return succ; } bool StReconfigurationHandler::handle(const concord::messages::RestartCommand &command, diff --git a/tests/apollo/test_skvbc_restart_recovery.py b/tests/apollo/test_skvbc_restart_recovery.py index f41cd290f2..ff2663cc96 100644 --- a/tests/apollo/test_skvbc_restart_recovery.py +++ b/tests/apollo/test_skvbc_restart_recovery.py @@ -492,7 +492,6 @@ async def test_recovering_of_primary_with_initiated_view_change(self, bft_networ await bft_network.wait_for_replicas_to_reach_at_least_view(replicas_ids=bft_network.all_replicas(), expected_view=view, timeout=20 + timeouts) - @unittest.skip("Unstable") @with_trio @with_bft_network(start_replica_cmd, selected_configs=lambda n, f, c: c == 0, rotate_keys=True) @verify_linearizability()