From befe08d0415cbbc62eeab169b735cad2f9e0ef80 Mon Sep 17 00:00:00 2001
From: David Edey
Date: Wed, 6 Nov 2024 22:45:06 +0000
Subject: [PATCH] tweak: Avoid start-up false-positives with liveness checker

---
Reviewer notes (free-form text in this section is ignored by git am):
 - The patch's line structure was restored from a whitespace-collapsed copy.
   Blank lines were re-derived from the hunk counts; Java indentation assumes
   google-java-format (2-space) -- TODO confirm against the upstream commit.
 - Generic type arguments on `Observable<TestInvariantError> check(...)` and
   `Observable.<QuorumCertificate>create(...)` were missing in the collapsed
   copy and are reinstated here from how the stream is used -- TODO confirm.
 - `.doOnEach(...)` was changed to `.doOnNext(...)`: the callback should only
   mark the network as started up when an EpochRound item is emitted, not on
   terminal notifications. As a result the new-file blob id in the `index`
   line no longer matches the post-image.
 - The wording of the start-up comment in LivenessInvariant.check was tidied
   (same number of lines).
 - The From: header carries no e-mail address in the copy this was restored
   from.

 .../monitors/consensus/ConsensusMonitors.java | 12 +++
 .../monitors/consensus/LivenessInvariant.java | 85 ++++++++++++++-----
 2 files changed, 74 insertions(+), 23 deletions(-)

diff --git a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/ConsensusMonitors.java b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/ConsensusMonitors.java
index 106f2ae031..375384c832 100644
--- a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/ConsensusMonitors.java
+++ b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/ConsensusMonitors.java
@@ -127,6 +127,18 @@ TestInvariant livenessInvariant(NodeEvents nodeEvents) {
     };
   }
 
+  public static Module liveness(
+      long allowedStartUpDuration, TimeUnit startUpTimeUnit, long duration, TimeUnit timeUnit) {
+    return new AbstractModule() {
+      @ProvidesIntoMap
+      @MonitorKey(Monitor.CONSENSUS_LIVENESS)
+      TestInvariant livenessInvariant(NodeEvents nodeEvents) {
+        return new LivenessInvariant(
+            nodeEvents, allowedStartUpDuration, startUpTimeUnit, duration, timeUnit);
+      }
+    };
+  }
+
   public static Module safety() {
     return new AbstractModule() {
       @ProvidesIntoMap
diff --git a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java
index 0324e8ae20..7c423e9dc4 100644
--- a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java
+++ b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java
@@ -75,6 +75,7 @@
 import com.radixdlt.harness.simulation.network.SimulationNodes.RunningNetwork;
 import io.reactivex.rxjava3.core.Observable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Check that the network is making progress by ensuring that new QCs and epochs are progressively
@@ -84,41 +85,79 @@
  */
 public class LivenessInvariant implements TestInvariant {
   private final NodeEvents nodeEvents;
+  private final long allowedStartUpDuration;
+  private final TimeUnit startUpTimeUnit;
   private final long duration;
   private final TimeUnit timeUnit;
 
   public LivenessInvariant(NodeEvents nodeEvents, long duration, TimeUnit timeUnit) {
+    this(nodeEvents, 3, TimeUnit.SECONDS, duration, timeUnit);
+  }
+
+  public LivenessInvariant(
+      NodeEvents nodeEvents,
+      long allowedStartUpDuration,
+      TimeUnit startUpTimeUnit,
+      long duration,
+      TimeUnit timeUnit) {
     this.nodeEvents = nodeEvents;
+    this.allowedStartUpDuration = allowedStartUpDuration;
+    this.startUpTimeUnit = startUpTimeUnit;
     this.duration = duration;
     this.timeUnit = timeUnit;
   }
 
   @Override
   public Observable<TestInvariantError> check(RunningNetwork network) {
-    return Observable.<QuorumCertificate>create(
-            emitter -> {
-              nodeEvents.addListener(
-                  (node, highQCUpdate) -> {
-                    emitter.onNext(highQCUpdate.getHighQC().highestQC());
-                  },
-                  BFTHighQCUpdate.class);
-              nodeEvents.addListener(
-                  (node, committed) -> {
-                    emitter.onNext(committed.vertexStoreState().getHighQC().highestQC());
-                  },
-                  BFTCommittedUpdate.class);
-            })
-        .serialize()
-        .map(QuorumCertificate::getProposedHeader)
-        .map(header -> EpochRound.of(header.getLedgerHeader().getEpoch(), header.getRound()))
-        .scan(EpochRound.of(0, Round.epochInitial()), Ordering.natural()::max)
-        .distinctUntilChanged()
-        .debounce(duration, timeUnit)
-        .map(
-            epochRound ->
+    var isStartedUp = new AtomicBoolean(false);
+
+    var detectLivenessBreakObservable =
+        Observable.<QuorumCertificate>create(
+                emitter -> {
+                  nodeEvents.addListener(
+                      (node, highQCUpdate) -> {
+                        emitter.onNext(highQCUpdate.getHighQC().highestQC());
+                      },
+                      BFTHighQCUpdate.class);
+                  nodeEvents.addListener(
+                      (node, committed) -> {
+                        emitter.onNext(committed.vertexStoreState().getHighQC().highestQC());
+                      },
+                      BFTCommittedUpdate.class);
+                })
+            .serialize()
+            .map(QuorumCertificate::getProposedHeader)
+            .map(header -> EpochRound.of(header.getLedgerHeader().getEpoch(), header.getRound()))
+            .scan(EpochRound.of(0, Round.epochInitial()), Ordering.natural()::max)
+            // With a large number of nodes, they all boot up their pacemakers at currentRound = 1.
+            // This takes quite a while. Only when they're all booted up (and we see some event with
+            // round >= 2) do we mark ourselves as "started up" and start to watch for gaps in the
+            // stream.
+            .skipWhile(epochRound -> epochRound.getRound().lte(Round.of(1)))
+            .doOnNext(
+                epochRound -> {
+                  isStartedUp.set(true);
+                })
+            .distinctUntilChanged()
+            .debounce(duration, timeUnit)
+            .map(
+                epochRound ->
+                    new TestInvariantError(
+                        String.format(
+                            "Highest QC hasn't increased from %s after %s %s",
+                            epochRound, duration, timeUnit)));
+
+    var startUpBrokenObservable =
+        Observable.just(
                 new TestInvariantError(
                     String.format(
-                        "Highest QC hasn't increased from %s after %s %s",
-                        epochRound, duration, timeUnit)));
+                        "Highest QC hasn't increased beyond Round 1 after %s %s, indicating a stall"
+                            + " at start-up",
+                        allowedStartUpDuration, startUpTimeUnit)))
+            .delay(allowedStartUpDuration, startUpTimeUnit)
+            // We skip the start-up error if we're already started up after the delay
+            .skipWhile(ignored -> isStartedUp.get());
+
+    return Observable.merge(detectLivenessBreakObservable, startUpBrokenObservable);
   }
 }