Skip to content

Commit

Permalink
Merge branch 'develop' into 806_improve_consensus_throughput_latest
Browse files Browse the repository at this point in the history
  • Loading branch information
kladkogex authored Apr 29, 2024
2 parents b5922c0 + 2f7c743 commit 7d3b820
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 45 deletions.
2 changes: 1 addition & 1 deletion libBLS
Submodule libBLS updated 1 files
+1 −1 deps/build.sh
80 changes: 38 additions & 42 deletions monitoring/StuckDetectionAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,51 @@ void StuckDetectionAgent::StuckDetectionLoop( StuckDetectionAgent* _agent ) {

LOG( info, "StuckDetection agent: started monitoring." );

// determine if this is the first restart, or there were restarts
// before
auto numberOfPreviousRestarts = _agent->getNumberOfPreviousRestarts();

uint64_t restartIteration = 1;

while ( true ) {
if ( _agent->getSchain()->getNode()->isExitRequested() )
return;
auto restartFileName = _agent->createStuckFileName( restartIteration );

if ( !boost::filesystem::exists( restartFileName ) ) {
break;
}
restartIteration++;
if ( numberOfPreviousRestarts > 0 ) {
LOG( info, "Stuck detection engine: previous restarts detected:" << numberOfPreviousRestarts );
}

if ( restartIteration > 1 ) {
LOG( info, "Stuck detection engine: previous restarts detected:" << to_string(
restartIteration - 1 ) );
}


if ( _agent->getSchain()->getNode()->isExitRequested() )
return;

uint64_t restartTime = 0;
uint64_t sleepTime = _agent->getSchain()->getNode()->getStuckMonitoringIntervalMs() * 1000;
uint64_t restartIteration = numberOfPreviousRestarts + 1;
uint64_t whenToRestart = 0;

while ( restartTime == 0 ) {
if ( _agent->getSchain()->getNode()->isExitRequested() )
return;
// loop until stuck is detected
do {
try {
usleep( sleepTime );
_agent->getSchain()->getNode()->exitCheck();
restartTime = _agent->checkForRestart( restartIteration );
usleep(_agent->getSchain()->getNode()->getStuckMonitoringIntervalMs() * 1000);
// this will return non-zero if skaled needs to be restarted
whenToRestart = _agent->doStuckCheckAndReturnTimeWhenToRestart(restartIteration);
} catch ( ExitRequestedException& ) {
return;
} catch ( exception& e ) {
SkaleException::logNested( e );
}
}

} while (whenToRestart == 0 );

CHECK_STATE( restartTime > 0 );
// Stuck detection loop detected stuck. Restart.
try {
LOG( info, "Stuck detection engine: restarting skaled because of stuck detected." );
_agent->restart( restartTime, restartIteration );
_agent->restart(whenToRestart, restartIteration );
} catch ( ExitRequestedException& ) {
return;
}
}

uint64_t StuckDetectionAgent::getNumberOfPreviousRestarts() {
    // A marker file is created on every restart, and restartFileName()
    // maps an iteration number (starting at 1) to that file's name.
    // Probe iterations 1, 2, 3, ... in order; the first iteration whose
    // file is missing is one past the number of restarts that happened.
    uint64_t nextIteration = 1;
    for ( ; boost::filesystem::exists( restartFileName( nextIteration ) ); ++nextIteration ) {
        // nothing to do: the loop header performs the probing
    }
    return nextIteration - 1;
}

void StuckDetectionAgent::join() {
CHECK_STATE( stuckDetectionThreadPool );
stuckDetectionThreadPool->joinAll();
Expand Down Expand Up @@ -174,12 +169,12 @@ bool StuckDetectionAgent::stuckCheck( uint64_t _restartIntervalMs, uint64_t _tim
return result;
}

uint64_t StuckDetectionAgent::checkForRestart( uint64_t _restartIteration ) {
// this function returns 0 if no stuck is detected
// otherwise it returns Linux time in ms when to restart
uint64_t StuckDetectionAgent::doStuckCheckAndReturnTimeWhenToRestart(uint64_t _restartIteration ) {
CHECK_STATE( _restartIteration >= 1 );

auto baseRestartIntervalMs = getSchain()->getNode()->getStuckRestartIntervalMs();

uint64_t restartIntervalMs = baseRestartIntervalMs;
auto restartIntervalMs = getSchain()->getNode()->getStuckRestartIntervalMs();

auto blockID = getSchain()->getLastCommittedBlockID();

Expand All @@ -191,12 +186,13 @@ uint64_t StuckDetectionAgent::checkForRestart( uint64_t _restartIteration ) {
if ( sChain->getCryptoManager()->isSGXServerDown() )
return 0;

auto timeStampMs = getSchain()->getBlock( blockID )->getTimeStampS() * 1000;
auto lastBlockTimeStampMs = getSchain()->getBlock(blockID )->getTimeStampS() * 1000;

// check that the chain has not been doing much for a long time
auto startTimeMs = Time::getCurrentTimeMs();
while ( Time::getCurrentTimeMs() - startTimeMs < 60000 ) {
if ( !stuckCheck( restartIntervalMs, timeStampMs ) )
getNode()->exitCheck();
if ( !stuckCheck(restartIntervalMs, lastBlockTimeStampMs ) )
return 0;
usleep( 5 * 1000 * 1000 );
}
Expand All @@ -206,21 +202,21 @@ uint64_t StuckDetectionAgent::checkForRestart( uint64_t _restartIteration ) {

LOG( info, "Cleaned up state" );

return timeStampMs + restartIntervalMs + 120000;
return lastBlockTimeStampMs + restartIntervalMs + 120000;
}
void StuckDetectionAgent::restart( uint64_t _restartTimeMs, uint64_t _iteration ) {
CHECK_STATE( _restartTimeMs > 0 );

// wait until restart time is reached
while ( Time::getCurrentTimeMs() < _restartTimeMs ) {
try {
usleep( 100 );
} catch ( ... ) {
}

getNode()->exitCheck();
}

createStuckRestartFile( _iteration + 1 );
createStuckRestartFile( _iteration );

LOG( err,
"Consensus engine stuck detected, because no blocks were mined for a long time and "
Expand All @@ -229,7 +225,7 @@ void StuckDetectionAgent::restart( uint64_t _restartTimeMs, uint64_t _iteration
exit( 13 );
}

string StuckDetectionAgent::createStuckFileName( uint64_t _iteration ) {
string StuckDetectionAgent::restartFileName(uint64_t _iteration ) {
CHECK_STATE( _iteration >= 1 );
auto engine = getNode()->getConsensusEngine();
CHECK_STATE( engine );
Expand All @@ -242,7 +238,7 @@ string StuckDetectionAgent::createStuckFileName( uint64_t _iteration ) {

void StuckDetectionAgent::createStuckRestartFile( uint64_t _iteration ) {
CHECK_STATE( _iteration >= 1 );
auto fileName = createStuckFileName( _iteration );
auto fileName = restartFileName(_iteration);

ofstream f;
f.open( fileName, ios::trunc );
Expand Down
6 changes: 4 additions & 2 deletions monitoring/StuckDetectionAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ class StuckDetectionAgent : public Agent {

void join();

uint64_t checkForRestart( uint64_t _restartIteration );
uint64_t doStuckCheckAndReturnTimeWhenToRestart(uint64_t _restartIteration );

void restart( uint64_t _baseRestartTimeMs, uint64_t _iteration );

void createStuckRestartFile( uint64_t _iteration );

void cleanupState();

string createStuckFileName( uint64_t _iteration );
string restartFileName(uint64_t _iteration );

bool checkNodesAreOnline();

uint64_t getNumberOfPreviousRestarts();

bool stuckCheck( uint64_t _restartIntervalMs, uint64_t _timeStamp );
};
136 changes: 136 additions & 0 deletions thirdparty/lru_ordered_memory_constrained_cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Author: Alexander Ponomarev
* Author: Stan Kladko
*
* This is a cache that evicts the least recently used items
* until both the limit on the total number of elements and the limit
* on the total size in bytes are satisfied
*/

#ifndef LRUORDEREDCACHE_MEMORY_CONSTRAINED_HPP_INCLUDED
#define LRUORDEREDCACHE_MEMORY_CONSTRAINED_HPP_INCLUDED

#include <any>
#include <atomic>
#include <cstddef>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Declares a scoped std::shared_lock named `_read_lock_` on the shared mutex _M_
// (shared / reader access). The expansion already ends with a semicolon, so a
// trailing `;` at the call site only adds an empty statement.
#define CACHE_READ_LOCK( _M_ ) std::shared_lock< std::shared_mutex > _read_lock_( _M_ );
// Declares a scoped std::unique_lock named `_write_lock_` on the shared mutex _M_
// (exclusive / writer access). The expansion already ends with a semicolon.
#define CACHE_WRITE_LOCK( _M_ ) std::unique_lock< std::shared_mutex > _write_lock_( _M_ );

namespace cache {

/// Upper bound, in bytes, on the declared size of a single cached value (1 GiB).
constexpr size_t MAX_VALUE_SIZE = 1024 * 1024 * 1024;

/**
 * LRU cache bounded both by element count and by total declared byte size.
 *
 * Entries are kept in a list ordered from most recently used (front) to
 * least recently used (back). After every insertion, entries are evicted
 * from the back until the number of entries is at most `max_size` AND the
 * sum of the declared entry sizes is at most `max_bytes`.
 *
 * The byte size of a value is whatever the caller passes to put(); the
 * cache never measures values itself.
 *
 * All public methods take an internal std::shared_mutex. Lookups that
 * refresh recency (get / getIfExists) reorder the list and therefore take
 * the exclusive lock; pure observers (exists / size) take the shared lock.
 *
 * @tparam key_t   key type; must be usable as a std::map key
 * @tparam value_t value type; must be copy-constructible
 */
template < typename key_t, typename value_t >
class lru_ordered_memory_constrained_cache {
public:
    using key_value_pair_t = std::pair< key_t, value_t >;
    using list_iterator_t = typename std::list< key_value_pair_t >::iterator;

    /**
     * @param max_size  maximum number of entries kept after a put()
     * @param max_bytes maximum sum of declared entry sizes kept after a put()
     */
    lru_ordered_memory_constrained_cache( size_t max_size, size_t max_bytes )
        : _max_size( max_size ), _max_bytes( max_bytes ) {}

    /**
     * Inserts or replaces the entry for `key` and marks it most recently used,
     * then evicts least recently used entries until both limits hold.
     *
     * Note: if `value_size` alone exceeds `max_bytes` (or `max_size` is 0),
     * the eviction pass removes every entry, including the one just inserted.
     *
     * @param key        key of the entry
     * @param value      value to store (copied)
     * @param value_size declared size of the value in bytes
     * @throws std::length_error    if value_size > MAX_VALUE_SIZE
     * @throws std::underflow_error if the internal byte accounting is
     *                              inconsistent (an entry is recorded as larger
     *                              than the running total)
     */
    void put( const key_t& key, const value_t& value, size_t value_size ) {
        if ( value_size > MAX_VALUE_SIZE ) {
            throw std::length_error( "Item size too large:" + std::to_string( value_size ) );
        }

        std::unique_lock< std::shared_mutex > lock( _mutex );

        // Validate the accounting of the entry being replaced BEFORE touching
        // any container, so a failed check cannot leave an orphan list node
        // that has no matching map entry.
        auto existing = _cache_items_map.find( key );
        size_t replaced_size = 0;
        if ( existing != _cache_items_map.end() ) {
            replaced_size = _cache_items_sizes_map.at( key );
            check_accounted( replaced_size );
        }

        _cache_items_list.push_front( key_value_pair_t( key, value ) );

        if ( existing != _cache_items_map.end() ) {
            _total_bytes -= replaced_size;
            _cache_items_list.erase( existing->second );
            _cache_items_sizes_map.erase( key );
            _cache_items_map.erase( existing );
        }

        _cache_items_map[key] = _cache_items_list.begin();
        _cache_items_sizes_map[key] = value_size;
        _total_bytes += value_size;

        // Evict from the least recently used end. The emptiness guard keeps
        // the loop well-defined even if the byte counter were ever out of
        // sync with the containers.
        while ( !_cache_items_list.empty() &&
                ( _cache_items_map.size() > _max_size || _total_bytes.load() > _max_bytes ) ) {
            const key_t& victim_key = _cache_items_list.back().first;
            const size_t victim_size = _cache_items_sizes_map.at( victim_key );
            check_accounted( victim_size );
            _total_bytes -= victim_size;
            _cache_items_sizes_map.erase( victim_key );
            _cache_items_map.erase( victim_key );
            // victim_key refers into the list node, so pop it last
            _cache_items_list.pop_back();
        }
    }

    /**
     * Returns the value stored for `key` and marks it most recently used.
     *
     * NOTE(review): the returned reference points into the cache and is
     * handed out after the internal lock has been released. A concurrent
     * put() that replaces or evicts this entry invalidates it. Callers that
     * share the cache between threads should copy the value immediately or
     * use getIfExists(), which copies while the lock is held.
     *
     * @throws std::range_error if the key is not present
     */
    const value_t& get( const key_t& key ) {
        std::unique_lock< std::shared_mutex > lock( _mutex );

        auto it = _cache_items_map.find( key );
        if ( it == _cache_items_map.end() ) {
            throw std::range_error( "There is no such key in cache" );
        }
        // splice relinks the node without invalidating the stored iterator
        _cache_items_list.splice( _cache_items_list.begin(), _cache_items_list, it->second );
        return it->second->second;
    }

    /**
     * Returns a copy of the value stored for `key` wrapped in std::any and
     * marks the entry most recently used; returns an empty std::any if the
     * key is not present. The copy is made while the lock is held.
     */
    std::any getIfExists( const key_t& key ) {
        std::unique_lock< std::shared_mutex > lock( _mutex );

        auto it = _cache_items_map.find( key );
        if ( it == _cache_items_map.end() ) {
            return std::any();
        }
        _cache_items_list.splice( _cache_items_list.begin(), _cache_items_list, it->second );
        return std::any( it->second->second );
    }

    /// Returns true if `key` is present. Does not change recency order.
    bool exists( const key_t& key ) const {
        std::shared_lock< std::shared_mutex > lock( _mutex );
        return _cache_items_map.find( key ) != _cache_items_map.end();
    }

    /// Returns the current number of entries.
    size_t size() const {
        std::shared_lock< std::shared_mutex > lock( _mutex );
        return _cache_items_map.size();
    }

    /// Returns the current sum of declared entry sizes (atomic read, no lock taken).
    size_t total_bytes() const {
        return _total_bytes.load();
    }

private:
    /// Throws std::underflow_error if `item_size` exceeds the running byte total.
    void check_accounted( size_t item_size ) const {
        const size_t total = _total_bytes.load();
        if ( item_size > total ) {
            throw std::underflow_error( "Item size more than total bytes" +
                                        std::to_string( item_size ) + ":" +
                                        std::to_string( total ) );
        }
    }

    // mutable so that const observers can still lock
    mutable std::shared_mutex _mutex;
    // front = most recently used, back = least recently used
    std::list< key_value_pair_t > _cache_items_list;
    // key -> position of the entry in _cache_items_list
    std::map< key_t, list_iterator_t > _cache_items_map;
    // key -> declared byte size passed to put()
    std::map< key_t, size_t > _cache_items_sizes_map;
    size_t _max_size;
    size_t _max_bytes;
    std::atomic< size_t > _total_bytes{ 0 };
};

}  // namespace cache

#endif

0 comments on commit 7d3b820

Please sign in to comment.