Skip to content

Commit

Permalink
Add block arrival time in metrics at the earliest possible point (Con…
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 authored Nov 2, 2023
1 parent 8d7bfec commit 451573b
Show file tree
Hide file tree
Showing 58 changed files with 444 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public SafeFuture<List<SubmitDataError>> sendSignedAttestations(

private SafeFuture<InternalValidationResult> processAttestation(final Attestation attestation) {
return attestationManager
.addAttestation(ValidatableAttestation.fromValidator(spec, attestation))
.addAttestation(ValidatableAttestation.fromValidator(spec, attestation), Optional.empty())
.thenPeek(
result -> {
if (!result.isReject()) {
Expand Down Expand Up @@ -591,7 +591,9 @@ public SafeFuture<List<SubmitDataError>> sendAggregateAndProofs(
private SafeFuture<InternalValidationResult> processAggregateAndProof(
final SignedAggregateAndProof aggregateAndProof) {
return attestationManager
.addAggregate(ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof))
.addAggregate(
ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof),
Optional.empty())
.thenPeek(
result -> {
if (result.isReject()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void shouldIncludeValidOperations() {
}

private <T extends SszData> void addToPool(final OperationPool<T> pool, final T operation) {
assertThat(pool.addRemote(operation)).isCompletedWithValue(ACCEPT);
assertThat(pool.addRemote(operation, Optional.empty())).isCompletedWithValue(ACCEPT);
}

@Test
Expand Down Expand Up @@ -294,7 +294,8 @@ void shouldNotIncludeInvalidOperations() {
addToPool(attesterSlashingPool, attesterSlashing1);
addToPool(attesterSlashingPool, attesterSlashing2);
addToPool(attesterSlashingPool, attesterSlashing3);
assertThat(contributionPool.addRemote(contribution)).isCompletedWithValue(ACCEPT);
assertThat(contributionPool.addRemote(contribution, Optional.empty()))
.isCompletedWithValue(ACCEPT);
addToPool(blsToExecutionChangePool, blsToExecutionChange1);
addToPool(blsToExecutionChangePool, blsToExecutionChange2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,19 +716,20 @@ void subscribeToSyncCommitteeSubnets_shouldConvertCommitteeIndexToSubnetId() {
@Test
public void sendSignedAttestations_shouldAddAttestationToAttestationManager() {
final Attestation attestation = dataStructureUtil.randomAttestation();
when(attestationManager.addAttestation(any(ValidatableAttestation.class)))
when(attestationManager.addAttestation(any(ValidatableAttestation.class), any()))
.thenReturn(completedFuture(InternalValidationResult.ACCEPT));
final SafeFuture<List<SubmitDataError>> result =
validatorApiHandler.sendSignedAttestations(List.of(attestation));
assertThat(result).isCompletedWithValue(emptyList());

verify(attestationManager).addAttestation(ValidatableAttestation.from(spec, attestation));
verify(attestationManager)
.addAttestation(ValidatableAttestation.from(spec, attestation), Optional.empty());
}

@Test
void sendSignedAttestations_shouldAddToDutyMetricsAndPerformanceTrackerWhenNotInvalid() {
final Attestation attestation = dataStructureUtil.randomAttestation();
when(attestationManager.addAttestation(any(ValidatableAttestation.class)))
when(attestationManager.addAttestation(any(ValidatableAttestation.class), any()))
.thenReturn(completedFuture(InternalValidationResult.SAVE_FOR_FUTURE));

final SafeFuture<List<SubmitDataError>> result =
Expand All @@ -742,7 +743,7 @@ void sendSignedAttestations_shouldAddToDutyMetricsAndPerformanceTrackerWhenNotIn
@Test
void sendSignedAttestations_shouldNotAddToDutyMetricsAndPerformanceTrackerWhenInvalid() {
final Attestation attestation = dataStructureUtil.randomAttestation();
when(attestationManager.addAttestation(any(ValidatableAttestation.class)))
when(attestationManager.addAttestation(any(ValidatableAttestation.class), any()))
.thenReturn(completedFuture(InternalValidationResult.reject("Bad juju")));

final SafeFuture<List<SubmitDataError>> result =
Expand All @@ -757,9 +758,9 @@ void sendSignedAttestations_shouldNotAddToDutyMetricsAndPerformanceTrackerWhenIn
void sendSignedAttestations_shouldProcessMixOfValidAndInvalidAttestations() {
final Attestation invalidAttestation = dataStructureUtil.randomAttestation();
final Attestation validAttestation = dataStructureUtil.randomAttestation();
when(attestationManager.addAttestation(validatableAttestationOf(invalidAttestation)))
when(attestationManager.addAttestation(validatableAttestationOf(invalidAttestation), any()))
.thenReturn(completedFuture(InternalValidationResult.reject("Bad juju")));
when(attestationManager.addAttestation(validatableAttestationOf(validAttestation)))
when(attestationManager.addAttestation(validatableAttestationOf(validAttestation), any()))
.thenReturn(completedFuture(InternalValidationResult.ACCEPT));

final SafeFuture<List<SubmitDataError>> result =
Expand Down Expand Up @@ -901,14 +902,16 @@ public void sendSignedBlock_shoulNotGossipAndImportBlobsWhenBlobsDoNotExist() {
public void sendAggregateAndProofs_shouldPostAggregateAndProof() {
final SignedAggregateAndProof aggregateAndProof =
dataStructureUtil.randomSignedAggregateAndProof();
when(attestationManager.addAggregate(any(ValidatableAttestation.class)))
when(attestationManager.addAggregate(any(ValidatableAttestation.class), any()))
.thenReturn(completedFuture(InternalValidationResult.ACCEPT));
final SafeFuture<List<SubmitDataError>> result =
validatorApiHandler.sendAggregateAndProofs(List.of(aggregateAndProof));
assertThat(result).isCompletedWithValue(emptyList());

verify(attestationManager)
.addAggregate(ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof));
.addAggregate(
ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof),
Optional.empty());
}

@Test
Expand All @@ -918,10 +921,11 @@ void sendAggregateAndProofs_shouldProcessMixOfValidAndInvalidAggregates() {
final SignedAggregateAndProof validAggregate =
dataStructureUtil.randomSignedAggregateAndProof();
when(attestationManager.addAggregate(
ValidatableAttestation.aggregateFromValidator(spec, invalidAggregate)))
ValidatableAttestation.aggregateFromValidator(spec, invalidAggregate),
Optional.empty()))
.thenReturn(completedFuture(InternalValidationResult.reject("Bad juju")));
when(attestationManager.addAggregate(
ValidatableAttestation.aggregateFromValidator(spec, validAggregate)))
ValidatableAttestation.aggregateFromValidator(spec, validAggregate), Optional.empty()))
.thenReturn(completedFuture(InternalValidationResult.ACCEPT));

final SafeFuture<List<SubmitDataError>> result =
Expand All @@ -933,12 +937,14 @@ void sendAggregateAndProofs_shouldProcessMixOfValidAndInvalidAggregates() {
.addAggregate(
argThat(
validatableAttestation ->
validatableAttestation.getSignedAggregateAndProof().equals(validAggregate)));
validatableAttestation.getSignedAggregateAndProof().equals(validAggregate)),
any());
verify(attestationManager)
.addAggregate(
argThat(
validatableAttestation ->
validatableAttestation.getSignedAggregateAndProof().equals(invalidAggregate)));
validatableAttestation.getSignedAggregateAndProof().equals(invalidAggregate)),
any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import okhttp3.Response;
Expand Down Expand Up @@ -59,7 +60,7 @@ void getBlsToExecutionChangesFromPoolReturnsOk() throws IOException {
when(validator.validateForGossip(any()))
.thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT));
for (SignedBlsToExecutionChange operation : expectedOperations) {
assertThat(blsToExecutionChangePool.addRemote(operation)).isCompleted();
assertThat(blsToExecutionChangePool.addRemote(operation, Optional.empty())).isCompleted();
}

Response response = getResponse(GetBlsToExecutionChanges.ROUTE);
Expand All @@ -80,7 +81,7 @@ void getLocalBlsToExecutionChangesFromPool() throws IOException {
when(validator.validateForGossip(any()))
.thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT));
assertThat(blsToExecutionChangePool.addLocal(localChange)).isCompleted();
assertThat(blsToExecutionChangePool.addRemote(remoteChange)).isCompleted();
assertThat(blsToExecutionChangePool.addRemote(remoteChange, Optional.empty())).isCompleted();

Response response =
getResponse(GetBlsToExecutionChanges.ROUTE, Map.of("locally_submitted", "true"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void prepareBlobsAndProofsForBlock(

@Override
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final SignedBlobSidecar signedBlobSidecar) {
final SignedBlobSidecar signedBlobSidecar, final Optional<UInt64> arrivalTimestamp) {
return SafeFuture.failedFuture(
new UnsupportedOperationException("Not available in fork choice reference tests"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool.DEFAULT_MAXIMUM_ATTESTATION_COUNT;

import it.unimi.dsi.fastutil.ints.IntList;
import java.util.Optional;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -146,7 +147,7 @@ void shouldAcceptAttestationsBeforeForkWithOriginalForkId() {
createAttestation(attestationSlot, targetBlockAndState, phase0Fork);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAttestation(attestation);
attestationManager.addAttestation(attestation, Optional.empty());
assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT);
}

Expand All @@ -163,7 +164,7 @@ void shouldRejectAttestationsBeforeForkWithNewForkId() {
createAttestation(attestationSlot, targetBlockAndState, altairFork);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAttestation(attestation);
attestationManager.addAttestation(attestation, Optional.empty());
assertThat(result)
.isCompletedWithValueMatching(InternalValidationResult::isReject, "is rejected");
}
Expand All @@ -181,7 +182,7 @@ void shouldRejectAttestationsAfterForkWithOldForkId() {
createAttestation(attestationSlot, targetBlockAndState, phase0Fork);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAttestation(attestation);
attestationManager.addAttestation(attestation, Optional.empty());
assertThat(result)
.isCompletedWithValueMatching(InternalValidationResult::isReject, "is rejected");
}
Expand All @@ -199,7 +200,7 @@ void shouldAcceptAttestationsAfterForkWithNewForkId_emptySlots() {
createAttestation(attestationSlot, targetBlockAndState, altairFork);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAttestation(attestation);
attestationManager.addAttestation(attestation, Optional.empty());
assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT);
}

Expand All @@ -216,7 +217,7 @@ void shouldAcceptAttestationAggregatesAfterForkWithNewForkId_emptySlots() {
ValidatableAttestation.aggregateFromValidator(spec, aggregate);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAggregate(attestation);
attestationManager.addAggregate(attestation, Optional.empty());

assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT);
}
Expand All @@ -237,7 +238,7 @@ void shouldAcceptAttestationsAfterForkWithNewForkId_filledSlots() {
createAttestation(attestationSlot, targetBlockAndState, altairFork);

final SafeFuture<InternalValidationResult> result =
attestationManager.addAttestation(attestation);
attestationManager.addAttestation(attestation, Optional.empty());
assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -213,7 +214,8 @@ public SafeFuture<InternalValidationResult> addLocal(final T item) {
}

@Override
public SafeFuture<InternalValidationResult> addRemote(final T item) {
public SafeFuture<InternalValidationResult> addRemote(
final T item, final Optional<UInt64> arrivalTimestamp) {
final int validatorIndex = item.getValidatorId();
if (operations.containsKey(validatorIndex)) {
return SafeFuture.completedFuture(rejectForDuplicatedMessage(metricType, validatorIndex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package tech.pegasys.teku.statetransition;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszCollection;
import tech.pegasys.teku.infrastructure.ssz.SszData;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;

Expand All @@ -38,7 +40,7 @@ SszList<T> getItemsForBlock(

SafeFuture<InternalValidationResult> addLocal(T item);

SafeFuture<InternalValidationResult> addRemote(T item);
SafeFuture<InternalValidationResult> addRemote(T item, Optional<UInt64> arrivalTimestamp);

void addAll(SszCollection<T> items);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public SafeFuture<InternalValidationResult> addLocal(final T item) {
}

@Override
public SafeFuture<InternalValidationResult> addRemote(final T item) {
public SafeFuture<InternalValidationResult> addRemote(
final T item, final Optional<UInt64> arrivalTimestamp) {
return add(item, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.statetransition.attestation;

import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -137,14 +138,16 @@ private void validateForGossipAndNotifySendSubscribers(ValidatableAttestation at
}
}

public SafeFuture<InternalValidationResult> addAttestation(ValidatableAttestation attestation) {
public SafeFuture<InternalValidationResult> addAttestation(
final ValidatableAttestation attestation, final Optional<UInt64> arrivalTime) {
SafeFuture<InternalValidationResult> validationResult =
attestationValidator.validate(attestation);
processInternallyValidatedAttestation(validationResult, attestation);
return validationResult;
}

public SafeFuture<InternalValidationResult> addAggregate(ValidatableAttestation attestation) {
public SafeFuture<InternalValidationResult> addAggregate(
final ValidatableAttestation attestation, final Optional<UInt64> arrivalTime) {
SafeFuture<InternalValidationResult> validationResult =
aggregateValidator.validate(attestation);
processInternallyValidatedAttestation(validationResult, attestation);
Expand All @@ -153,7 +156,8 @@ public SafeFuture<InternalValidationResult> addAggregate(ValidatableAttestation

@SuppressWarnings("FutureReturnValueIgnored")
private void processInternallyValidatedAttestation(
SafeFuture<InternalValidationResult> validationResult, ValidatableAttestation attestation) {
final SafeFuture<InternalValidationResult> validationResult,
final ValidatableAttestation attestation) {
validationResult.thenAccept(
internalValidationResult -> {
if (internalValidationResult.code().equals(ValidationResultCode.ACCEPT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.statetransition.blobs;

import java.util.List;
import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
Expand All @@ -29,7 +30,7 @@ public interface BlobSidecarManager {

@Override
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final SignedBlobSidecar signedBlobSidecar) {
final SignedBlobSidecar signedBlobSidecar, final Optional<UInt64> arrivalTimestamp) {
return SafeFuture.completedFuture(InternalValidationResult.ACCEPT);
}

Expand Down Expand Up @@ -59,7 +60,7 @@ public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmed
};

SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
SignedBlobSidecar signedBlobSidecar);
SignedBlobSidecar signedBlobSidecar, Optional<UInt64> arrivalTimestamp);

void prepareForBlockImport(BlobSidecar blobSidecar);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public BlobSidecarManagerImpl(
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public SafeFuture<InternalValidationResult> validateAndPrepareForBlockImport(
final SignedBlobSidecar signedBlobSidecar) {
final SignedBlobSidecar signedBlobSidecar, final Optional<UInt64> arrivalTimestamp) {

final Optional<InternalValidationResult> maybeInvalid =
Optional.ofNullable(
Expand Down Expand Up @@ -165,6 +165,7 @@ public void onSlot(final UInt64 slot) {
.prune(slot)
.forEach(
blobSidecar ->
validateAndPrepareForBlockImport(blobSidecar).ifExceptionGetsHereRaiseABug());
validateAndPrepareForBlockImport(blobSidecar, Optional.empty())
.ifExceptionGetsHereRaiseABug());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.ARRIVAL_EVENT_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.COMPLETED_EVENT_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.EXECUTION_PAYLOAD_RESULT_RECEIVED_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.GOSSIP_VALIDATION_EVENT_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PRESTATE_RETRIEVED_EVENT_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PROCESSED_EVENT_LABEL;
import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.SUCCESS_RESULT_METRIC_LABEL_VALUE;
Expand Down Expand Up @@ -79,6 +80,7 @@ public static BlockImportMetrics create(final MetricsSystem metricsSystem) {
result ->
List.of(
ARRIVAL_EVENT_LABEL,
GOSSIP_VALIDATION_EVENT_LABEL,
PRESTATE_RETRIEVED_EVENT_LABEL,
PROCESSED_EVENT_LABEL,
TRANSACTION_PREPARED_EVENT_LABEL,
Expand Down
Loading

0 comments on commit 451573b

Please sign in to comment.