Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entity-listing indices tweaks #926

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void registerCodec(CodecMap codecMap) {

public static InternalAddress create(byte[] addressBytes) {
if (addressBytes.length != BYTE_LENGTH) {
throw new IllegalArgumentException("Invalid ReNode ID length");
throw new IllegalArgumentException("Invalid Entity ID length");
}
return new InternalAddress(addressBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use convert_case::{Case, Casing};
use itertools::Itertools;

use state_manager::store::traits::indices::{
CreationId, EntityBlueprintId, EntityBlueprintIdV1, ReNodeListingIndex,
CreationId, EntityBlueprintId, EntityBlueprintIdV1, EntityListingIndex,
};

use super::*;
Expand Down Expand Up @@ -1199,7 +1199,7 @@ pub struct EntitySummary {
pub blueprint_id: Option<BlueprintId>, // only present for Object entities
}

impl<'s, S: ReNodeListingIndex> EngineEntityLister<'s, S> {
impl<'s, S: EntityListingIndex> EngineEntityLister<'s, S> {
/// Creates an instance reading from the given database.
pub fn new(database: &'s S) -> Self {
Self { database }
Expand Down
78 changes: 67 additions & 11 deletions core-rust/state-manager/src/store/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* permissions under this License.
*/

use std::mem::size_of;
use std::ops::Range;

use crate::engine_prelude::*;
Expand Down Expand Up @@ -316,6 +317,22 @@ impl DbCodec<NodeId> for NodeIdDbCodec {
#[derive(Default)]
pub struct TypeAndCreationIndexKeyDbCodec {}

impl TypeAndCreationIndexKeyDbCodec {
/// An extracted "how are parts encoded together" knowledge, to be shared with the
/// [`BoundedDbCodec`] implementation.
fn encode_parts(
entity_byte: u8,
state_version_bytes: &[u8; StateVersion::BYTE_LEN],
index_within_txn_bytes: &[u8; size_of::<u32>()],
) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.push(entity_byte);
bytes.extend_from_slice(state_version_bytes);
bytes.extend_from_slice(index_within_txn_bytes);
bytes
}
}

impl DbCodec<(EntityType, CreationId)> for TypeAndCreationIndexKeyDbCodec {
fn encode(&self, value: &(EntityType, CreationId)) -> Vec<u8> {
let (
Expand All @@ -325,11 +342,11 @@ impl DbCodec<(EntityType, CreationId)> for TypeAndCreationIndexKeyDbCodec {
index_within_txn,
},
) = value;
let mut bytes = Vec::new();
bytes.push(*entity_type as u8);
bytes.extend_from_slice(&state_version.to_be_bytes());
bytes.extend_from_slice(&index_within_txn.to_be_bytes());
bytes
Self::encode_parts(
*entity_type as u8,
&state_version.to_be_bytes(),
&index_within_txn.to_be_bytes(),
)
}

fn decode(&self, bytes: &[u8]) -> (EntityType, CreationId) {
Expand All @@ -350,6 +367,16 @@ impl DbCodec<(EntityType, CreationId)> for TypeAndCreationIndexKeyDbCodec {
}
}

impl BoundedDbCodec for TypeAndCreationIndexKeyDbCodec {
fn upper_bound_encoding(&self) -> Vec<u8> {
Self::encode_parts(
u8::MAX,
&[u8::MAX; StateVersion::BYTE_LEN],
&[u8::MAX; size_of::<u32>()],
)
}
}

impl GroupPreservingDbCodec for TypeAndCreationIndexKeyDbCodec {
type Group = EntityType;

Expand All @@ -372,6 +399,24 @@ impl IntraGroupOrderPreservingDbCodec<(EntityType, CreationId)> for TypeAndCreat
#[derive(Default)]
pub struct BlueprintAndCreationIndexKeyDbCodec {}

impl BlueprintAndCreationIndexKeyDbCodec {
/// An extracted "how are parts encoded together" knowledge, to be shared with the
/// [`BoundedDbCodec`] implementation.
fn encode_parts(
package_address_bytes: &[u8; NodeId::LENGTH],
blueprint_name_hash_bytes: &[u8; Hash::LENGTH],
state_version_bytes: &[u8; StateVersion::BYTE_LEN],
index_within_txn_bytes: &[u8; size_of::<u32>()],
) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(package_address_bytes);
bytes.extend_from_slice(blueprint_name_hash_bytes);
bytes.extend_from_slice(state_version_bytes);
bytes.extend_from_slice(index_within_txn_bytes);
bytes
}
}

impl DbCodec<(PackageAddress, Hash, CreationId)> for BlueprintAndCreationIndexKeyDbCodec {
fn encode(&self, value: &(PackageAddress, Hash, CreationId)) -> Vec<u8> {
let (
Expand All @@ -382,12 +427,12 @@ impl DbCodec<(PackageAddress, Hash, CreationId)> for BlueprintAndCreationIndexKe
index_within_txn,
},
) = value;
let mut bytes = Vec::new();
bytes.extend_from_slice(&package_address.as_node_id().0);
bytes.extend_from_slice(&blueprint_name_hash.0);
bytes.extend_from_slice(&state_version.to_be_bytes());
bytes.extend_from_slice(&index_within_txn.to_be_bytes());
bytes
Self::encode_parts(
&package_address.as_node_id().0,
&blueprint_name_hash.0,
&state_version.to_be_bytes(),
&index_within_txn.to_be_bytes(),
)
}

fn decode(&self, bytes: &[u8]) -> (PackageAddress, Hash, CreationId) {
Expand All @@ -413,6 +458,17 @@ impl DbCodec<(PackageAddress, Hash, CreationId)> for BlueprintAndCreationIndexKe
}
}

impl BoundedDbCodec for BlueprintAndCreationIndexKeyDbCodec {
fn upper_bound_encoding(&self) -> Vec<u8> {
Self::encode_parts(
&[u8::MAX; NodeId::LENGTH],
&[u8::MAX; Hash::LENGTH],
&[u8::MAX; StateVersion::BYTE_LEN],
&[u8::MAX; size_of::<u32>()],
)
}
}

impl GroupPreservingDbCodec for BlueprintAndCreationIndexKeyDbCodec {
type Group = (PackageAddress, Hash);

Expand Down
6 changes: 3 additions & 3 deletions core-rust/state-manager/src/store/historical_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use substate_store_impls::state_tree::entity_tier::EntityTier;
use crate::engine_prelude::*;
use crate::query::StateManagerSubstateQueries;
use crate::store::traits::*;
use crate::traits::indices::{CreationId, EntityBlueprintId, ReNodeListingIndex};
use crate::traits::indices::{CreationId, EntityBlueprintId, EntityListingIndex};
use crate::{
CommittedTransactionIdentifiers, LedgerStateSummary, ReadableRocks, StateManagerDatabase,
StateVersion,
Expand All @@ -77,7 +77,7 @@ use crate::{
/// An implementation of a [`SubstateDatabase`] viewed at a specific [`StateVersion`].
///
/// This database is backed by:
/// - a [`ReadableTreeStore`] - a versioned source of ReNodes / Partitions / Substates metadata,
/// - a [`ReadableTreeStore`] - a versioned source of Entities / Partitions / Substates metadata,
/// - and a [`LeafSubstateValueStore`] - a store of Substate values' associated with their leafs.
pub struct StateTreeBasedSubstateDatabase<'s, DS> {
base_store: DS,
Expand Down Expand Up @@ -360,7 +360,7 @@ impl<'s, R: ReadableRocks + 's, DS: Deref<Target = StateManagerDatabase<R>>>
}
}

impl<'s, R: ReadableRocks + 's, DS: Deref<Target = StateManagerDatabase<R>>> ReNodeListingIndex
impl<'s, R: ReadableRocks + 's, DS: Deref<Target = StateManagerDatabase<R>>> EntityListingIndex
for VersionScopedDatabase<'s, DS>
{
fn get_created_entity_iter(
Expand Down
2 changes: 1 addition & 1 deletion core-rust/state-manager/src/store/jmt_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn iterate_dfs_post_order<'s, S: ReadableTreeStore>(
TreeNode::Null => {
// A special case: this subtree is empty.
// Note: at the moment of writing this, this case is impossible in practice: we do
// not delete ReNode-Tier tree, and we also do not store empty lower-Tier trees
// not delete Entity-Tier tree, and we also do not store empty lower-Tier trees
// (i.e. we delete their higher-Tier leaf counterpart instead). However, we can
// return a correct empty result here (in case the above assumptions ever change).
Box::new(iter::empty())
Expand Down
73 changes: 47 additions & 26 deletions core-rust/state-manager/src/store/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use crate::store::traits::gc::{
LedgerProofsGcProgress, LedgerProofsGcStore, StateTreeGcStore, VersionedLedgerProofsGcProgress,
};
use crate::store::traits::indices::{
CreationId, EntityBlueprintId, ObjectBlueprintName, ObjectBlueprintNameV1, ReNodeListingIndex,
CreationId, EntityBlueprintId, EntityListingIndex, ObjectBlueprintName, ObjectBlueprintNameV1,
VersionedEntityBlueprintId, VersionedObjectBlueprintName,
};
use crate::store::traits::measurement::{CategoryDbVolumeStatistic, MeasurableDatabase};
Expand Down Expand Up @@ -799,10 +799,7 @@ impl ActualStateManagerDatabase {
state_manager_database.catchup_account_change_index();
state_manager_database.restore_december_2023_lost_substates(network);
state_manager_database.ensure_historical_substate_values();

if state_manager_database.config.enable_entity_listing_indices {
state_manager_database.catchup_entity_listing_indices()
}
state_manager_database.ensure_entity_listing_indices();

Ok(state_manager_database)
}
Expand Down Expand Up @@ -2067,23 +2064,33 @@ impl<R: WriteableRocks> StateManagerDatabase<R> {
substate_changes: &BySubstate<SubstateChangeAction>,
) {
for (index_within_txn, node_id) in substate_changes.iter_node_ids().enumerate() {
let type_info_creation = substate_changes.get(
let type_info_change = substate_changes.get(
node_id,
&TYPE_INFO_FIELD_PARTITION,
&TypeInfoField::TypeInfo.into(),
);
let Some(type_info_creation) = type_info_creation else {
let Some(type_info_change) = type_info_change else {
continue;
};
let SubstateChangeAction::Create { new } = type_info_creation else {
panic!(
"type info substate should be immutable: {:?}",
type_info_creation
);
let created_type_info_value = match type_info_change {
SubstateChangeAction::Create { new } => new,
SubstateChangeAction::Update { .. } => {
// Even if TypeInfo is updated (e.g. its blueprint version bumped), the fields
// that we care about (package address and blueprint name) are effectively
// immutable - we can thus safely ignore all updates to this substate.
continue;
}
SubstateChangeAction::Delete { .. } => {
panic!(
"type info substate should not be deleted: {:?}",
type_info_change
)
}
};
let type_info = scrypto_decode::<TypeInfoSubstate>(new).expect("decode type info");
let type_info = scrypto_decode::<TypeInfoSubstate>(created_type_info_value)
.expect("decode type info");

let entity_type = node_id.entity_type().expect("type of upserted ReNode");
let entity_type = node_id.entity_type().expect("type of upserted Entity");
let creation_id = CreationId::new(state_version, index_within_txn);

match type_info {
Expand Down Expand Up @@ -2119,18 +2126,33 @@ impl<R: WriteableRocks> StateManagerDatabase<R> {
}
}

fn catchup_entity_listing_indices(&self) {
fn ensure_entity_listing_indices(&self) {
const TXN_FLUSH_INTERVAL: u64 = 10_000;
const PROGRESS_LOG_INTERVAL: u64 = 1_000_000;

info!("ReNode listing indices are enabled.");
let db_context = self.open_rw_context();
let catchup_from_version = db_context

if !self.config.enable_entity_listing_indices {
info!("Entity listing indices are disabled.");
// We remove the indices' data and metadata in a single, cheap write batch:
db_context.cf(TypeAndCreationIndexedEntitiesCf).delete_all();
db_context
.cf(BlueprintAndCreationIndexedObjectsCf)
.delete_all();
db_context
.cf(ExtensionsDataCf)
.delete(&ExtensionsDataKey::EntityListingIndicesLastProcessedStateVersion);
info!("Deleted entity listing indices.");
return;
}

info!("Entity listing indices are enabled.");
let last_processed_state_version = db_context
.cf(ExtensionsDataCf)
.get(&ExtensionsDataKey::EntityListingIndicesLastProcessedStateVersion)
.map(StateVersion::from_be_bytes)
.unwrap_or(StateVersion::pre_genesis())
.next()
.expect("next version");
.unwrap_or(StateVersion::pre_genesis());
let catchup_from_version = last_processed_state_version.next().expect("next version");

let mut receipts_iter = db_context
.cf(TransactionReceiptsCf)
Expand All @@ -2144,18 +2166,17 @@ impl<R: WriteableRocks> StateManagerDatabase<R> {
&receipt.state_changes.substate_level_changes,
);
if state_version.number() % TXN_FLUSH_INTERVAL == 0 || receipts_iter.peek().is_none() {
info!(
"ReNode listing indices updated to {}; flushing...",
state_version
);
if state_version.number() % PROGRESS_LOG_INTERVAL == 0 {
info!("Entity listing indices updated to {}", state_version);
}
db_context.cf(ExtensionsDataCf).put(
&ExtensionsDataKey::EntityListingIndicesLastProcessedStateVersion,
&state_version.to_be_bytes().to_vec(),
);
db_context.flush();
}
}
info!("ReNode listing indices are caught up.");
info!("Caught up Entity listing indices.");
}
}

Expand Down Expand Up @@ -2315,7 +2336,7 @@ impl<R: ReadableRocks> IterableAccountChangeIndex for StateManagerDatabase<R> {
}
}

impl<R: ReadableRocks> ReNodeListingIndex for StateManagerDatabase<R> {
impl<R: ReadableRocks> EntityListingIndex for StateManagerDatabase<R> {
fn get_created_entity_iter(
&self,
entity_type: EntityType,
Expand Down
14 changes: 7 additions & 7 deletions core-rust/state-manager/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ pub mod indices {
use super::*;
use std::ops::Range;

pub trait ReNodeListingIndex {
pub trait EntityListingIndex {
fn get_created_entity_iter(
&self,
entity_type: EntityType,
Expand All @@ -672,14 +672,14 @@ pub mod indices {
) -> Box<dyn Iterator<Item = (CreationId, EntityBlueprintId)> + '_>;
}

/// A unique ID of a ReNode, based on creation order.
/// A unique ID of an Entity, based on creation order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Sbor)]
pub struct CreationId {
/// State version of the transaction which created the ReNode (i.e. which created the first
/// substate under this ReNode).
/// State version of the transaction which created the Entity (i.e. which created the first
/// Substate under this Entity).
pub state_version: StateVersion,

/// An index in a list of ReNodes created by a single transaction.
/// An index in a list of Entities created by a single transaction.
pub index_within_txn: u32,
}

Expand Down Expand Up @@ -717,7 +717,7 @@ pub mod indices {
}

/// An entity's ID and its blueprint reference.
/// This is a "technical" structure stored in one of the ReNode-listing indices.
/// This is a "technical" structure stored in one of the Entity-listing indices.
#[derive(Debug, Clone, ScryptoCategorize, ScryptoEncode, ScryptoDecode)]
pub struct EntityBlueprintIdV1 {
/// Node ID.
Expand Down Expand Up @@ -751,7 +751,7 @@ pub mod indices {
}

/// An Object's ID and its blueprint name.
/// This is a "technical" structure stored in one of the ReNode-listing indices.
/// This is a "technical" structure stored in one of the Entity-listing indices.
#[derive(Debug, Clone, ScryptoCategorize, ScryptoEncode, ScryptoDecode)]
pub struct ObjectBlueprintNameV1 {
/// Node ID - guaranteed to *not* be a Key-Value Store.
Expand Down
Loading