Skip to content

Commit

Permalink
LZ-21: Initialize the historical state table on boot-up when feature …
Browse files Browse the repository at this point in the history
…becomes enabled (#863)

## Summary

Addresses the first part of https://radixdlt.atlassian.net/browse/LZ-21
(i.e. only the backfill on boot-up; handling new commits is out of scope
here).

## Details

The backfill associates the JMT leaves of the current version with their
current values from the SubstateDatabase, and records the state version at
which this happened (this information will eventually be used - among other
numbers - when checking whether a particular historical state version is available).

## Testing

This is hard to test until some other piece of the historical-state feature
is implemented.
I manually verified that a Node with a non-zero state version successfully
boots up and reports the backfill progress.
The feature is disabled by the default Node flags.
  • Loading branch information
jakrawcz-rdx authored Mar 11, 2024
2 parents 428ae89 + 18aa0fc commit e0d6f4b
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@

/** Database configuration options */
public record DatabaseConfig(
boolean enableLocalTransactionExecutionIndex, boolean enableAccountChangeIndex) {
boolean enableLocalTransactionExecutionIndex,
boolean enableAccountChangeIndex,
boolean enableHistoricalSubstateValues) {
public static void registerCodec(CodecMap codecMap) {
codecMap.register(
DatabaseConfig.class,
Expand Down
26 changes: 25 additions & 1 deletion core-rust/state-manager/src/store/historical_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct StateHashTreeBasedSubstateDatabase<'t, T> {
at_state_version: StateVersion,
}

impl<'t, T: ReadableTreeStore + LeafSubstateValueStore> StateHashTreeBasedSubstateDatabase<'t, T> {
impl<'t, T> StateHashTreeBasedSubstateDatabase<'t, T> {
/// Creates an instance backed by the given lower-level stores and scoped at the given version.
pub fn new(tree_store: &'t T, at_state_version: StateVersion) -> Self {
Self {
Expand All @@ -97,6 +97,30 @@ impl<'t, T: ReadableTreeStore + LeafSubstateValueStore> StateHashTreeBasedSubsta
}
}

impl<'t, T: ReadableTreeStore> StateHashTreeBasedSubstateDatabase<'t, T> {
    /// Lists every leaf key of the Substate Tier that is reachable from the scoped version,
    /// across *all* Entities and Partitions.
    ///
    /// Each yielded item pairs the Substate leaf's tree node key with the full key (i.e.
    /// Partition key + sort key) of the Substate which that leaf represents.
    pub fn iter_substate_leaf_keys(
        &self,
    ) -> impl Iterator<Item = (StoredTreeNodeKey, DbSubstateKey)> + 't {
        // Stage 1: Entity Tier -> all Partition Tiers.
        let partition_tiers = self
            .create_entity_tier()
            .into_iter_entity_partition_tiers_from(None);
        // Stage 2: Partition Tiers -> all Substate Tiers.
        let substate_tiers =
            partition_tiers.flat_map(|tier| tier.into_iter_partition_substate_tiers_from(None));
        // Stage 3: Substate Tiers -> all leaves, each annotated with its owning Partition's key.
        substate_tiers.flat_map(|tier| {
            let owning_partition_key = tier.partition_key().clone();
            tier.into_iter_substate_summaries_from(None)
                .map(move |leaf| {
                    let substate_key = (owning_partition_key.clone(), leaf.sort_key);
                    (leaf.state_tree_leaf_key, substate_key)
                })
        })
    }
}

impl<'t, T: ReadableTreeStore + LeafSubstateValueStore> SubstateDatabase
for StateHashTreeBasedSubstateDatabase<'t, T>
{
Expand Down
104 changes: 103 additions & 1 deletion core-rust/state-manager/src/store/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use tracing::{error, info, warn};
use crate::accumulator_tree::storage::{ReadableAccuTreeStore, TreeSlice};
use crate::query::TransactionIdentifierLoader;
use crate::store::codecs::*;
use crate::store::historical_state::StateHashTreeBasedSubstateDatabase;
use crate::store::traits::gc::{
LedgerProofsGcProgress, LedgerProofsGcStore, StateHashTreeGcStore,
VersionedLedgerProofsGcProgress,
Expand Down Expand Up @@ -121,7 +122,7 @@ use super::traits::extensions::*;
/// The `NAME` constants defined by `*Cf` structs (and referenced below) are used as database column
/// family names. Any change would effectively mean a ledger wipe. For this reason, we choose to
/// define them manually (rather than using the `Into<String>`, which is refactor-sensitive).
const ALL_COLUMN_FAMILIES: [&str; 22] = [
const ALL_COLUMN_FAMILIES: [&str; 23] = [
RawLedgerTransactionsCf::DEFAULT_NAME,
CommittedTransactionIdentifiersCf::VERSIONED_NAME,
TransactionReceiptsCf::VERSIONED_NAME,
Expand All @@ -144,6 +145,7 @@ const ALL_COLUMN_FAMILIES: [&str; 22] = [
AccountChangeStateVersionsCf::NAME,
ExecutedGenesisScenariosCf::VERSIONED_NAME,
LedgerProofsGcProgressCf::VERSIONED_NAME,
AssociatedStateHashTreeValuesCf::DEFAULT_NAME,
];

/// Committed transactions.
Expand Down Expand Up @@ -401,6 +403,7 @@ impl TypedCf for ExtensionsDataCf {
ExtensionsDataKey::AccountChangeIndexLastProcessedStateVersion,
ExtensionsDataKey::LocalTransactionExecutionIndexEnabled,
ExtensionsDataKey::December2023LostSubstatesRestored,
ExtensionsDataKey::ValuesAssociatedWithStateHashTreeSince,
])
}

Expand Down Expand Up @@ -458,13 +461,28 @@ impl VersionedCf for LedgerProofsGcProgressCf {
type VersionedValue = VersionedLedgerProofsGcProgress;
}

/// A column family of Substate values associated with the leaf nodes of the state hash tree's
/// Substate Tier (keyed by the leaf's [`StoredTreeNodeKey`]).
/// Needed for [`LeafSubstateValueStore`].
/// Note: no explicit versioning wrapper is applied to this table, because every serialized
/// Substate value is already versioned.
struct AssociatedStateHashTreeValuesCf;

impl DefaultCf for AssociatedStateHashTreeValuesCf {
    // This string is a database identifier (column family name).
    const DEFAULT_NAME: &'static str = "associated_state_hash_tree_values";

    type Key = StoredTreeNodeKey;
    type KeyCodec = StoredTreeNodeKeyDbCodec;

    type Value = DbSubstateValue;
    type ValueCodec = DirectDbCodec;
}

/// An enum key for [`ExtensionsDataCf`].
///
/// Note: `PartialOrd` and `Ord` are derived, so the relative order of the keys follows the
/// declaration order of the variants below.
#[derive(Eq, PartialEq, Hash, PartialOrd, Ord, Clone, Debug)]
enum ExtensionsDataKey {
AccountChangeIndexLastProcessedStateVersion,
AccountChangeIndexEnabled,
LocalTransactionExecutionIndexEnabled,
December2023LostSubstatesRestored,
/// Marks that the historical Substate values have been populated: the entry holds the
/// `scrypto_encode`d [`StateVersion`] at which the backfill was performed, and it is deleted
/// when the feature gets disabled.
ValuesAssociatedWithStateHashTreeSince,
}

// IMPORTANT NOTE: the strings defined below are used as database identifiers. Any change would
Expand All @@ -481,6 +499,9 @@ impl fmt::Display for ExtensionsDataKey {
"local_transaction_execution_index_enabled"
}
Self::December2023LostSubstatesRestored => "december_2023_lost_substates_restored",
Self::ValuesAssociatedWithStateHashTreeSince => {
"values_associated_with_state_hash_tree_since"
}
};
write!(f, "{str}")
}
Expand Down Expand Up @@ -718,6 +739,7 @@ impl ActualStateManagerDatabase {

state_manager_database.catchup_account_change_index();
state_manager_database.restore_december_2023_lost_substates(network);
state_manager_database.ensure_historical_substate_values();

Ok(state_manager_database)
}
Expand Down Expand Up @@ -752,6 +774,7 @@ impl<R: ReadableRocks> StateManagerDatabase<R> {
config: DatabaseConfig {
enable_local_transaction_execution_index: false,
enable_account_change_index: false,
enable_historical_substate_values: false,
},
rocks: DirectRocks { db },
}
Expand Down Expand Up @@ -787,6 +810,7 @@ impl<R: SecondaryRocks> StateManagerDatabase<R> {
config: DatabaseConfig {
enable_local_transaction_execution_index: false,
enable_account_change_index: false,
enable_historical_substate_values: false,
},
rocks: DirectRocks { db },
}
Expand Down Expand Up @@ -845,6 +869,80 @@ impl<R: WriteableRocks> StateManagerDatabase<R> {
&ExtensionsDataKey::LocalTransactionExecutionIndexEnabled,
&scrypto_encode(&self.config.enable_local_transaction_execution_index).unwrap(),
);
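// Note: the `ValuesAssociatedWithStateHashTreeSince` marker is deliberately not written here: it
// is written by `ensure_historical_substate_values()`, only after the actual values have been
// populated - so that an unexpected Node restart during the backfill is handled correctly.
}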
}

impl<R: WriteableRocks> StateManagerDatabase<R> {
    /// Brings the database structures related to historical Substate values in line with the
    /// database configuration.
    ///
    /// The action depends on the configuration flag and on the persisted "enabled since" marker:
    /// - enabled, marker present: nothing to do (only logs the recorded version);
    /// - enabled, marker absent: runs [`Self::populate_hash_tree_associated_substate_values()`]
    ///   at the current state version and writes the marker only afterwards. With this order, a
    ///   lengthy backfill is tolerant to the Node's restarts (i.e. it will simply be re-run);
    /// - disabled, marker present: deletes the marker;
    /// - disabled, marker absent: nothing to do.
    fn ensure_historical_substate_values(&self) {
        let db_context = self.open_rw_context();
        let extension_data_cf = db_context.cf(ExtensionsDataCf);
        let marker_key = ExtensionsDataKey::ValuesAssociatedWithStateHashTreeSince;
        let enabled_since = extension_data_cf
            .get(&marker_key)
            .map(|bytes| scrypto_decode::<StateVersion>(&bytes).unwrap());

        match (self.config.enable_historical_substate_values, enabled_since) {
            (true, Some(version)) => {
                info!("Historical Substate values enabled since {:?}", version);
            }
            (true, None) => {
                let current_version = self.max_state_version();
                info!(
                    "Enabling historical Substate values at {:?}",
                    current_version
                );
                self.populate_hash_tree_associated_substate_values(current_version);
                // The marker goes last: its presence means "the backfill has completed".
                extension_data_cf.put(&marker_key, &scrypto_encode(&current_version).unwrap());
            }
            (false, Some(version)) => {
                info!(
                    "Disabling historical Substate values (were enabled since {:?})",
                    version
                );
                extension_data_cf.delete(&marker_key);
            }
            (false, None) => {
                info!("Historical Substate values remain disabled");
            }
        }
    }

    /// Walks the entire state hash tree at the given version (which necessarily must be the
    /// current version, since the values are read from the [`SubstateDatabase`]) and stores the
    /// value of every Substate leaf key in [`AssociatedStateHashTreeValuesCf`].
    ///
    /// The writes are flushed in batches driven by byte size rather than by entry count (since
    /// Substates' sizes vary a lot).
    fn populate_hash_tree_associated_substate_values(&self, current_version: StateVersion) {
        const SUBSTATE_BATCH_BYTE_SIZE: usize = 50 * 1024 * 1024; // arbitrary 50 MB work chunks

        let db_context = self.open_rw_context();
        let tree_values_cf = db_context.cf(AssociatedStateHashTreeValuesCf);
        let current_tree = StateHashTreeBasedSubstateDatabase::new(self, current_version);
        for (leaf_key, (partition_key, sort_key)) in current_tree.iter_substate_leaf_keys() {
            let substate_value = self
                .get_substate(&partition_key, &sort_key)
                .expect("substate value referenced by hash tree does not exist");
            tree_values_cf.put(&leaf_key, &substate_value);
            if db_context.buffered_data_size() < SUBSTATE_BATCH_BYTE_SIZE {
                continue; // keep buffering until the batch grows large enough
            }
            db_context.flush();
            info!(
                "Populated historical values up to tree node key {} (Substate key {:?}:{:?})",
                leaf_key.nibble_path(),
                SpreadPrefixKeyMapper::from_db_partition_key(&partition_key),
                hex::encode(&sort_key.0),
            );
        }
        info!("Finished capturing all current Substate values as historical");
    }
}

Expand All @@ -856,6 +954,10 @@ impl<R: ReadableRocks> ConfigurableDatabase for StateManagerDatabase<R> {
fn is_local_transaction_execution_index_enabled(&self) -> bool {
self.config.enable_local_transaction_execution_index
}

/// Returns the `enable_historical_substate_values` flag of this database's configuration.
fn is_historical_substate_value_storage_enabled(&self) -> bool {
self.config.enable_historical_substate_values
}
}

impl MeasurableDatabase for ActualStateManagerDatabase {
Expand Down
6 changes: 5 additions & 1 deletion core-rust/state-manager/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ pub enum DatabaseConfigValidationError {
pub struct DatabaseConfig {
    /// Whether the local transaction execution index is enabled.
    pub enable_local_transaction_execution_index: bool,
    /// Whether the account change index is enabled.
    pub enable_account_change_index: bool,
    /// Whether the historical Substate values are enabled.
    pub enable_historical_substate_values: bool,
}

impl Default for DatabaseConfig {
    /// Creates the default configuration: both indices enabled, historical Substate values
    /// disabled.
    fn default() -> Self {
        Self {
            enable_local_transaction_execution_index: true,
            enable_account_change_index: true,
            enable_historical_substate_values: false,
        }
    }
}
Expand Down Expand Up @@ -133,6 +135,8 @@ pub trait ConfigurableDatabase {
fn is_account_change_index_enabled(&self) -> bool;

fn is_local_transaction_execution_index_enabled(&self) -> bool;

/// Tells whether the storage of historical Substate values is enabled for this database.
fn is_historical_substate_value_storage_enabled(&self) -> bool;
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -222,7 +226,7 @@ pub mod substate {
/// course share the same parent).
pub type KeyedSubstateNodeAncestryRecord = (Vec<NodeId>, SubstateNodeAncestryRecord);

/// A store of historical substate values associated with state tree's leaves.
/// A store of historical substate values associated with state hash tree's leaves.
/// See [`WriteableTreeStore::associate_substate_value()`].
///
/// Note: this is *not* a "historical values" store, but only its lower-level source of values.
Expand Down
14 changes: 14 additions & 0 deletions core-rust/state-manager/src/store/typed_cf_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ impl<'r, R: WriteableRocks> BufferedWriteSupport<'r, R> {
self.rocks.write(write_batch);
}
}

/// Returns the size of the data currently held in the write buffer, in bytes (as reported by
/// the buffer's `byte_size()`).
fn buffered_data_size(&self) -> usize {
self.buffer.byte_size()
}
}

impl<'r, R: WriteableRocks> Drop for BufferedWriteSupport<'r, R> {
Expand Down Expand Up @@ -145,6 +150,11 @@ impl<'r, R: WriteableRocks> TypedDbContext<'r, R, BufferedWriteSupport<'r, R>> {
pub fn flush(&self) {
self.write_support.flush();
}

/// Returns the size of the data buffered by this context's write support, in bytes.
/// This allows a caller to decide when to [`Self::flush()`] while writing in size-driven
/// batches.
pub fn buffered_data_size(&self) -> usize {
self.write_support.buffered_data_size()
}
}

impl<'r, R: ReadableRocks, W: WriteSupport> TypedDbContext<'r, R, W> {
Expand Down Expand Up @@ -766,6 +776,10 @@ impl WriteBuffer {
pub fn flip(&self) -> WriteBatch {
self.write_batch.replace(WriteBatch::default())
}

/// Returns the size of the currently buffered write batch, in bytes (as reported by the
/// batch's `size_in_bytes()`).
pub fn byte_size(&self) -> usize {
self.write_batch.borrow().size_in_bytes()
}
}

/// An internal wrapper for a [`TypedCf`] and dependencies resolved from it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private DeterministicTest createTest(
GenesisConsensusManagerConfig.Builder.testWithRoundsPerEpoch(
this.roundsPerEpoch)
.totalEmissionXrdPerEpoch(Decimal.ofNonNegative(0))),
new DatabaseConfig(true, false),
new DatabaseConfig(true, false, false),
StateComputerConfig.REV2ProposerConfig.transactionGenerator(
new REV2TransactionGenerator(), 1),
false,
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/com/radixdlt/RadixNodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,17 @@ protected void configure() {
NodeConstants.DEFAULT_MAX_TRANSACTION_SIZE);
var mempoolConfig =
new RustMempoolConfig(mempoolMaxTotalTransactionsSize, mempoolMaxTransactionCount);

var enableLocalTransactionExecutionIndex =
properties.get("db.local_transaction_execution_index.enable", true);
var enableAccountChangeIndex = properties.get("db.account_change_index.enable", true);
var enableHistoricalSubstateValues =
properties.get("db.historical_substate_values.enable", false);
var databaseConfig =
new DatabaseConfig(enableLocalTransactionExecutionIndex, enableAccountChangeIndex);
new DatabaseConfig(
enableLocalTransactionExecutionIndex,
enableAccountChangeIndex,
enableHistoricalSubstateValues);

install(new REv2LedgerInitializerModule(genesisProvider));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ static StateComputerConfig rev2(
return new REv2StateComputerConfig(
networkId,
genesis,
new DatabaseConfig(true, false),
new DatabaseConfig(true, false, false),
proposerConfig,
false,
StateHashTreeGcConfig.forTesting(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import org.junit.rules.TemporaryFolder;

public abstract class DeterministicCoreApiTestBase {
private static final DatabaseConfig TEST_DATABASE_CONFIG = new DatabaseConfig(true, false, false);

@Rule public TemporaryFolder folder = new TemporaryFolder();
public static NetworkDefinition networkDefinition = NetworkDefinition.INT_TEST_NET;
public static Addressing addressing = Addressing.ofNetwork(NetworkDefinition.INT_TEST_NET);
Expand All @@ -122,21 +124,18 @@ protected DeterministicCoreApiTestBase() {}

protected DeterministicTest buildRunningServerTest() {
return buildRunningServerTest(
1000000,
new DatabaseConfig(true, false),
GenesisData.NO_SCENARIOS,
ProtocolConfig.testingDefault());
1000000, TEST_DATABASE_CONFIG, GenesisData.NO_SCENARIOS, ProtocolConfig.testingDefault());
}

protected DeterministicTest buildRunningServerTestWithProtocolConfig(
int roundsPerEpoch, ProtocolConfig protocolConfig) {
return buildRunningServerTest(
roundsPerEpoch, new DatabaseConfig(true, false), GenesisData.NO_SCENARIOS, protocolConfig);
roundsPerEpoch, TEST_DATABASE_CONFIG, GenesisData.NO_SCENARIOS, protocolConfig);
}

protected DeterministicTest buildRunningServerTestWithScenarios(ImmutableList<String> scenarios) {
return buildRunningServerTest(
1000000, new DatabaseConfig(true, false), scenarios, ProtocolConfig.testingDefault());
1000000, TEST_DATABASE_CONFIG, scenarios, ProtocolConfig.testingDefault());
}

protected DeterministicTest buildRunningServerTest(DatabaseConfig databaseConfig) {
Expand All @@ -147,7 +146,7 @@ protected DeterministicTest buildRunningServerTest(DatabaseConfig databaseConfig
protected DeterministicTest buildRunningServerTest(int roundsPerEpoch) {
return buildRunningServerTest(
roundsPerEpoch,
new DatabaseConfig(true, false),
TEST_DATABASE_CONFIG,
GenesisData.NO_SCENARIOS,
ProtocolConfig.testingDefault());
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/com/radixdlt/api/SystemApiTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void setup() {
TEST_KEY.getPublicKey(),
Decimal.ONE,
GenesisConsensusManagerConfig.Builder.testDefaults()),
new DatabaseConfig(false, false),
new DatabaseConfig(false, false, false),
StateComputerConfig.REV2ProposerConfig.Mempool.defaults()),
new SyncRelayConfig(500, 10, 3000, 10, Long.MAX_VALUE)))),
new TestP2PModule.Builder().build(),
Expand Down
Loading

0 comments on commit e0d6f4b

Please sign in to comment.