Skip to content

Commit

Permalink
remove state transfer cre loop
Browse files Browse the repository at this point in the history
  • Loading branch information
yontyon committed May 1, 2023
1 parent 3bcec13 commit dc458f4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 26 deletions.
9 changes: 8 additions & 1 deletion bftengine/src/bcstatetransfer/AsyncStateTransferCRE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ class InternalSigner : public concord::crypto::ISigner {
std::string getPrivKey() const override { return ""; }
};

// Scaling command may break state transfer itself (if, for example, we scale from n1 to n2 < n1 replicas). For that we
// need a client like mechanism which work asynchronously to the state itself. However, a committer replica, may end
// state transfer and get the scale command in the state, before it was caught by CRE. In this case, the command is
// handled by the reconfiguration state transfer callback (see concordbft/kvbc/include/st_reconfiguraion_sm.hpp). The
// two mechanisms are synchronized via the configurations list. Note that we are unable to synchronize them based on
// epoch number, because CRE is (at the moment) unaware to epochs. Epochs are shared via reserved pages which are
// getting updated at the end of state transfer.
class ScalingReplicaHandler : public IStateHandler {
public:
ScalingReplicaHandler() {}
Expand All @@ -100,7 +107,7 @@ class ScalingReplicaHandler : public IStateHandler {
std::stringstream stream;
stream << configurations_file.rdbuf();
std::string configs = stream.str();
return (configs.empty()) || (configs.find(command.config_descriptor) != std::string::npos);
return (configs.empty()) || (configs.find(command.config_descriptor) == std::string::npos);
}
}
return false;
Expand Down
28 changes: 5 additions & 23 deletions bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,13 @@ void ReplicaForStateTransfer::start() {
stateTransfer->setReconfigurationEngine(cre_);
stateTransfer->addOnTransferringCompleteCallback(
[this](std::uint64_t) {
// TODO - The next lines up to comment 'YYY' do not belong here (CRE) - consider refactor or move outside
if (!config_.isReadOnly) {
// At this point, we, if are not going to have another blocks in state transfer. So, we can safely stop CRE.
// if there is a reconfiguration state change that prevents us from starting another state transfer (i.e.
// scaling) then CRE probably won't work as well.
// 1. First, make sure we handled the most recent available updates.
concord::client::reconfiguration::PollBasedStateClient *pbc =
(concord::client::reconfiguration::PollBasedStateClient *)(cre_->getStateClient());
bool succ = false;
while (!succ) {
auto latestHandledUpdate = cre_->getLatestKnownUpdateBlock();
auto latestReconfUpdates = pbc->getStateUpdate(succ);
if (!succ) {
LOG_WARN(GL, "unable to get the latest reconfiguration updates");
}
for (const auto &update : latestReconfUpdates) {
if (update.blockid > latestHandledUpdate) {
succ = false;
break;
} // else if (!isGettingBlocks)
}
} // while (!succ) {
// CRE may not read all the relevant updates. This may be an issue only in a committer node, which do not run
// state transfer continually. If this is the case, we need to execute the the relevant reconfiguration
// commands, based on the state we gained during state transfer. In any such case, it is the developer
// responsibility to synchronize between the two mechanisms. For example, see the AddRemoveWithWedgeCommand
// handler in concordbft/kvbc/include/st_reconfiguraion_sm.hpp
LOG_INFO(GL, "halting cre");
// 2. Now we can safely halt cre. We know for sure that there are no update in the state transffered
// blocks that haven't been handled yet
cre_->halt();
}
},
Expand Down
29 changes: 27 additions & 2 deletions kvbc/src/st_reconfiguration_sm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,33 @@ bool StReconfigurationHandler::handle(const concord::messages::AddRemoveWithWedg
uint64_t bft_seq_num,
uint64_t current_cp_num,
uint64_t bid) {
return handleWedgeCommands(
command, bid, current_cp_num, bft_seq_num, command.bft_support, true, command.restart, command.restart);
// This callback should work together with the asyncCRE scaling handler. If the scale command has broken state
// transfer itself, we won't even get to that point, and the CRE is expected to handle this case. However, if we did
// manage to complete state transfer, CRE is halted and we need to execute the command based on the state we gained
// during state transfer. In order not to execute the command twice, we do check that this configuration was not
// already executed by reading the local configuration list.
std::ofstream configurations_file;
configurations_file.open(bftEngine::ReplicaConfig::instance().configurationViewFilePath + "/" +
concord::reconfiguration::configurationsFileName + "." +
std::to_string(bftEngine::ReplicaConfig::instance().replicaId),
std::ios_base::app);
if (configurations_file.good()) {
std::stringstream stream;
stream << configurations_file.rdbuf();
std::string configs = stream.str();
if (configs.find(command.config_descriptor) != std::string::npos) {
LOG_INFO(GL, "the scale command was already executed by async CRE, we won't execute it again");
return false;
}
}
bool succ = true;
concord::messages::ReconfigurationResponse response;
for (auto &h : orig_reconf_handlers_) {
// If it was written to the blockchain, it means that this is a valid request.
// We do need to execute every relevant reconfiguration handler to complete the scale command.
succ &= h->handle(command, bft_seq_num, UINT32_MAX, std::nullopt, response);
}
return succ;
}

bool StReconfigurationHandler::handle(const concord::messages::RestartCommand &command,
Expand Down

0 comments on commit dc458f4

Please sign in to comment.