Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vertex store size limit; cleanup vertex store events #854

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion common/src/main/java/com/radixdlt/monitoring/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public record Sync(
Counter invalidEpochInitialQcSyncStates) {}

public record VertexStore(
Gauge size, Counter forks, Counter rebuilds, Counter indirectParents) {}
Gauge vertexCount,
Gauge byteSize,
Counter forks,
Counter rebuilds,
Counter indirectParents,
Counter errorsDueToSizeLimit) {}

public record DivergentVertexExecution(int numDistinctExecutionResults) {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public String toHexString() {
return Hex.toHexString(value);
}

public int size() {
return value.length;
}

@Override
public byte[] hashableBytes() {
return value;
Expand Down
4 changes: 2 additions & 2 deletions core-rust/state-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ pub use crate::types::*;
pub mod engine_prelude {
pub use radix_common::prelude::*;

pub use radix_engine::*;
pub use radix_engine::errors::*;
pub use radix_engine::system::bootstrap::*;
#[cfg(feature = "db_checker")]
Expand All @@ -109,9 +108,10 @@ pub mod engine_prelude {
pub use radix_engine::transaction::*;
pub use radix_engine::updates::*;
pub use radix_engine::vm::*;
pub use radix_engine::*;

pub use radix_engine_interface::blueprints::transaction_processor::*;
pub use radix_engine_interface::blueprints::account::*;
pub use radix_engine_interface::blueprints::transaction_processor::*;
pub use radix_engine_interface::prelude::*;

pub use radix_substate_store_impls::state_tree::tree_store::*;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void no_byzantine_event_occurs_on_epoch_tc_event() {
test.runUntilMessage(
proposalAtRound(ROUNDS_PER_EPOCH + 2),
true,
10 * NUM_NODES * NUM_NODES * ((int) ROUNDS_PER_EPOCH));
10 * NUM_NODES * NUM_NODES * ROUNDS_PER_EPOCH);
// Run for a while more and verify that no byzantine issues occur
test.runForCount(40000);
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/com/radixdlt/RadixNodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.radixdlt.consensus.bft.*;
import com.radixdlt.consensus.epoch.EpochsConsensusModule;
import com.radixdlt.consensus.sync.BFTSyncPatienceMillis;
import com.radixdlt.consensus.vertexstore.VertexStoreConfig;
import com.radixdlt.environment.*;
import com.radixdlt.environment.rx.RxEnvironmentModule;
import com.radixdlt.genesis.GenesisProvider;
Expand Down Expand Up @@ -159,6 +160,19 @@ protected void configure() {
bindConstant().annotatedWith(AdditionalRoundTimeIfProposalReceivedMs.class).to(30_000L);
bindConstant().annotatedWith(TimeoutQuorumResolutionDelayMs.class).to(0L);

final var vertexStoreConfig =
new VertexStoreConfig(
properties.get(
"bft.vertex_store.max_serialized_size_bytes",
VertexStoreConfig.DEFAULT_MAX_SERIALIZED_SIZE_BYTES));
bind(VertexStoreConfig.class).toInstance(vertexStoreConfig);

Preconditions.checkArgument(
vertexStoreConfig.maxSerializedSizeBytes()
>= VertexStoreConfig.MIN_MAX_SERIALIZED_SIZE_BYTES,
"Invalid configuration: bft.vertex_store.max_serialized_size_byte must be at least {}",
VertexStoreConfig.MIN_MAX_SERIALIZED_SIZE_BYTES);

// System (e.g. time, random)
install(new SystemModule());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,31 @@

package com.radixdlt.consensus.bft;

import com.google.common.collect.ImmutableList;
import com.radixdlt.consensus.HighQC;
import com.radixdlt.consensus.event.LocalEvent;
import com.radixdlt.consensus.vertexstore.VertexStoreState;
import com.radixdlt.consensus.vertexstore.ExecutedVertex;
import com.radixdlt.lang.Option;
import com.radixdlt.utils.WrappedByteArray;
import java.util.AbstractCollection;

/** An event emitted when the high qc has been updated */
public record BFTHighQCUpdate(VertexStoreState vertexStoreState) implements LocalEvent {
public HighQC getHighQC() {
return vertexStoreState.getHighQC();
/**
* An event emitted when vertex store updates its highQC, which possibly results in some vertices
* being committed.
*/
public record BFTHighQCUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've lost the nice toString here - perhaps we should override it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise we could end up with a massive log of the serialized vertex state in hex.

HighQC newHighQc,
Option<ImmutableList<ExecutedVertex>> committedVertices,
WrappedByteArray serializedVertexStoreState)
implements LocalEvent {

@Override
public String toString() {
return String.format(
"%s[newHighQc=%s numCommittedVertices=%s serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(),
newHighQc,
committedVertices.map(AbstractCollection::size).orElse(0),
serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,18 @@

package com.radixdlt.consensus.bft;

import com.radixdlt.consensus.BFTHeader;
import com.radixdlt.consensus.event.LocalEvent;
import com.radixdlt.consensus.vertexstore.ExecutedVertex;
import com.radixdlt.consensus.vertexstore.VertexStoreState;
import com.radixdlt.utils.WrappedByteArray;

/** An update emitted when the BFT has inserted a new vertex */
/** An event emitted after a vertex has been inserted into the vertex store. */
public record BFTInsertUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've lost the nice toString here - perhaps we should override it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise we could end up with a massive log of the serialized vertex state in hex.

ExecutedVertex insertedVertex, int siblingsCount, VertexStoreState vertexStoreState)
ExecutedVertex insertedVertex, WrappedByteArray serializedVertexStoreState)
implements LocalEvent {

public int getVertexStoreSize() {
return vertexStoreState.getVertices().size();
}

public BFTHeader getHeader() {
return new BFTHeader(
insertedVertex.getRound(),
insertedVertex.getVertexHash(),
insertedVertex.getLedgerHeader());
@Override
public String toString() {
return String.format(
"%s[insertedVertex=%s serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(), insertedVertex(), serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@

import com.radixdlt.consensus.event.LocalEvent;
import com.radixdlt.consensus.vertexstore.VertexStoreState;
import com.radixdlt.utils.WrappedByteArray;

/** An update emitted when the BFT has been rebuilt */
public record BFTRebuildUpdate(VertexStoreState vertexStoreState) implements LocalEvent {}
/** An even emitted when the vertex store has been rebuilt. */
public record BFTRebuildUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've lost the nice toString here - perhaps we should override it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise we could end up with a massive log of the serialized vertex state in hex.

VertexStoreState vertexStoreState, WrappedByteArray serializedVertexStoreState)
implements LocalEvent {

@Override
public String toString() {
return String.format(
"%s[serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(), serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.radixdlt.consensus.liveness.*;
import com.radixdlt.consensus.sync.*;
import com.radixdlt.consensus.vertexstore.VertexStoreAdapter;
import com.radixdlt.consensus.vertexstore.VertexStoreConfig;
import com.radixdlt.consensus.vertexstore.VertexStoreJavaImpl;
import com.radixdlt.crypto.Hasher;
import com.radixdlt.environment.*;
Expand All @@ -84,6 +85,7 @@
import com.radixdlt.messaging.core.GetVerticesRequestRateLimit;
import com.radixdlt.monitoring.Metrics;
import com.radixdlt.p2p.NodeId;
import com.radixdlt.serialization.Serialization;
import com.radixdlt.sync.messages.local.LocalSyncRequest;
import com.radixdlt.utils.TimeSupplier;
import java.util.Comparator;
Expand All @@ -107,7 +109,6 @@ protected void configure() {
eventBinder.addBinding().toInstance(BFTRebuildUpdate.class);
eventBinder.addBinding().toInstance(BFTInsertUpdate.class);
eventBinder.addBinding().toInstance(BFTHighQCUpdate.class);
eventBinder.addBinding().toInstance(BFTCommittedUpdate.class);
eventBinder.addBinding().toInstance(Proposal.class);
eventBinder.addBinding().toInstance(Vote.class);
eventBinder.addBinding().toInstance(LedgerUpdate.class);
Expand Down Expand Up @@ -457,15 +458,17 @@ private VertexStoreFactory vertexStoreFactory(
EventDispatcher<BFTInsertUpdate> updateSender,
EventDispatcher<BFTRebuildUpdate> rebuildUpdateDispatcher,
EventDispatcher<BFTHighQCUpdate> highQCUpdateEventDispatcher,
EventDispatcher<BFTCommittedUpdate> committedDispatcher,
Ledger ledger,
Hasher hasher) {
Hasher hasher,
Serialization serialization,
Metrics metrics,
VertexStoreConfig vertexStoreConfig) {
return vertexStoreState ->
new VertexStoreAdapter(
VertexStoreJavaImpl.create(vertexStoreState, ledger, hasher),
new VertexStoreJavaImpl(
ledger, hasher, serialization, metrics, vertexStoreConfig, vertexStoreState),
highQCUpdateEventDispatcher,
updateSender,
rebuildUpdateDispatcher,
committedDispatcher);
rebuildUpdateDispatcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void processProposal(Proposal proposal) {
public void processBFTUpdate(BFTInsertUpdate update) {
log.trace("BFTUpdate: Processing {}", update);

final var round = update.getHeader().getRound();
final var round = update.insertedVertex().getRound();
final var vertex = update.insertedVertex();

if (round.equals(currentRound())) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/com/radixdlt/consensus/sync/BFTSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static java.util.function.Predicate.not;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.RateLimiter;
import com.radixdlt.consensus.*;
Expand Down Expand Up @@ -244,7 +245,7 @@ public SyncResult syncToQC(HighQC highQC, @Nullable NodeId author, HighQcSource
return SyncResult.INVALID;
}

return switch (vertexStore.insertQc(qc)) {
return switch (vertexStore.insertQuorumCertificate(qc)) {
case VertexStore.InsertQcResult.Inserted ignored -> {
// QC was inserted, try TC too (as it can be higher), and then process a new highQC
highQC.highestTC().map(vertexStore::insertTimeoutCertificate);
Expand Down Expand Up @@ -464,8 +465,8 @@ private void rebuildAndSyncQC(SyncState syncState) {
// TODO: check if there are any vertices which haven't been local sync processed yet
if (requiresLedgerSync(syncState)) {
syncState.fetched.sort(Comparator.comparing(v -> v.vertex().getRound()));
ImmutableList<VertexWithHash> nonRootVertices =
syncState.fetched.stream().skip(1).collect(ImmutableList.toImmutableList());
ImmutableSet<VertexWithHash> nonRootVertices =
syncState.fetched.stream().skip(1).collect(ImmutableSet.toImmutableSet());

final var syncStateHighestCommittedQc = syncState.highQC.highestCommittedQC();
final var syncStateHighestTc = syncState.highQC.highestTC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@
* RadixEngine ((RPNV1-718)
*/
public interface PersistentVertexStore {
void save(VertexStoreState vertexStoreState);
void save(byte[] serializedVertexStoreState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,21 @@

import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashCode;
import com.radixdlt.consensus.HighQC;
import com.radixdlt.consensus.QuorumCertificate;
import com.radixdlt.consensus.TimeoutCertificate;
import com.radixdlt.consensus.VertexWithHash;
import com.radixdlt.consensus.*;
import com.radixdlt.consensus.bft.BFTInsertUpdate;
import com.radixdlt.lang.Option;
import com.radixdlt.lang.Result;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;

/** Manages the BFT Vertex chain. TODO: Move this logic into ledger package. */
public interface VertexStore {
record CommittedUpdate(ImmutableList<ExecutedVertex> committedVertices) {}
record CommittedUpdate(ImmutableList<ExecutedVertex> committedVertices, HighQC newHighQc) {}

sealed interface InsertQcResult {
record Inserted(
HighQC newHighQc,
// TODO: remove me once vertex store persistence and commit on the java side are gone
VertexStoreState vertexStoreState,
WrappedByteArray serializedVertexStoreState,
Option<CommittedUpdate> committedUpdate)
implements InsertQcResult {}

Expand All @@ -91,19 +89,32 @@ record Ignored() implements InsertQcResult {}
record VertexIsMissing() implements InsertQcResult {}
}

sealed interface InsertTcResult {
record Inserted(HighQC newHighQc, WrappedByteArray serializedVertexStoreState)
implements InsertTcResult {}

record Ignored() implements InsertTcResult {}
}

record InsertVertexChainResult(
List<InsertQcResult.Inserted> insertedQcs, List<BFTInsertUpdate> insertUpdates) {}

record RebuildSummary(
VertexStoreState resultantState, WrappedByteArray serializedVertexStoreState) {}

enum RebuildError {
VERTEX_STORE_SIZE_EXCEEDED,
VERTEX_EXECUTION_ERROR
}

InsertQcResult insertQc(QuorumCertificate qc);

/**
* Inserts a timeout certificate into the store.
*
* @param timeoutCertificate the timeout certificate
* @return true if the timeout certificate was inserted, false if it was ignored because it's not
* the highest
*/
boolean insertTimeoutCertificate(TimeoutCertificate timeoutCertificate);
InsertTcResult insertTimeoutCertificate(TimeoutCertificate timeoutCertificate);

/**
* Inserts a vertex and then attempts to create the next header.
Expand All @@ -114,7 +125,7 @@ record InsertVertexChainResult(

InsertVertexChainResult insertVertexChain(VertexChain vertexChain);

Option<VertexStoreState> tryRebuild(VertexStoreState vertexStoreState);
Result<RebuildSummary, RebuildError> tryRebuild(VertexStoreState vertexStoreState);

boolean containsVertex(HashCode vertexId);

Expand Down Expand Up @@ -143,4 +154,6 @@ record InsertVertexChainResult(
* @return the list of vertices if all found, otherwise an empty list
*/
Option<ImmutableList<VertexWithHash>> getVertices(HashCode vertexHash, int count);

int getCurrentSerializedSizeBytes();
}
Loading
Loading