Skip to content

Commit

Permalink
tweak: Avoid start-up false-positives with liveness checker
Browse files Browse the repository at this point in the history
  • Loading branch information
dhedey committed Nov 6, 2024
1 parent 2328195 commit befe08d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ TestInvariant livenessInvariant(NodeEvents nodeEvents) {
};
}

/**
 * Creates a module that contributes a {@link LivenessInvariant} to the monitor map under {@link
 * Monitor#CONSENSUS_LIVENESS}, with an explicit start-up allowance.
 *
 * @param allowedStartUpDuration start-up allowance handed to the invariant
 * @param startUpTimeUnit unit of {@code allowedStartUpDuration}
 * @param duration liveness window handed to the invariant
 * @param timeUnit unit of {@code duration}
 * @return a module providing the configured liveness invariant
 */
public static Module liveness(
    long allowedStartUpDuration, TimeUnit startUpTimeUnit, long duration, TimeUnit timeUnit) {
  final Module livenessModule =
      new AbstractModule() {
        @ProvidesIntoMap
        @MonitorKey(Monitor.CONSENSUS_LIVENESS)
        TestInvariant livenessInvariant(NodeEvents nodeEvents) {
          // All four timing arguments are captured from the enclosing factory call.
          final TestInvariant invariant =
              new LivenessInvariant(
                  nodeEvents, allowedStartUpDuration, startUpTimeUnit, duration, timeUnit);
          return invariant;
        }
      };
  return livenessModule;
}

public static Module safety() {
return new AbstractModule() {
@ProvidesIntoMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.radixdlt.harness.simulation.network.SimulationNodes.RunningNetwork;
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Check that the network is making progress by ensuring that new QCs and epochs are progressively
Expand All @@ -84,41 +85,79 @@
*/
public class LivenessInvariant implements TestInvariant {
private final NodeEvents nodeEvents;
private final long allowedStartUpDuration;
private final TimeUnit startUpTimeUnit;
private final long duration;
private final TimeUnit timeUnit;

public LivenessInvariant(NodeEvents nodeEvents, long duration, TimeUnit timeUnit) {
this(nodeEvents, 3, TimeUnit.SECONDS, duration, timeUnit);
}

public LivenessInvariant(
NodeEvents nodeEvents,
long allowedStartUpDuration,
TimeUnit startUpTimeUnit,
long duration,
TimeUnit timeUnit) {
this.nodeEvents = nodeEvents;
this.allowedStartUpDuration = allowedStartUpDuration;
this.startUpTimeUnit = startUpTimeUnit;
this.duration = duration;
this.timeUnit = timeUnit;
}

@Override
public Observable<TestInvariantError> check(RunningNetwork network) {
return Observable.<QuorumCertificate>create(
emitter -> {
nodeEvents.addListener(
(node, highQCUpdate) -> {
emitter.onNext(highQCUpdate.getHighQC().highestQC());
},
BFTHighQCUpdate.class);
nodeEvents.addListener(
(node, committed) -> {
emitter.onNext(committed.vertexStoreState().getHighQC().highestQC());
},
BFTCommittedUpdate.class);
})
.serialize()
.map(QuorumCertificate::getProposedHeader)
.map(header -> EpochRound.of(header.getLedgerHeader().getEpoch(), header.getRound()))
.scan(EpochRound.of(0, Round.epochInitial()), Ordering.natural()::max)
.distinctUntilChanged()
.debounce(duration, timeUnit)
.map(
epochRound ->
var isStartedUp = new AtomicBoolean(false);

var detectLivenessBreakObservable =
Observable.<QuorumCertificate>create(
emitter -> {
nodeEvents.addListener(
(node, highQCUpdate) -> {
emitter.onNext(highQCUpdate.getHighQC().highestQC());
},
BFTHighQCUpdate.class);
nodeEvents.addListener(
(node, committed) -> {
emitter.onNext(committed.vertexStoreState().getHighQC().highestQC());
},
BFTCommittedUpdate.class);
})
.serialize()
.map(QuorumCertificate::getProposedHeader)
.map(header -> EpochRound.of(header.getLedgerHeader().getEpoch(), header.getRound()))
.scan(EpochRound.of(0, Round.epochInitial()), Ordering.natural()::max)
// With a large number of nodes, they all boot up their pacemakers at currentRound = 1
// This takes quite a while. Only when they're all booted up (and we see some event with
// round >= 2)
// do we mark ourselves as "started up" and start to watch for gaps in the stream
.skipWhile(epochRound -> epochRound.getRound().lte(Round.of(1)))
.doOnEach(
epochRound -> {
isStartedUp.set(true);
})
.distinctUntilChanged()
.debounce(duration, timeUnit)
.map(
epochRound ->
new TestInvariantError(
String.format(
"Highest QC hasn't increased from %s after %s %s",
epochRound, duration, timeUnit)));

var startUpBrokenObservable =
Observable.just(
new TestInvariantError(
String.format(
"Highest QC hasn't increased from %s after %s %s",
epochRound, duration, timeUnit)));
"Highest QC hasn't increased beyond Round 1 after %s %s, indicating a stall"
+ " at start-up",
allowedStartUpDuration, startUpTimeUnit)))
.delay(allowedStartUpDuration, startUpTimeUnit)
// We skip the start-up error if we're already started up after the delay
.skipWhile(ignored -> isStartedUp.get());

return Observable.merge(detectLivenessBreakObservable, startUpBrokenObservable);
}
}

0 comments on commit befe08d

Please sign in to comment.