From 451573b8dcc7adbdfd69ea796c4d100f53cb09cf Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Thu, 2 Nov 2023 19:02:33 +0300 Subject: [PATCH] Add block arrival time in metrics at the earliest possible point (#7618) --- .../coordinator/ValidatorApiHandler.java | 6 +- .../BlockOperationSelectorFactoryTest.java | 5 +- .../coordinator/ValidatorApiHandlerTest.java | 30 ++++--- ...tBlsToExecutionChangesIntegrationTest.java | 5 +- .../forkchoice/StubBlobSidecarManager.java | 3 +- .../AttestationManagerIntegrationTest.java | 13 +-- .../statetransition/MappedOperationPool.java | 4 +- .../teku/statetransition/OperationPool.java | 4 +- .../statetransition/SimpleOperationPool.java | 3 +- .../attestation/AttestationManager.java | 10 ++- .../blobs/BlobSidecarManager.java | 5 +- .../blobs/BlobSidecarManagerImpl.java | 5 +- .../block/BlockImportMetrics.java | 2 + .../block/BlockImportPerformance.java | 22 ++++- .../statetransition/block/BlockManager.java | 5 +- .../SyncCommitteeContributionPool.java | 4 +- .../SyncCommitteeMessagePool.java | 2 +- .../MappedOperationPoolTest.java | 14 +-- .../blobs/BlobSidecarManagerTest.java | 21 +++-- .../block/BlockManagerTest.java | 15 ++-- .../SyncCommitteeContributionPoolTest.java | 3 +- .../SyncCommitteeMessagePoolTest.java | 4 +- .../time/PerformanceTracker.java | 8 +- ...AttesterSlashingGossipIntegrationTest.java | 2 +- ...oExecutionChangeGossipIntegrationTest.java | 2 +- .../GossipMessageHandlerIntegrationTest.java | 16 ++-- ...ProposerSlashingGossipIntegrationTest.java | 2 +- .../VoluntaryExitGossipIntegrationTest.java | 2 +- .../eth2/Eth2P2PNetworkBuilder.java | 8 ++ .../eth2/gossip/AggregateGossipManager.java | 5 +- .../eth2/gossip/BlobSidecarGossipManager.java | 6 +- .../Eth2PreparedGossipMessageFactory.java | 8 +- .../eth2/gossip/encoding/GossipEncoding.java | 4 +- .../encoding/SnappyPreparedGossipMessage.java | 28 ++++-- .../SnappyPreparedGossipMessageFactory.java | 15 +++- .../SyncCommitteeSubnetSubscriptions.java | 5 +- .../gossip/topics/OperationProcessor.java | 18 +++- .../topichandlers/Eth2TopicHandler.java | 9 +- .../SingleAttestationTopicHandler.java | 4 +- .../gossip/BlobSidecarGossipManagerTest.java | 9 +- .../SnappyPreparedGossipMessageTest.java | 17 +++- .../encoding/SszSnappyGossipEncodingTest.java | 2 +- .../topics/AggregateTopicHandlerTest.java | 19 ++-- .../AttesterSlashingTopicHandlerTest.java | 17 ++-- .../gossip/topics/BlockTopicHandlerTest.java | 21 +++-- .../gossip/topics/Eth2TopicHandlerTest.java | 53 +++++------ .../ProposerSlashingTopicHandlerTest.java | 17 ++-- .../SingleAttestationTopicHandlerTest.java | 21 ++--- .../topics/VoluntaryExitTopicHandlerTest.java | 15 ++-- .../eth2/Eth2P2PNetworkFactory.java | 1 + .../p2p/gossip/PreparedGossipMessage.java | 5 +- .../gossip/PreparedGossipMessageFactory.java | 8 +- .../networking/p2p/gossip/TopicHandler.java | 10 ++- .../p2p/libp2p/LibP2PNetworkBuilder.java | 16 ++++ .../gossip/LibP2PGossipNetworkBuilder.java | 87 ++++++++++++++----- .../network/p2p/DiscoveryNetworkFactory.java | 3 +- .../network/p2p/jvmlibp2p/MockMessageApi.java | 7 ++ .../beaconchain/BeaconChainController.java | 1 + 58 files changed, 444 insertions(+), 212 deletions(-) diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index 6b0e599090b..091a2650ca5 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -540,7 +540,7 @@ public SafeFuture> sendSignedAttestations( private SafeFuture processAttestation(final Attestation attestation) { return attestationManager - .addAttestation(ValidatableAttestation.fromValidator(spec, attestation)) + .addAttestation(ValidatableAttestation.fromValidator(spec, attestation), Optional.empty()) .thenPeek( result -> { if (!result.isReject()) { @@ -591,7 +591,9 @@ public SafeFuture> sendAggregateAndProofs( private SafeFuture processAggregateAndProof( final SignedAggregateAndProof aggregateAndProof) { return attestationManager - .addAggregate(ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof)) + .addAggregate( + ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof), + Optional.empty()) .thenPeek( result -> { if (result.isReject()) { diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java index 77ab8d6de0c..6dadc5cdc61 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/BlockOperationSelectorFactoryTest.java @@ -254,7 +254,7 @@ void shouldIncludeValidOperations() { } private void addToPool(final OperationPool pool, final T operation) { - assertThat(pool.addRemote(operation)).isCompletedWithValue(ACCEPT); + assertThat(pool.addRemote(operation, Optional.empty())).isCompletedWithValue(ACCEPT); } @Test @@ -294,7 +294,8 @@ void shouldNotIncludeInvalidOperations() { addToPool(attesterSlashingPool, attesterSlashing1); addToPool(attesterSlashingPool, attesterSlashing2); addToPool(attesterSlashingPool, attesterSlashing3); - assertThat(contributionPool.addRemote(contribution)).isCompletedWithValue(ACCEPT); + assertThat(contributionPool.addRemote(contribution, Optional.empty())) + .isCompletedWithValue(ACCEPT); addToPool(blsToExecutionChangePool, blsToExecutionChange1); addToPool(blsToExecutionChangePool, blsToExecutionChange2); diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index 78ff9e74216..42e7d5104aa 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -716,19 +716,20 @@ void subscribeToSyncCommitteeSubnets_shouldConvertCommitteeIndexToSubnetId() { @Test public void sendSignedAttestations_shouldAddAttestationToAttestationManager() { final Attestation attestation = dataStructureUtil.randomAttestation(); - when(attestationManager.addAttestation(any(ValidatableAttestation.class))) + when(attestationManager.addAttestation(any(ValidatableAttestation.class), any())) .thenReturn(completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture> result = validatorApiHandler.sendSignedAttestations(List.of(attestation)); assertThat(result).isCompletedWithValue(emptyList()); - verify(attestationManager).addAttestation(ValidatableAttestation.from(spec, attestation)); + verify(attestationManager) + .addAttestation(ValidatableAttestation.from(spec, attestation), Optional.empty()); } @Test void sendSignedAttestations_shouldAddToDutyMetricsAndPerformanceTrackerWhenNotInvalid() { final Attestation attestation = dataStructureUtil.randomAttestation(); - when(attestationManager.addAttestation(any(ValidatableAttestation.class))) + when(attestationManager.addAttestation(any(ValidatableAttestation.class), any())) .thenReturn(completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); final SafeFuture> result = @@ -742,7 +743,7 @@ void sendSignedAttestations_shouldAddToDutyMetricsAndPerformanceTrackerWhenNotIn @Test void sendSignedAttestations_shouldNotAddToDutyMetricsAndPerformanceTrackerWhenInvalid() { final Attestation attestation = dataStructureUtil.randomAttestation(); - when(attestationManager.addAttestation(any(ValidatableAttestation.class))) + when(attestationManager.addAttestation(any(ValidatableAttestation.class), any())) .thenReturn(completedFuture(InternalValidationResult.reject("Bad juju"))); final SafeFuture> result = @@ -757,9 +758,9 @@ void sendSignedAttestations_shouldNotAddToDutyMetricsAndPerformanceTrackerWhenIn void sendSignedAttestations_shouldProcessMixOfValidAndInvalidAttestations() { final Attestation invalidAttestation = dataStructureUtil.randomAttestation(); final Attestation validAttestation = dataStructureUtil.randomAttestation(); - when(attestationManager.addAttestation(validatableAttestationOf(invalidAttestation))) + when(attestationManager.addAttestation(validatableAttestationOf(invalidAttestation), any())) .thenReturn(completedFuture(InternalValidationResult.reject("Bad juju"))); - when(attestationManager.addAttestation(validatableAttestationOf(validAttestation))) + when(attestationManager.addAttestation(validatableAttestationOf(validAttestation), any())) .thenReturn(completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture> result = @@ -901,14 +902,16 @@ public void sendSignedBlock_shoulNotGossipAndImportBlobsWhenBlobsDoNotExist() { public void sendAggregateAndProofs_shouldPostAggregateAndProof() { final SignedAggregateAndProof aggregateAndProof = dataStructureUtil.randomSignedAggregateAndProof(); - when(attestationManager.addAggregate(any(ValidatableAttestation.class))) + when(attestationManager.addAggregate(any(ValidatableAttestation.class), any())) .thenReturn(completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture> result = validatorApiHandler.sendAggregateAndProofs(List.of(aggregateAndProof)); assertThat(result).isCompletedWithValue(emptyList()); verify(attestationManager) - .addAggregate(ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof)); + .addAggregate( + ValidatableAttestation.aggregateFromValidator(spec, aggregateAndProof), + Optional.empty()); } @Test @@ -918,10 +921,11 @@ void sendAggregateAndProofs_shouldProcessMixOfValidAndInvalidAggregates() { final SignedAggregateAndProof validAggregate = dataStructureUtil.randomSignedAggregateAndProof(); when(attestationManager.addAggregate( - ValidatableAttestation.aggregateFromValidator(spec, invalidAggregate))) + ValidatableAttestation.aggregateFromValidator(spec, invalidAggregate), + Optional.empty())) .thenReturn(completedFuture(InternalValidationResult.reject("Bad juju"))); when(attestationManager.addAggregate( - ValidatableAttestation.aggregateFromValidator(spec, validAggregate))) + ValidatableAttestation.aggregateFromValidator(spec, validAggregate), Optional.empty())) .thenReturn(completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture> result = @@ -933,12 +937,14 @@ void sendAggregateAndProofs_shouldProcessMixOfValidAndInvalidAggregates() { .addAggregate( argThat( validatableAttestation -> - validatableAttestation.getSignedAggregateAndProof().equals(validAggregate))); + validatableAttestation.getSignedAggregateAndProof().equals(validAggregate)), + any()); verify(attestationManager) .addAggregate( argThat( validatableAttestation -> - validatableAttestation.getSignedAggregateAndProof().equals(invalidAggregate))); + validatableAttestation.getSignedAggregateAndProof().equals(invalidAggregate)), + any()); } @Test diff --git a/data/beaconrestapi/src/integration-test/java/tech/pegasys/teku/beaconrestapi/v1/beacon/GetBlsToExecutionChangesIntegrationTest.java b/data/beaconrestapi/src/integration-test/java/tech/pegasys/teku/beaconrestapi/v1/beacon/GetBlsToExecutionChangesIntegrationTest.java index 64516dd72fa..9b5f375f418 100644 --- a/data/beaconrestapi/src/integration-test/java/tech/pegasys/teku/beaconrestapi/v1/beacon/GetBlsToExecutionChangesIntegrationTest.java +++ b/data/beaconrestapi/src/integration-test/java/tech/pegasys/teku/beaconrestapi/v1/beacon/GetBlsToExecutionChangesIntegrationTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import okhttp3.Response; @@ -59,7 +60,7 @@ void getBlsToExecutionChangesFromPoolReturnsOk() throws IOException { when(validator.validateForGossip(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); for (SignedBlsToExecutionChange operation : expectedOperations) { - assertThat(blsToExecutionChangePool.addRemote(operation)).isCompleted(); + assertThat(blsToExecutionChangePool.addRemote(operation, Optional.empty())).isCompleted(); } Response response = getResponse(GetBlsToExecutionChanges.ROUTE); @@ -80,7 +81,7 @@ void getLocalBlsToExecutionChangesFromPool() throws IOException { when(validator.validateForGossip(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); assertThat(blsToExecutionChangePool.addLocal(localChange)).isCompleted(); - assertThat(blsToExecutionChangePool.addRemote(remoteChange)).isCompleted(); + assertThat(blsToExecutionChangePool.addRemote(remoteChange, Optional.empty())).isCompleted(); Response response = getResponse(GetBlsToExecutionChanges.ROUTE, Map.of("locally_submitted", "true")); diff --git a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java index 738ec22e73b..dadab22977e 100644 --- a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java +++ b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java @@ -17,6 +17,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -54,7 +55,7 @@ public void prepareBlobsAndProofsForBlock( @Override public SafeFuture validateAndPrepareForBlockImport( - final SignedBlobSidecar signedBlobSidecar) { + final SignedBlobSidecar signedBlobSidecar, final Optional arrivalTimestamp) { return SafeFuture.failedFuture( new UnsupportedOperationException("Not available in fork choice reference tests")); } diff --git a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java index 4c4c2d3bfef..fbb604b11cb 100644 --- a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java +++ b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java @@ -18,6 +18,7 @@ import static tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool.DEFAULT_MAXIMUM_ATTESTATION_COUNT; import it.unimi.dsi.fastutil.ints.IntList; +import java.util.Optional; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -146,7 +147,7 @@ void shouldAcceptAttestationsBeforeForkWithOriginalForkId() { createAttestation(attestationSlot, targetBlockAndState, phase0Fork); final SafeFuture result = - attestationManager.addAttestation(attestation); + attestationManager.addAttestation(attestation, Optional.empty()); assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT); } @@ -163,7 +164,7 @@ void shouldRejectAttestationsBeforeForkWithNewForkId() { createAttestation(attestationSlot, targetBlockAndState, altairFork); final SafeFuture result = - attestationManager.addAttestation(attestation); + attestationManager.addAttestation(attestation, Optional.empty()); assertThat(result) .isCompletedWithValueMatching(InternalValidationResult::isReject, "is rejected"); } @@ -181,7 +182,7 @@ void shouldRejectAttestationsAfterForkWithOldForkId() { createAttestation(attestationSlot, targetBlockAndState, phase0Fork); final SafeFuture result = - attestationManager.addAttestation(attestation); + attestationManager.addAttestation(attestation, Optional.empty()); assertThat(result) .isCompletedWithValueMatching(InternalValidationResult::isReject, "is rejected"); } @@ -199,7 +200,7 @@ void shouldAcceptAttestationsAfterForkWithNewForkId_emptySlots() { createAttestation(attestationSlot, targetBlockAndState, altairFork); final SafeFuture result = - attestationManager.addAttestation(attestation); + attestationManager.addAttestation(attestation, Optional.empty()); assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT); } @@ -216,7 +217,7 @@ void shouldAcceptAttestationAggregatesAfterForkWithNewForkId_emptySlots() { ValidatableAttestation.aggregateFromValidator(spec, aggregate); final SafeFuture result = - attestationManager.addAggregate(attestation); + attestationManager.addAggregate(attestation, Optional.empty()); assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT); } @@ -237,7 +238,7 @@ void shouldAcceptAttestationsAfterForkWithNewForkId_filledSlots() { createAttestation(attestationSlot, targetBlockAndState, altairFork); final SafeFuture result = - attestationManager.addAttestation(attestation); + attestationManager.addAttestation(attestation, Optional.empty()); assertThat(result).isCompletedWithValue(InternalValidationResult.ACCEPT); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/MappedOperationPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/MappedOperationPool.java index a44a19d9f99..111827f1899 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/MappedOperationPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/MappedOperationPool.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -213,7 +214,8 @@ public SafeFuture addLocal(final T item) { } @Override - public SafeFuture addRemote(final T item) { + public SafeFuture addRemote( + final T item, final Optional arrivalTimestamp) { final int validatorIndex = item.getValidatorId(); if (operations.containsKey(validatorIndex)) { return SafeFuture.completedFuture(rejectForDuplicatedMessage(metricType, validatorIndex)) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/OperationPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/OperationPool.java index 9adcb6b2059..7b009ff5128 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/OperationPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/OperationPool.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.statetransition; import com.google.common.annotations.VisibleForTesting; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; @@ -21,6 +22,7 @@ import tech.pegasys.teku.infrastructure.ssz.SszCollection; import tech.pegasys.teku.infrastructure.ssz.SszData; import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; @@ -38,7 +40,7 @@ SszList getItemsForBlock( SafeFuture addLocal(T item); - SafeFuture addRemote(T item); + SafeFuture addRemote(T item, Optional arrivalTimestamp); void addAll(SszCollection items); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/SimpleOperationPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/SimpleOperationPool.java index 9d4874ec0e2..4253eda5a3a 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/SimpleOperationPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/SimpleOperationPool.java @@ -167,7 +167,8 @@ public SafeFuture addLocal(final T item) { } @Override - public SafeFuture addRemote(final T item) { + public SafeFuture addRemote( + final T item, final Optional arrivalTimestamp) { return add(item, true); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java index 8f4fe31029e..81b4f67d54e 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationManager.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.statetransition.attestation; import java.util.List; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; @@ -137,14 +138,16 @@ private void validateForGossipAndNotifySendSubscribers(ValidatableAttestation at } } - public SafeFuture addAttestation(ValidatableAttestation attestation) { + public SafeFuture addAttestation( + final ValidatableAttestation attestation, final Optional arrivalTime) { SafeFuture validationResult = attestationValidator.validate(attestation); processInternallyValidatedAttestation(validationResult, attestation); return validationResult; } - public SafeFuture addAggregate(ValidatableAttestation attestation) { + public SafeFuture addAggregate( + final ValidatableAttestation attestation, final Optional arrivalTime) { SafeFuture validationResult = aggregateValidator.validate(attestation); processInternallyValidatedAttestation(validationResult, attestation); @@ -153,7 +156,8 @@ public SafeFuture addAggregate(ValidatableAttestation @SuppressWarnings("FutureReturnValueIgnored") private void processInternallyValidatedAttestation( - SafeFuture validationResult, ValidatableAttestation attestation) { + final SafeFuture validationResult, + final ValidatableAttestation attestation) { validationResult.thenAccept( internalValidationResult -> { if (internalValidationResult.code().equals(ValidationResultCode.ACCEPT) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java index d60d31316c0..09f36ff1110 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.statetransition.blobs; import java.util.List; +import java.util.Optional; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; @@ -29,7 +30,7 @@ public interface BlobSidecarManager { @Override public SafeFuture validateAndPrepareForBlockImport( - final SignedBlobSidecar signedBlobSidecar) { + final SignedBlobSidecar signedBlobSidecar, final Optional arrivalTimestamp) { return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); } @@ -59,7 +60,7 @@ public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmed }; SafeFuture validateAndPrepareForBlockImport( - SignedBlobSidecar signedBlobSidecar); + SignedBlobSidecar signedBlobSidecar, Optional arrivalTimestamp); void prepareForBlockImport(BlobSidecar blobSidecar); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java index 6175cd42524..6694c19afcc 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java @@ -71,7 +71,7 @@ public BlobSidecarManagerImpl( @Override @SuppressWarnings("FutureReturnValueIgnored") public SafeFuture validateAndPrepareForBlockImport( - final SignedBlobSidecar signedBlobSidecar) { + final SignedBlobSidecar signedBlobSidecar, final Optional arrivalTimestamp) { final Optional maybeInvalid = Optional.ofNullable( @@ -165,6 +165,7 @@ public void onSlot(final UInt64 slot) { .prune(slot) .forEach( blobSidecar -> - validateAndPrepareForBlockImport(blobSidecar).ifExceptionGetsHereRaiseABug()); + validateAndPrepareForBlockImport(blobSidecar, Optional.empty()) + .ifExceptionGetsHereRaiseABug()); } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportMetrics.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportMetrics.java index 32bc7909426..040fa9b9baf 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportMetrics.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportMetrics.java @@ -16,6 +16,7 @@ import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.ARRIVAL_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.COMPLETED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.EXECUTION_PAYLOAD_RESULT_RECEIVED_LABEL; +import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.GOSSIP_VALIDATION_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PRESTATE_RETRIEVED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PROCESSED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.SUCCESS_RESULT_METRIC_LABEL_VALUE; @@ -79,6 +80,7 @@ public static BlockImportMetrics create(final MetricsSystem metricsSystem) { result -> List.of( ARRIVAL_EVENT_LABEL, + GOSSIP_VALIDATION_EVENT_LABEL, PRESTATE_RETRIEVED_EVENT_LABEL, PROCESSED_EVENT_LABEL, TRANSACTION_PREPARED_EVENT_LABEL, diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportPerformance.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportPerformance.java index 300e9dae00e..b7e647e87b5 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportPerformance.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportPerformance.java @@ -16,6 +16,9 @@ import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis; import java.util.Locale; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.time.PerformanceTracker; import tech.pegasys.teku.infrastructure.time.TimeProvider; @@ -25,7 +28,10 @@ import tech.pegasys.teku.storage.client.RecentChainData; public class BlockImportPerformance { + private static final Logger LOG = LogManager.getLogger(); + public static final String ARRIVAL_EVENT_LABEL = "arrival"; + public static final String GOSSIP_VALIDATION_EVENT_LABEL = "gossip_validation"; public static final String PRESTATE_RETRIEVED_EVENT_LABEL = "pre-state_retrieved"; public static final String PROCESSED_EVENT_LABEL = "processed"; public static final String TRANSACTION_PREPARED_EVENT_LABEL = "transaction_prepared"; @@ -53,12 +59,24 @@ public BlockImportPerformance( this.blockImportMetrics = blockImportMetrics; } - public void arrival(final RecentChainData recentChainData, final UInt64 slot) { + public void arrival( + final RecentChainData recentChainData, + final UInt64 slot, + final Optional arrivalTimestamp) { timeAtSlotStartTimeStamp = secondsToMillis(recentChainData.computeTimeAtSlot(slot)); timeWarningLimitTimeStamp = timeAtSlotStartTimeStamp.plus( secondsToMillis(recentChainData.getSpec().getSecondsPerSlot(slot)).dividedBy(3)); - performanceTracker.addEvent(ARRIVAL_EVENT_LABEL); + if (arrivalTimestamp.isPresent()) { + performanceTracker.addEvent(ARRIVAL_EVENT_LABEL, arrivalTimestamp.get()); + } else { + LOG.trace("Block arrival time missed for slot {}, setting current", slot); + performanceTracker.addEvent(ARRIVAL_EVENT_LABEL); + } + } + + public void gossipValidation() { + performanceTracker.addEvent(GOSSIP_VALIDATION_EVENT_LABEL); } public void preStateRetrieved() { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java index 18ddc4f4ba5..295eb7459a6 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java @@ -148,14 +148,15 @@ public SafeFuture importBlock( @SuppressWarnings("FutureReturnValueIgnored") public SafeFuture validateAndImportBlock( - final SignedBeaconBlock block) { + final SignedBeaconBlock block, final Optional arrivalTimestamp) { final Optional blockImportPerformance; if (blockImportMetrics.isPresent()) { final BlockImportPerformance performance = new BlockImportPerformance(timeProvider, blockImportMetrics.get()); - performance.arrival(recentChainData, block.getSlot()); + performance.arrival(recentChainData, block.getSlot(), arrivalTimestamp); + performance.gossipValidation(); blockImportPerformance = Optional.of(performance); } else { blockImportPerformance = Optional.empty(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPool.java index 195e1e70e86..3e75d4ce48e 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPool.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; +import java.util.Optional; import java.util.TreeMap; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; @@ -60,7 +61,8 @@ public SafeFuture addLocal( } public SafeFuture addRemote( - final SignedContributionAndProof signedContributionAndProof) { + final SignedContributionAndProof signedContributionAndProof, + final Optional arrivalTimestamp) { return add(signedContributionAndProof, true); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePool.java index e27eef67e2d..eb5970aeb6a 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePool.java @@ -74,7 +74,7 @@ public SafeFuture addLocal( } public SafeFuture addRemote( - final ValidatableSyncCommitteeMessage message) { + final ValidatableSyncCommitteeMessage message, final Optional arrivalTimestamp) { return add(message, true); } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/MappedOperationPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/MappedOperationPoolTest.java index 358251eb175..d993cac30f9 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/MappedOperationPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/MappedOperationPoolTest.java @@ -162,7 +162,7 @@ void shouldSelectLocalOperationBeforeRemoteOperation() { final SignedBlsToExecutionChange secondLocalEntry = dataStructureUtil.randomSignedBlsToExecutionChange(); - assertThat(pool.addRemote(remoteEntry)).isCompleted(); + assertThat(pool.addRemote(remoteEntry, Optional.empty())).isCompleted(); assertThat(pool.addLocal(secondLocalEntry)).isCompleted(); final SszList blockItems = pool.getItemsForBlock(state); @@ -178,7 +178,7 @@ void getLocalEntriesReturnsOnlyLocalEntries() { final SignedBlsToExecutionChange secondLocalEntry = dataStructureUtil.randomSignedBlsToExecutionChange(); - assertThat(pool.addRemote(remoteEntry)).isCompleted(); + assertThat(pool.addRemote(remoteEntry, Optional.empty())).isCompleted(); assertThat(pool.addLocal(secondLocalEntry)).isCompleted(); assertThat(pool.getLocallySubmitted()).containsExactlyInAnyOrder(localEntry, secondLocalEntry); @@ -241,7 +241,7 @@ void shouldNotReprocessRemoteOperations() { final SignedBlsToExecutionChange remoteEntry = dataStructureUtil.randomSignedBlsToExecutionChange(); - assertThat(pool.addRemote(remoteEntry)).isCompleted(); + assertThat(pool.addRemote(remoteEntry, Optional.empty())).isCompleted(); verify(validator, times(1)).validateForGossip(any()); stubTimeProvider.advanceTimeBySeconds(1_000_000); asyncRunner.executeDueActions(); @@ -258,7 +258,7 @@ void subscribeOperationAddedSuccessfully(final boolean isFromNetwork) { when(validator.validateForGossip(item)).thenReturn(completedFuture(ACCEPT)); if (isFromNetwork) { - assertThat(pool.addRemote(item)).isCompleted(); + assertThat(pool.addRemote(item, Optional.empty())).isCompleted(); } else { assertThat(pool.addLocal(item)).isCompleted(); } @@ -275,7 +275,7 @@ void subscribeOperationIgnored(final boolean isFromNetwork) { when(validator.validateForGossip(item)).thenReturn(completedFuture(IGNORE)); if (isFromNetwork) { - assertThat(pool.addRemote(item)).isCompleted(); + assertThat(pool.addRemote(item, Optional.empty())).isCompleted(); } else { assertThat(pool.addLocal(item)).isCompleted(); } @@ -291,14 +291,14 @@ void subscribeOperationIgnoredDuplicate(final boolean isFromNetwork) final SignedBlsToExecutionChange item = dataStructureUtil.randomSignedBlsToExecutionChange(); when(validator.validateForGossip(item)).thenReturn(completedFuture(ACCEPT)); // pre-populate cache - assertThat(pool.addRemote(item)).isCompleted(); + assertThat(pool.addRemote(item, Optional.empty())).isCompleted(); final Subscription subscription = initialiseSubscriptions(); final SafeFuture future; if (isFromNetwork) { // pre-populate cache, then try to add a second time. - future = pool.addRemote(item); + future = pool.addRemote(item, Optional.empty()); } else { future = pool.addLocal(item); } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java index 19a86234881..5d1971e090b 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -89,7 +90,9 @@ void setUp() { @Test void validateAndPrepareForBlockImport_shouldPrepareBlobSidecar() { - assertThatSafeFuture(blobSidecarManager.validateAndPrepareForBlockImport(signedBlobSidecar)) + assertThatSafeFuture( + blobSidecarManager.validateAndPrepareForBlockImport( + signedBlobSidecar, Optional.empty())) .isCompletedWithValue(InternalValidationResult.ACCEPT); verify(blobSidecarPool).onNewBlobSidecar(blobSidecar); @@ -104,7 +107,9 @@ void validateAndPrepareForBlockImport_shouldSaveForTheFuture() { when(blobSidecarValidator.validate(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); - assertThatSafeFuture(blobSidecarManager.validateAndPrepareForBlockImport(signedBlobSidecar)) + assertThatSafeFuture( + blobSidecarManager.validateAndPrepareForBlockImport( + signedBlobSidecar, Optional.empty())) .isCompletedWithValue(InternalValidationResult.SAVE_FOR_FUTURE); verify(blobSidecarPool, never()).onNewBlobSidecar(blobSidecar); @@ -119,7 +124,9 @@ void validateAndPrepareForBlockImport_shouldReject() { when(blobSidecarValidator.validate(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.reject("no way"))); - assertThatSafeFuture(blobSidecarManager.validateAndPrepareForBlockImport(signedBlobSidecar)) + assertThatSafeFuture( + blobSidecarManager.validateAndPrepareForBlockImport( + signedBlobSidecar, Optional.empty())) .isCompletedWithValueMatching(InternalValidationResult::isReject); verify(blobSidecarPool, never()).onNewBlobSidecar(blobSidecar); @@ -136,7 +143,9 @@ void validateAndPrepareForBlockImport_shouldIgnore() { when(blobSidecarValidator.validate(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); - assertThatSafeFuture(blobSidecarManager.validateAndPrepareForBlockImport(signedBlobSidecar)) + assertThatSafeFuture( + blobSidecarManager.validateAndPrepareForBlockImport( + signedBlobSidecar, Optional.empty())) .isCompletedWithValue(InternalValidationResult.IGNORE); verify(blobSidecarPool, never()).onNewBlobSidecar(blobSidecar); @@ -151,7 +160,9 @@ void validateAndPrepareForBlockImport_shouldRejectKnownInvalidBlobs() { invalidBlobSidecarRoots.put( blobSidecar.hashTreeRoot(), InternalValidationResult.reject("no way")); - assertThatSafeFuture(blobSidecarManager.validateAndPrepareForBlockImport(signedBlobSidecar)) + assertThatSafeFuture( + blobSidecarManager.validateAndPrepareForBlockImport( + signedBlobSidecar, Optional.empty())) .isCompletedWithValueMatching(InternalValidationResult::isReject); verify(blobSidecarValidator, never()).validate(any()); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java index d741b926f0e..933bd102f0a 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java @@ -39,6 +39,7 @@ import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.BEGIN_IMPORTING_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.COMPLETED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.EXECUTION_PAYLOAD_RESULT_RECEIVED_LABEL; +import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.GOSSIP_VALIDATION_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PRESTATE_RETRIEVED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PROCESSED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.TRANSACTION_COMMITTED_EVENT_LABEL; @@ -719,8 +720,10 @@ void onValidateAndImportBlock_shouldLogSlowImport() { localChain.chainBuilder().generateBlockAtSlot(incrementSlot()).getBlock(); // slot 1 - secondPerSlot 6 - // arrival time - timeProvider.advanceTimeByMillis(7_000); // 1 second late + // 1 second late + final Optional arrivalTime = Optional.of(UInt64.valueOf(7000)); + // gossip validation time + timeProvider.advanceTimeByMillis(7_500); when(blockValidator.validateGossip(any())) .thenAnswer( @@ -731,7 +734,7 @@ void onValidateAndImportBlock_shouldLogSlowImport() { return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); - assertThat(blockManager.validateAndImportBlock(block)) + assertThat(blockManager.validateAndImportBlock(block, arrivalTime)) .isCompletedWithValueMatching(InternalValidationResult::isAccept); verify(eventLogger) .lateBlockImport( @@ -740,6 +743,8 @@ void onValidateAndImportBlock_shouldLogSlowImport() { block.getProposerIndex(), ARRIVAL_EVENT_LABEL + " 1000ms, " + + GOSSIP_VALIDATION_EVENT_LABEL + + " +500ms, " + PRESTATE_RETRIEVED_EVENT_LABEL + " +3000ms, " + PROCESSED_EVENT_LABEL @@ -774,7 +779,7 @@ void onValidateAndImportBlock_shouldNotLogSlowImport() { return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); - assertThat(blockManager.validateAndImportBlock(block)) + assertThat(blockManager.validateAndImportBlock(block, Optional.empty())) .isCompletedWithValueMatching(InternalValidationResult::isAccept); verifyNoInteractions(eventLogger); } @@ -1142,7 +1147,7 @@ private void assertImportBlockWithResult( } private void assertValidateAndImportBlockRejectWithoutValidation(final SignedBeaconBlock block) { - assertThat(blockManager.validateAndImportBlock(block)) + assertThat(blockManager.validateAndImportBlock(block, Optional.empty())) .isCompletedWithValueMatching(InternalValidationResult::isReject); verify(blockValidator, never()).validateGossip(eq(block)); verify(blockValidator, never()).validateBroadcast(any(), any(), any()); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPoolTest.java index c27b316b254..331f1717a0f 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeContributionPoolTest.java @@ -26,6 +26,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -207,7 +208,7 @@ private void addValidLocal(final SignedContributionAndProof proof) { } private void addValidRemote(final SignedContributionAndProof proof) { - assertThat(pool.addRemote(proof)).isCompletedWithValue(ACCEPT); + assertThat(pool.addRemote(proof, Optional.empty())).isCompletedWithValue(ACCEPT); } private void assertSyncAggregateFromContribution( diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePoolTest.java index 7ee21306e61..4295dc5ef24 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/synccommittee/SyncCommitteeMessagePoolTest.java @@ -81,7 +81,7 @@ void shouldNotifySubscriberWhenRemoteValidMessageAdded() { pool.subscribeOperationAdded(subscriber); when(validator.validate(message)).thenReturn(SafeFuture.completedFuture(ACCEPT)); - assertThat(pool.addRemote(message)).isCompletedWithValue(ACCEPT); + assertThat(pool.addRemote(message, Optional.empty())).isCompletedWithValue(ACCEPT); verify(subscriber).onOperationAdded(message, ACCEPT, true); } @@ -383,7 +383,7 @@ private void addValidLocal(final ValidatableSyncCommitteeMessage message0) { } private void addValidRemote(final ValidatableSyncCommitteeMessage message0) { - assertThat(pool.addRemote(message0)).isCompletedWithValue(ACCEPT); + assertThat(pool.addRemote(message0, Optional.empty())).isCompletedWithValue(ACCEPT); } private void assertMessagesPresentForSlots( diff --git a/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java b/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java index 5a3232ec43c..e6d9dee576c 100644 --- a/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java +++ b/infrastructure/time/src/main/java/tech/pegasys/teku/infrastructure/time/PerformanceTracker.java @@ -28,12 +28,16 @@ public PerformanceTracker(final TimeProvider timeProvider) { this.timeProvider = timeProvider; } - public UInt64 addEvent(final String label) { - final UInt64 timestamp = timeProvider.getTimeInMillis(); + public UInt64 addEvent(final String label, final UInt64 timestamp) { events.add(Pair.of(label, timestamp)); return timestamp; } + public UInt64 addEvent(final String label) { + final UInt64 timestamp = timeProvider.getTimeInMillis(); + return addEvent(label, timestamp); + } + public void report( final UInt64 startTime, final boolean isLateEvent, diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java index 279b353d2eb..b4c1d5bf835 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java @@ -53,7 +53,7 @@ public void shouldGossipToPeers() throws Exception { // Set up publishers & consumers Set receivedGossip = new HashSet<>(); final OperationProcessor operationProcessor = - (slashing) -> { + (slashing, __) -> { receivedGossip.add(slashing); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }; diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/BlsToExecutionChangeGossipIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/BlsToExecutionChangeGossipIntegrationTest.java index f3922988300..672164db7e7 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/BlsToExecutionChangeGossipIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/BlsToExecutionChangeGossipIntegrationTest.java @@ -54,7 +54,7 @@ public void shouldGossipBlsToExecutionChangesToPeers() throws Exception { Set receivedSignedBlsToExecutionChange = new HashSet<>(); final OperationProcessor operationProcessor = - (msg) -> { + (msg, __) -> { receivedSignedBlsToExecutionChange.add(msg); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }; diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/GossipMessageHandlerIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/GossipMessageHandlerIntegrationTest.java index 5d598a2e3d9..d99ba1963e8 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/GossipMessageHandlerIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/GossipMessageHandlerIntegrationTest.java @@ -70,7 +70,7 @@ public void shouldGossipBlocksAcrossToIndirectlyConnectedPeers() throws Exceptio b -> b.gossipEncoding(gossipEncoding) .gossipedBlockProcessor( - (block) -> { + (block, arrivalTimestamp) -> { node2ReceivedBlocks.add(block); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); @@ -83,7 +83,7 @@ public void shouldGossipBlocksAcrossToIndirectlyConnectedPeers() throws Exceptio b -> b.gossipEncoding(gossipEncoding) .gossipedBlockProcessor( - (block) -> { + (block, arrivalTimestamp) -> { node3ReceivedBlocks.add(block); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); @@ -132,7 +132,7 @@ public void shouldNotGossipInvalidBlocks() throws Exception { b1 -> b1.gossipEncoding(gossipEncoding) .gossipedBlockProcessor( - block -> { + (block, arrivalTimestamp) -> { // Report block as invalid return SafeFuture.completedFuture(InternalValidationResult.reject("No")); })); @@ -144,7 +144,7 @@ public void shouldNotGossipInvalidBlocks() throws Exception { b -> b.gossipEncoding(gossipEncoding) .gossipedBlockProcessor( - block -> { + (block, arrivalTimestamp) -> { node3ReceivedBlocks.add(block); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); })); @@ -192,7 +192,7 @@ public void shouldNotGossipAttestationsAcrossPeersThatAreNotOnTheSameSubnet() th b -> { b.gossipEncoding(gossipEncoding); b.gossipedAttestationProcessor( - (attestation) -> { + (attestation, __) -> { node2attestations.add(attestation); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); @@ -242,7 +242,7 @@ public void shouldGossipAttestationsAcrossPeersThatAreOnTheSameSubnet() throws E b -> { b.gossipEncoding(gossipEncoding); b.gossipedAttestationProcessor( - (attestation) -> { + (attestation, __) -> { node2attestations.add(attestation); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); @@ -299,7 +299,7 @@ public void shouldNotGossipAttestationsWhenPeerDeregistersFromTopic() throws Exc b -> { b.gossipEncoding(gossipEncoding); b.gossipedAttestationProcessor( - (__) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (__, ___) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); b.processedAttestationSubscriptionProvider(processedAttestationSubscribers::subscribe); }; @@ -307,7 +307,7 @@ public void shouldNotGossipAttestationsWhenPeerDeregistersFromTopic() throws Exc b -> { b.gossipEncoding(gossipEncoding); b.gossipedAttestationProcessor( - (attestation) -> { + (attestation, __) -> { node2attestations.add(attestation); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }); diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/ProposerSlashingGossipIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/ProposerSlashingGossipIntegrationTest.java index 963d7f70a3e..4fd806e7d45 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/ProposerSlashingGossipIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/ProposerSlashingGossipIntegrationTest.java @@ -54,7 +54,7 @@ public void shouldGossipToPeers() throws Exception { // Set up publishers & consumers Set receivedGossip = new HashSet<>(); final OperationProcessor operationProcessor = - (slashing) -> { + (slashing, __) -> { receivedGossip.add(slashing); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }; diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/VoluntaryExitGossipIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/VoluntaryExitGossipIntegrationTest.java index 0f8efd25772..6896c859f8f 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/VoluntaryExitGossipIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/VoluntaryExitGossipIntegrationTest.java @@ -62,7 +62,7 @@ public void shouldGossipVoluntaryExitToPeers() throws Exception { // Set up publishers & consumers Set receivedVoluntaryExits = new HashSet<>(); final OperationProcessor operationProcessor = - (voluntaryExit) -> { + (voluntaryExit, __) -> { receivedVoluntaryExits.add(voluntaryExit); return SafeFuture.completedFuture(InternalValidationResult.ACCEPT); }; diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index c0ae86d93ef..d72383c993e 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -122,6 +122,7 @@ public class Eth2P2PNetworkBuilder { gossipedSyncCommitteeMessageProcessor; protected StatusMessageFactory statusMessageFactory; protected KZG kzg; + protected boolean recordMessageArrival; protected Eth2P2PNetworkBuilder() {} @@ -318,6 +319,8 @@ protected DiscoveryNetwork buildNetwork( .peerHandlers(peerHandlers) .preparedGossipMessageFactory(defaultMessageFactory) .gossipTopicFilter(gossipTopicsFilter) + .timeProvider(timeProvider) + .recordMessageArrival(recordMessageArrival) .build(); final AttestationSubnetTopicProvider attestationSubnetTopicProvider = @@ -572,4 +575,9 @@ public Eth2P2PNetworkBuilder kzg(final KZG kzg) { this.kzg = kzg; return this; } + + public Eth2P2PNetworkBuilder recordMessageArrival(final boolean recordMessageArrival) { + this.recordMessageArrival = recordMessageArrival; + return this; + } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManager.java index 5d44624ed7e..cb0737a6db0 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManager.java @@ -41,10 +41,11 @@ public AggregateGossipManager( gossipNetwork, gossipEncoding, forkInfo, - proofMessage -> + (proofMessage, arrivalTimestamp) -> processor.process( ValidatableAttestation.aggregateFromNetwork( - recentChainData.getSpec(), proofMessage)), + recentChainData.getSpec(), proofMessage), + arrivalTimestamp), spec.atEpoch(forkInfo.getFork().getEpoch()) .getSchemaDefinitions() .getSignedAggregateAndProofSchema(), diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java index 50df7396518..840f50cb64c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java @@ -20,6 +20,7 @@ import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName; import tech.pegasys.teku.networking.eth2.gossip.topics.OperationMilestoneValidator; @@ -165,7 +166,8 @@ private TopicSubnetIdAwareOperationProcessor( } @Override - public SafeFuture process(final SignedBlobSidecar blobSidecar) { + public SafeFuture process( + final SignedBlobSidecar blobSidecar, final Optional arrivalTimestamp) { final int blobSidecarSubnet = spec.computeSubnetForBlobSidecar(blobSidecar).intValue(); if (blobSidecarSubnet != subnetId) { return SafeFuture.completedFuture( @@ -173,7 +175,7 @@ public SafeFuture process(final SignedBlobSidecar blob "blob sidecar with subnet_id %s does not match the topic subnet_id %d", blobSidecarSubnet, subnetId)); } - return delegate.process(blobSidecar); + return delegate.process(blobSidecar, arrivalTimestamp); } } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/Eth2PreparedGossipMessageFactory.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/Eth2PreparedGossipMessageFactory.java index 3416035d65a..35d218e7eff 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/Eth2PreparedGossipMessageFactory.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/Eth2PreparedGossipMessageFactory.java @@ -13,9 +13,11 @@ package tech.pegasys.teku.networking.eth2.gossip.encoding; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.infrastructure.ssz.SszData; import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessageFactory; import tech.pegasys.teku.spec.config.NetworkingSpecConfig; @@ -28,5 +30,9 @@ public interface Eth2PreparedGossipMessageFactory extends PreparedGossipMessageF * @param valueType The concrete type to deserialize to */ PreparedGossipMessage create( - String topic, Bytes data, SszSchema valueType, NetworkingSpecConfig networkingConfig); + String topic, + Bytes data, + SszSchema valueType, + NetworkingSpecConfig networkingConfig, + Optional arrivalTimestamp); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/GossipEncoding.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/GossipEncoding.java index 64df05c03da..ff07dc84566 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/GossipEncoding.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/GossipEncoding.java @@ -21,6 +21,7 @@ import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.config.NetworkingSpecConfig; public interface GossipEncoding { @@ -51,7 +52,8 @@ Eth2PreparedGossipMessageFactory createPreparedGossipMessageFactory( * Decodes preprocessed message * * @param message preprocessed raw bytes message returned earlier by {@link - * Eth2PreparedGossipMessageFactory#create(String, Bytes, SszSchema)} + * Eth2PreparedGossipMessageFactory#create(String, Bytes, SszSchema, NetworkingSpecConfig, + * Optional)} * @param valueType The concrete type to deserialize to * @return The deserialized value * @throws DecodingException If deserialization fails diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java index 4a09ed48b14..10006b308e7 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessage.java @@ -20,6 +20,7 @@ import tech.pegasys.teku.infrastructure.bytes.Bytes4; import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; import tech.pegasys.teku.infrastructure.ssz.sos.SszLengthBounds; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding.ForkDigestToMilestone; import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopics; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; @@ -39,6 +40,7 @@ class SnappyPreparedGossipMessage implements PreparedGossipMessage { private final Uncompressor snappyCompressor; private final MessageIdCalculator messageIdCalculator; private final NetworkingSpecConfig networkingConfig; + private final Optional arrivalTimestamp; private final Supplier decodedResult = Suppliers.memoize(this::getDecodedMessage); @@ -47,9 +49,16 @@ static SnappyPreparedGossipMessage createUnknown( final String topic, final Bytes compressedData, final ForkDigestToMilestone forkDigestToMilestone, - final NetworkingSpecConfig networkingConfig) { + final NetworkingSpecConfig networkingConfig, + final Optional arrivalTimestamp) { return new SnappyPreparedGossipMessage( - topic, compressedData, forkDigestToMilestone, null, null, networkingConfig); + topic, + compressedData, + forkDigestToMilestone, + null, + null, + networkingConfig, + arrivalTimestamp); } static SnappyPreparedGossipMessage create( @@ -58,14 +67,16 @@ static SnappyPreparedGossipMessage create( final ForkDigestToMilestone forkDigestToMilestone, final SszSchema valueType, final Uncompressor snappyCompressor, - final NetworkingSpecConfig networkingConfig) { + final NetworkingSpecConfig networkingConfig, + final Optional arrivalTimestamp) { return new SnappyPreparedGossipMessage( topic, compressedData, forkDigestToMilestone, valueType, snappyCompressor, - networkingConfig); + networkingConfig, + arrivalTimestamp); } private SnappyPreparedGossipMessage( @@ -74,13 +85,15 @@ private SnappyPreparedGossipMessage( final ForkDigestToMilestone forkDigestToMilestone, final SszSchema valueType, final Uncompressor snappyCompressor, - final NetworkingSpecConfig networkingConfig) { + final NetworkingSpecConfig networkingConfig, + final Optional arrivalTimestamp) { this.compressedData = compressedData; this.valueType = valueType; this.snappyCompressor = snappyCompressor; this.networkingConfig = networkingConfig; this.messageIdCalculator = createMessageIdCalculator(topic, compressedData, forkDigestToMilestone); + this.arrivalTimestamp = arrivalTimestamp; } private MessageIdCalculator createMessageIdCalculator( @@ -139,6 +152,11 @@ public Bytes getMessageId() { .orElseGet(messageIdCalculator::getInvalidMessageId); } + @Override + public Optional getArrivalTimestamp() { + return arrivalTimestamp; + } + @FunctionalInterface interface Uncompressor { Bytes uncompress(final Bytes compressedData, final SszLengthBounds lengthBounds) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageFactory.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageFactory.java index 8af6ada0d0d..d8a7e630434 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageFactory.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageFactory.java @@ -13,9 +13,11 @@ package tech.pegasys.teku.networking.eth2.gossip.encoding; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.infrastructure.ssz.SszData; import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding.ForkDigestToMilestone; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; import tech.pegasys.teku.spec.config.NetworkingSpecConfig; @@ -37,20 +39,25 @@ public PreparedGossipMessage create( final String topic, final Bytes data, final SszSchema valueType, - final NetworkingSpecConfig networkingConfig) { + final NetworkingSpecConfig networkingConfig, + final Optional arrivalTimestamp) { return SnappyPreparedGossipMessage.create( topic, data, forkDigestToMilestone, valueType, snappyCompressor::uncompress, - networkingConfig); + networkingConfig, + arrivalTimestamp); } @Override public PreparedGossipMessage create( - final String topic, final Bytes data, final NetworkingSpecConfig networkingConfig) { + final String topic, + final Bytes data, + final NetworkingSpecConfig networkingConfig, + final Optional arrivalTimestamp) { return SnappyPreparedGossipMessage.createUnknown( - topic, data, forkDigestToMilestone, networkingConfig); + topic, data, forkDigestToMilestone, networkingConfig, arrivalTimestamp); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/SyncCommitteeSubnetSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/SyncCommitteeSubnetSubscriptions.java index 83c74a53e37..e044761b363 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/SyncCommitteeSubnetSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/SyncCommitteeSubnetSubscriptions.java @@ -66,8 +66,9 @@ public SafeFuture gossip(final SyncCommitteeMessage message, final int subnet @Override protected Eth2TopicHandler createTopicHandler(final int subnetId) { final OperationProcessor convertingProcessor = - message -> - processor.process(ValidatableSyncCommitteeMessage.fromNetwork(message, subnetId)); + (message, arrivalTimestamp) -> + processor.process( + ValidatableSyncCommitteeMessage.fromNetwork(message, subnetId), arrivalTimestamp); return new Eth2TopicHandler<>( recentChainData, asyncRunner, diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/OperationProcessor.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/OperationProcessor.java index e8a326fb1ac..9e55e044c1b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/OperationProcessor.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/OperationProcessor.java @@ -13,13 +13,27 @@ package tech.pegasys.teku.networking.eth2.gossip.topics; +import java.util.Optional; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; +@FunctionalInterface public interface OperationProcessor { - SafeFuture process(T operation); + /** + * Process operation + * + * @param operation Operation + * @param arrivalTimestamp arrival timestamp if operation is received from remote (not necessarily + * provided). We may consider adding meta container when we need more than only arrival time + * alongside, but for memory saving and module imports simplicity let's delay it until at + * least 2 fields are needed. + * @return result of operation validation + */ + SafeFuture process(T operation, Optional arrivalTimestamp); - OperationProcessor NOOP = (__) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT); + OperationProcessor NOOP = + (__, ___) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT); @SuppressWarnings("unchecked") static OperationProcessor noop() { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/Eth2TopicHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/Eth2TopicHandler.java index a18d07b75bf..3aa03596cc8 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/Eth2TopicHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/Eth2TopicHandler.java @@ -16,6 +16,7 @@ import static tech.pegasys.teku.infrastructure.logging.P2PLogger.P2P_LOG; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,6 +27,7 @@ import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.infrastructure.ssz.SszData; import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.DecodingException; import tech.pegasys.teku.networking.eth2.gossip.encoding.Eth2PreparedGossipMessageFactory; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; @@ -111,7 +113,7 @@ public SafeFuture handleMessage(PreparedGossipMessage message) return asyncRunner.runAsync( () -> processor - .process(deserialized) + .process(deserialized, message.getArrivalTimestamp()) .thenApply( internalValidation -> { processMessage(internalValidation, message); @@ -178,9 +180,10 @@ protected ValidationResult handleMessageProcessingError( } @Override - public PreparedGossipMessage prepareMessage(Bytes payload) { + public PreparedGossipMessage prepareMessage( + final Bytes payload, final Optional arrivalTimestamp) { return preparedGossipMessageFactory.create( - getTopic(), payload, getMessageType(), networkingConfig); + getTopic(), payload, getMessageType(), networkingConfig, arrivalTimestamp); } @Override diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/SingleAttestationTopicHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/SingleAttestationTopicHandler.java index 5f1d4ec024f..f88a7d6ee9c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/SingleAttestationTopicHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/topics/topichandlers/SingleAttestationTopicHandler.java @@ -38,9 +38,9 @@ public static Eth2TopicHandler createHandler( final Spec spec = recentChainData.getSpec(); OperationProcessor convertingProcessor = - attMessage -> + (attMessage, arrivalTimestamp) -> operationProcessor.process( - ValidatableAttestation.fromNetwork(spec, attMessage, subnetId)); + ValidatableAttestation.fromNetwork(spec, attMessage, subnetId), arrivalTimestamp); return new Eth2TopicHandler<>( recentChainData, diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java index b3fe7d2c9f8..b0c7ff9c3d4 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.tuweni.bytes.Bytes; @@ -92,7 +93,7 @@ public void setup() { }) .when(gossipNetwork) .subscribe(any(), any()); - when(processor.process(any())) + when(processor.process(any(), any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); blobSidecarGossipManager = BlobSidecarGossipManager.create( @@ -164,7 +165,8 @@ public void testAcceptingSidecarGossipIfOnTheCorrectTopic() { // processing blob sidecar with subnet_id 1 should be accepted final SignedBlobSidecar blobSidecar = dataStructureUtil.randomSignedBlobSidecar(UInt64.ONE); final InternalValidationResult validationResult = - SafeFutureAssert.safeJoin(topicHandler.getProcessor().process(blobSidecar)); + SafeFutureAssert.safeJoin( + topicHandler.getProcessor().process(blobSidecar, Optional.empty())); assertThat(validationResult).isEqualTo(InternalValidationResult.ACCEPT); } @@ -179,7 +181,8 @@ public void testRejectingSidecarGossipIfNotOnTheCorrectTopic() { final SignedBlobSidecar blobSidecar = dataStructureUtil.randomSignedBlobSidecar(UInt64.valueOf(2)); final InternalValidationResult validationResult = - SafeFutureAssert.safeJoin(topicHandler.getProcessor().process(blobSidecar)); + SafeFutureAssert.safeJoin( + topicHandler.getProcessor().process(blobSidecar, Optional.empty())); assertThat(validationResult.isReject()).isTrue(); assertThat(validationResult.getDescription()) diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java index 4eb6c4ded62..c76f6c8eb16 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SnappyPreparedGossipMessageTest.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.bytes.Bytes4; @@ -158,12 +159,24 @@ public void getMessageId_generateUniqueIdsBasedOnValidityAndMilestone() { private SnappyPreparedGossipMessage getPhase0Message( final Bytes rawMessage, final String topic, final Uncompressor uncompressor) { return SnappyPreparedGossipMessage.create( - topic, rawMessage, forkDigestToMilestone, schema, uncompressor, spec.getNetworkingConfig()); + topic, + rawMessage, + forkDigestToMilestone, + schema, + uncompressor, + spec.getNetworkingConfig(), + Optional.empty()); } private SnappyPreparedGossipMessage getAltairMessage( final Bytes rawMessage, final String topic, final Uncompressor uncompressor) { return SnappyPreparedGossipMessage.create( - topic, rawMessage, forkDigestToMilestone, schema, uncompressor, spec.getNetworkingConfig()); + topic, + rawMessage, + forkDigestToMilestone, + schema, + uncompressor, + spec.getNetworkingConfig(), + Optional.empty()); } } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SszSnappyGossipEncodingTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SszSnappyGossipEncodingTest.java index 02a6b370f39..4a074f896ad 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SszSnappyGossipEncodingTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/encoding/SszSnappyGossipEncodingTest.java @@ -53,7 +53,7 @@ private T decode( return encoding.decodeMessage( encoding .createPreparedGossipMessageFactory(__ -> Optional.of(SpecMilestone.PHASE0)) - .create(topic, data, valueType, spec.getNetworkingConfig()), + .create(topic, data, valueType, spec.getNetworkingConfig(), Optional.empty()), valueType); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AggregateTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AggregateTopicHandlerTest.java index a372014a2cc..ca276059c89 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AggregateTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AggregateTopicHandlerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.gossip.AggregateGossipManager; @@ -39,13 +40,13 @@ public void handleMessage_validAggregate() { final ValidatableAttestation aggregate = ValidatableAttestation.aggregateFromValidator( spec, dataStructureUtil.randomSignedAggregateAndProof(validSlot)); - when(processor.process(aggregate)) + when(processor.process(aggregate, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture result = topicHandler.handleMessage( topicHandler.prepareMessage( - gossipEncoding.encode(aggregate.getSignedAggregateAndProof()))); + gossipEncoding.encode(aggregate.getSignedAggregateAndProof()), Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -59,7 +60,7 @@ public void handleMessage_invalidAggregate_wrongFork() { final SafeFuture result = topicHandler.handleMessage( topicHandler.prepareMessage( - gossipEncoding.encode(aggregate.getSignedAggregateAndProof()))); + gossipEncoding.encode(aggregate.getSignedAggregateAndProof()), Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -70,13 +71,13 @@ public void handleMessage_savedForFuture() { final ValidatableAttestation aggregate = ValidatableAttestation.aggregateFromValidator( spec, dataStructureUtil.randomSignedAggregateAndProof(validSlot)); - when(processor.process(aggregate)) + when(processor.process(aggregate, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); final SafeFuture result = topicHandler.handleMessage( topicHandler.prepareMessage( - gossipEncoding.encode(aggregate.getSignedAggregateAndProof()))); + gossipEncoding.encode(aggregate.getSignedAggregateAndProof()), Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -86,13 +87,13 @@ public void handleMessage_ignoredAggregate() { final ValidatableAttestation aggregate = ValidatableAttestation.aggregateFromValidator( spec, dataStructureUtil.randomSignedAggregateAndProof(validSlot)); - when(processor.process(aggregate)) + when(processor.process(aggregate, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); final SafeFuture result = topicHandler.handleMessage( topicHandler.prepareMessage( - gossipEncoding.encode(aggregate.getSignedAggregateAndProof()))); + gossipEncoding.encode(aggregate.getSignedAggregateAndProof()), Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -102,13 +103,13 @@ public void handleMessage_invalidAggregate() { final ValidatableAttestation aggregate = ValidatableAttestation.aggregateFromValidator( spec, dataStructureUtil.randomSignedAggregateAndProof(validSlot)); - when(processor.process(aggregate)) + when(processor.process(aggregate, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); final SafeFuture result = topicHandler.handleMessage( topicHandler.prepareMessage( - gossipEncoding.encode(aggregate.getSignedAggregateAndProof()))); + gossipEncoding.encode(aggregate.getSignedAggregateAndProof()), Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AttesterSlashingTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AttesterSlashingTopicHandlerTest.java index 84bae0b53f8..b2954d77c7e 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AttesterSlashingTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/AttesterSlashingTopicHandlerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -39,11 +40,11 @@ protected Eth2TopicHandler createHandler() { @Test public void handleMessage_validSlashing() { final AttesterSlashing slashing = dataStructureUtil.randomAttesterSlashingAtSlot(validSlot); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -53,7 +54,7 @@ public void handleMessage_invalidSlashing_wrongFork() { final AttesterSlashing slashing = dataStructureUtil.randomAttesterSlashingAtSlot(wrongForkSlot); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -62,11 +63,11 @@ public void handleMessage_invalidSlashing_wrongFork() { @Test public void handleMessage_ignoredSlashing() { final AttesterSlashing slashing = dataStructureUtil.randomAttesterSlashingAtSlot(validSlot); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -74,11 +75,11 @@ public void handleMessage_ignoredSlashing() { @Test public void handleMessage_rejectedSlashing() { final AttesterSlashing slashing = dataStructureUtil.randomAttesterSlashingAtSlot(validSlot); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } @@ -88,7 +89,7 @@ public void handleMessage_invalidSSZ() { Bytes serialized = Bytes.fromHexString("0x1234"); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/BlockTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/BlockTopicHandlerTest.java index 63404b4eaef..9bde5f5cec3 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/BlockTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/BlockTopicHandlerTest.java @@ -14,10 +14,13 @@ package tech.pegasys.teku.networking.eth2.gossip.topics; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -43,11 +46,11 @@ public void handleMessage_validBlock() { final SignedBeaconBlock block = chainBuilder.generateBlockAtSlot(nextSlot).getBlock(); Bytes serialized = gossipEncoding.encode(block); - when(processor.process(block)) + when(processor.process(eq(block), any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -58,11 +61,11 @@ public void handleMessage_validFutureBlock() { final SignedBeaconBlock block = chainBuilder.generateBlockAtSlot(nextSlot).getBlock(); Bytes serialized = gossipEncoding.encode(block); - when(processor.process(block)) + when(processor.process(eq(block), any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -72,11 +75,11 @@ public void handleMessage_invalidBlock_unknownPreState() { SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(validSlot); Bytes serialized = gossipEncoding.encode(block); - when(processor.process(block)) + when(processor.process(eq(block), any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -87,7 +90,7 @@ public void handleMessage_invalidBlock_wrongFork() { storageSystem.chainBuilder().getBlockAtSlot(wrongForkSlot); Bytes serialized = gossipEncoding.encode(prevForkBlock); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); assertThat(asyncRunner.countDelayedActions()).isEqualTo(0); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -98,7 +101,7 @@ public void handleMessage_invalidBlock_invalidSSZ() { Bytes serialized = Bytes.fromHexString("0x1234"); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } @@ -114,7 +117,7 @@ public void handleMessage_invalidBlock_wrongProposer() { storageSystem.chainUpdater().setCurrentSlot(nextSlot); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2TopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2TopicHandlerTest.java index 56004956046..e7cd99dcabd 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2TopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/Eth2TopicHandlerTest.java @@ -16,6 +16,7 @@ import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.RejectedExecutionException; import org.apache.tuweni.bytes.Bytes; @@ -62,10 +63,10 @@ public void handleMessage_valid() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Valid); } @@ -77,10 +78,10 @@ public void handleMessage_invalid() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); } @@ -92,10 +93,10 @@ public void handleMessage_ignore() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.IGNORE)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.IGNORE)); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -107,10 +108,10 @@ public void handleMessage_invalidBytes() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); final Bytes invalidBytes = Bytes.fromHexString("0x0102"); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(invalidBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(invalidBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); @@ -123,14 +124,14 @@ public void handleMessage_errorWhileProcessing_decodingException() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); topicHandler.setDeserializer( (b) -> { throw new DecodingException("oops"); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); @@ -143,14 +144,14 @@ public void handleMessage_errorWhileProcessing_wrappedDecodingException() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); topicHandler.setDeserializer( (b) -> { throw new CompletionException(new DecodingException("oops")); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); @@ -163,14 +164,14 @@ public void handleMessage_errorWhileProcessing_decodingExceptionWithCause() { recentChainData, spec, asyncRunner, - (b) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + (b, __) -> SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); topicHandler.setDeserializer( (b) -> { throw new DecodingException("oops", new RuntimeException("oops")); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); @@ -183,12 +184,12 @@ public void handleMessage_errorWhileProcessing_rejectedExecution() { recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new RejectedExecutionException("No more capacity"); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); @@ -201,12 +202,12 @@ public void handleMessage_errorWhileProcessing_wrappedRejectedExecution() { recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new CompletionException(new RejectedExecutionException("No more capacity")); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); @@ -219,12 +220,12 @@ public void handleMessage_errorWhileProcessing_rejectedExecutionWithRootCause() recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new RejectedExecutionException("No more capacity", new NullPointerException()); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); @@ -237,12 +238,12 @@ public void handleMessage_errorWhileProcessing_serviceCapacityExceededExecution( recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new ServiceCapacityExceededException("No more capacity"); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); @@ -255,13 +256,13 @@ public void handleMessage_errorWhileProcessing_wrappedServiceCapacityExceededExe recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new CompletionException( new ServiceCapacityExceededException("No more capacity")); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Ignore); @@ -274,12 +275,12 @@ public void handleMessage_errorWhileProcessing_unknownError() { recentChainData, spec, asyncRunner, - (b) -> { + (b, __) -> { throw new NullPointerException(); }); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes)); + topicHandler.handleMessage(topicHandler.prepareMessage(blockBytes, Optional.empty())); asyncRunner.executeQueuedActions(); assertThatSafeFuture(result).isCompletedWithValue(ValidationResult.Invalid); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/ProposerSlashingTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/ProposerSlashingTopicHandlerTest.java index b28a47d87f3..32b80a67a17 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/ProposerSlashingTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/ProposerSlashingTopicHandlerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -46,11 +47,11 @@ protected Eth2TopicHandler createHandler() { public void handleMessage_validSlashing() { final ProposerSlashing slashing = dataStructureUtil.randomProposerSlashing(validSlot, UInt64.ZERO); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -61,7 +62,7 @@ public void handleMessage_invalidSlashing_wrongFork() { dataStructureUtil.randomProposerSlashing(wrongForkSlot, UInt64.ZERO); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -71,11 +72,11 @@ public void handleMessage_invalidSlashing_wrongFork() { public void handleMessage_ignoredSlashing() { final ProposerSlashing slashing = dataStructureUtil.randomProposerSlashing(validSlot, UInt64.ZERO); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -84,11 +85,11 @@ public void handleMessage_ignoredSlashing() { public void handleMessage_rejectedSlashing() { final ProposerSlashing slashing = dataStructureUtil.randomProposerSlashing(validSlot, UInt64.ZERO); - when(processor.process(slashing)) + when(processor.process(slashing, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); Bytes serialized = gossipEncoding.encode(slashing); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } @@ -98,7 +99,7 @@ public void handleMessage_invalidSSZ() { Bytes serialized = Bytes.fromHexString("0x1234"); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/SingleAttestationTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/SingleAttestationTopicHandlerTest.java index 2649016ca97..3f9607898d9 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/SingleAttestationTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/SingleAttestationTopicHandlerTest.java @@ -19,6 +19,7 @@ import io.libp2p.core.pubsub.ValidationResult; import java.util.List; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.bls.BLSKeyGenerator; @@ -57,12 +58,12 @@ public void handleMessage_valid() { final ValidatableAttestation attestation = ValidatableAttestation.fromNetwork( spec, attestationGenerator.validAttestation(blockAndState), SUBNET_ID); - when(processor.process(attestation)) + when(processor.process(attestation, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); final Bytes serialized = gossipEncoding.encode(attestation.getAttestation()); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -78,7 +79,7 @@ public void handleMessage_invalid_wrongFork() { final Bytes serialized = gossipEncoding.encode(attestation.getAttestation()); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -91,12 +92,12 @@ public void handleMessage_ignored() { final ValidatableAttestation attestation = ValidatableAttestation.fromNetwork( spec, attestationGenerator.validAttestation(blockAndState), SUBNET_ID); - when(processor.process(attestation)) + when(processor.process(attestation, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); final Bytes serialized = gossipEncoding.encode(attestation.getAttestation()); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -108,12 +109,12 @@ public void handleMessage_saveForFuture() { final ValidatableAttestation attestation = ValidatableAttestation.fromNetwork( spec, attestationGenerator.validAttestation(blockAndState), SUBNET_ID); - when(processor.process(attestation)) + when(processor.process(attestation, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.SAVE_FOR_FUTURE)); final Bytes serialized = gossipEncoding.encode(attestation.getAttestation()); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -125,12 +126,12 @@ public void handleMessage_invalid() { final ValidatableAttestation attestation = ValidatableAttestation.fromNetwork( spec, attestationGenerator.validAttestation(blockAndState), SUBNET_ID); - when(processor.process(attestation)) + when(processor.process(attestation, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.reject("Nope"))); final Bytes serialized = gossipEncoding.encode(attestation.getAttestation()); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } @@ -140,7 +141,7 @@ public void handleMessage_invalidAttestation_invalidSSZ() { final Bytes serialized = Bytes.fromHexString("0x3456"); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/VoluntaryExitTopicHandlerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/VoluntaryExitTopicHandlerTest.java index 306c47a004d..b45c6561293 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/VoluntaryExitTopicHandlerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/topics/VoluntaryExitTopicHandlerTest.java @@ -19,6 +19,7 @@ import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -58,11 +59,11 @@ protected StorageSystem createStorageSystem() { @Test public void handleMessage_validExit() { final SignedVoluntaryExit exit = exitGenerator.withEpoch(getBestState(), validEpoch, 3); - when(processor.process(exit)) + when(processor.process(exit, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); Bytes serialized = gossipEncoding.encode(exit); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Valid); } @@ -74,7 +75,7 @@ public void handleMessage_invalidExit_wrongFork() { getBestState(), spec.computeEpochAtSlot(wrongForkSlot).intValue(), 3); Bytes serialized = gossipEncoding.encode(exit); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Invalid); verifyNoInteractions(processor); @@ -83,11 +84,11 @@ public void handleMessage_invalidExit_wrongFork() { @Test public void handleMessage_ignoredExit() { final SignedVoluntaryExit exit = exitGenerator.withEpoch(getBestState(), validEpoch, 3); - when(processor.process(exit)) + when(processor.process(exit, Optional.empty())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.IGNORE)); Bytes serialized = gossipEncoding.encode(exit); final SafeFuture result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)); + topicHandler.handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())); asyncRunner.executeQueuedActions(); assertThat(result).isCompletedWithValue(ValidationResult.Ignore); } @@ -97,7 +98,9 @@ public void handleMessage_invalidSSZ() { Bytes serialized = Bytes.fromHexString("0x1234"); final ValidationResult result = - topicHandler.handleMessage(topicHandler.prepareMessage(serialized)).join(); + topicHandler + .handleMessage(topicHandler.prepareMessage(serialized, Optional.empty())) + .join(); assertThat(result).isEqualTo(ValidationResult.Invalid); } diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index 416ddbc248a..c901cd28292 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -283,6 +283,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { gossipEncoding.createPreparedGossipMessageFactory( recentChainData::getMilestoneByForkDigest)) .gossipTopicFilter(gossipTopicsFilter) + .timeProvider(timeProvider) .build()) .peerPools(peerPools) .peerSelectionStrategy( diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessage.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessage.java index ff6775a4389..8e7b0e1010a 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessage.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessage.java @@ -15,11 +15,12 @@ import java.util.Optional; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; /** * Semi-processed raw gossip message which can supply Gossip 'message-id' * - * @see TopicHandler#prepareMessage(Bytes) + * @see TopicHandler#prepareMessage(Bytes, Optional) */ public interface PreparedGossipMessage { @@ -36,6 +37,8 @@ public interface PreparedGossipMessage { Bytes getOriginalMessage(); + Optional getArrivalTimestamp(); + class DecodedMessageResult { private final Optional decodedMessage; private final Optional decodingException; diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessageFactory.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessageFactory.java index 068cbf492d3..112289203b7 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessageFactory.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/PreparedGossipMessageFactory.java @@ -13,12 +13,18 @@ package tech.pegasys.teku.networking.p2p.gossip; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.config.NetworkingSpecConfig; /** Factory for {@link PreparedGossipMessage} instances */ public interface PreparedGossipMessageFactory { /** Creates a {@link PreparedGossipMessage} instance */ - PreparedGossipMessage create(String topic, Bytes payload, NetworkingSpecConfig networkingConfig); + PreparedGossipMessage create( + String topic, + Bytes payload, + NetworkingSpecConfig networkingConfig, + Optional arrivalTimestamp); } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicHandler.java index 99db434f555..fe545d570ef 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicHandler.java @@ -14,20 +14,22 @@ package tech.pegasys.teku.networking.p2p.gossip; import io.libp2p.core.pubsub.ValidationResult; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; public interface TopicHandler { - /** * Preprocess 'raw' Gossip message returning the instance which may calculate Gossip 'message-id' * and cache intermediate data for later message handling with {@link - * #handleMessage(PreparedGossipMessage)} + * #handleMessage(PreparedGossipMessage)}. Also packs it with arrivalTimestamp when available */ - PreparedGossipMessage prepareMessage(Bytes payload); + PreparedGossipMessage prepareMessage(Bytes payload, Optional arrivalTimestamp); /** - * Validates and handles gossip message preprocessed earlier by {@link #prepareMessage(Bytes)} + * Validates and handles gossip message preprocessed earlier by {@link #prepareMessage(Bytes, + * Optional)} * * @param message The preprocessed gossip message * @return Message validation promise diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java index 715f4d53b41..374a6fddfd7 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java @@ -37,6 +37,7 @@ import java.util.List; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.version.VersionProvider; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessageFactory; import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.PrivateKeyProvider; @@ -58,6 +59,7 @@ * might be changed in any version in backward incompatible way */ public class LibP2PNetworkBuilder { + public static final boolean DEFAULT_RECORD_MESSAGE_ARRIVAL = false; public static LibP2PNetworkBuilder create() { return new LibP2PNetworkBuilder(); @@ -73,6 +75,7 @@ public static LibP2PNetworkBuilder create() { protected List peerHandlers; protected PreparedGossipMessageFactory preparedGossipMessageFactory; protected GossipTopicFilter gossipTopicFilter; + protected TimeProvider timeProvider; protected Firewall firewall = new Firewall(Duration.ofSeconds(30)); protected MuxFirewall muxFirewall = @@ -85,6 +88,7 @@ public static LibP2PNetworkBuilder create() { protected List> rpcHandlers; protected PeerManager peerManager; + protected boolean recordMessageArrival = DEFAULT_RECORD_MESSAGE_ARRIVAL; protected LibP2PNetworkBuilder() {} @@ -126,6 +130,8 @@ protected LibP2PGossipNetwork createGossipNetwork() { .defaultMessageFactory(preparedGossipMessageFactory) .gossipTopicFilter(gossipTopicFilter) .logWireGossip(config.getWireLogsConfig().isLogWireGossip()) + .timeProvider(timeProvider) + .recordArrivalTime(recordMessageArrival) .build(); } @@ -276,4 +282,14 @@ public LibP2PNetworkBuilder muxFirewall(MuxFirewall muxFirewall) { this.muxFirewall = muxFirewall; return this; } + + public LibP2PNetworkBuilder timeProvider(final TimeProvider timeProvider) { + this.timeProvider = timeProvider; + return this; + } + + public LibP2PNetworkBuilder recordMessageArrival(final boolean recordMessageArrival) { + this.recordMessageArrival = recordMessageArrival; + return this; + } } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java index 9491b4dc9e2..9d17e1f3e36 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PGossipNetworkBuilder.java @@ -13,6 +13,8 @@ package tech.pegasys.teku.networking.p2p.libp2p.gossip; +import static com.google.common.base.Preconditions.checkState; +import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetworkBuilder.DEFAULT_RECORD_MESSAGE_ARRIVAL; import static tech.pegasys.teku.networking.p2p.libp2p.config.LibP2PParamsFactory.MAX_SUBSCRIPTIONS_PER_MESSAGE; import static tech.pegasys.teku.networking.p2p.libp2p.gossip.LibP2PGossipNetwork.NULL_SEQNO_GENERATOR; import static tech.pegasys.teku.networking.p2p.libp2p.gossip.LibP2PGossipNetwork.STRICT_FIELDS_VALIDATOR; @@ -39,6 +41,8 @@ import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.hyperledger.besu.plugin.services.MetricsSystem; +import tech.pegasys.teku.infrastructure.time.TimeProvider; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessageFactory; import tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig; @@ -64,23 +68,40 @@ public static LibP2PGossipNetworkBuilder create() { protected PreparedGossipMessageFactory defaultMessageFactory; protected GossipTopicFilter gossipTopicFilter; protected boolean logWireGossip; + protected TimeProvider timeProvider; + protected boolean recordArrivalTime = DEFAULT_RECORD_MESSAGE_ARRIVAL; protected ChannelHandler debugGossipHandler = null; protected LibP2PGossipNetworkBuilder() {} public LibP2PGossipNetwork build() { - GossipTopicHandlers topicHandlers = new GossipTopicHandlers(); - Gossip gossip = createGossip(gossipConfig, logWireGossip, gossipTopicFilter, topicHandlers); - PubsubPublisherApi publisher = gossip.createPublisher(null, NULL_SEQNO_GENERATOR); + validate(); + final GossipTopicHandlers topicHandlers = new GossipTopicHandlers(); + final Gossip gossip = + createGossip(gossipConfig, logWireGossip, gossipTopicFilter, topicHandlers); + final PubsubPublisherApi publisher = gossip.createPublisher(null, NULL_SEQNO_GENERATOR); return new LibP2PGossipNetwork(metricsSystem, gossip, publisher, topicHandlers); } + private void validate() { + assertNotNull("metricsSystem", metricsSystem); + assertNotNull("gossipConfig", gossipConfig); + assertNotNull("networkingSpecConfig", networkingSpecConfig); + assertNotNull("defaultMessageFactory", defaultMessageFactory); + assertNotNull("gossipTopicFilter", gossipTopicFilter); + assertNotNull("timeProvider", timeProvider); + } + + private void assertNotNull(final String fieldName, final Object fieldValue) { + checkState(fieldValue != null, "Field " + fieldName + " must be set."); + } + protected GossipRouter createGossipRouter( - GossipConfig gossipConfig, - GossipTopicFilter gossipTopicFilter, - GossipTopicHandlers topicHandlers) { + final GossipConfig gossipConfig, + final GossipTopicFilter gossipTopicFilter, + final GossipTopicHandlers topicHandlers) { final GossipParams gossipParams = LibP2PParamsFactory.createGossipParams(gossipConfig); final GossipScoreParams scoreParams = @@ -109,14 +130,22 @@ protected GossipRouter createGossipRouter( Preconditions.checkArgument( msg.getTopicIDsCount() == 1, "Unexpected number of topics for a single message: " + msg.getTopicIDsCount()); - String topic = msg.getTopicIDs(0); - Bytes payload = Bytes.wrap(msg.getData().toByteArray()); - - PreparedGossipMessage preparedMessage = + final Optional arrivalTimestamp; + if (recordArrivalTime) { + arrivalTimestamp = Optional.of(timeProvider.getTimeInMillis()); + } else { + arrivalTimestamp = Optional.empty(); + } + final String topic = msg.getTopicIDs(0); + final Bytes payload = Bytes.wrap(msg.getData().toByteArray()); + + final PreparedGossipMessage preparedMessage = topicHandlers .getHandlerForTopic(topic) - .map(handler -> handler.prepareMessage(payload)) - .orElse(defaultMessageFactory.create(topic, payload, networkingSpecConfig)); + .map(handler -> handler.prepareMessage(payload, arrivalTimestamp)) + .orElse( + defaultMessageFactory.create( + topic, payload, networkingSpecConfig, arrivalTimestamp)); return new PreparedPubsubMessage(msg, preparedMessage); }); @@ -125,12 +154,12 @@ protected GossipRouter createGossipRouter( } protected Gossip createGossip( - GossipConfig gossipConfig, - boolean gossipLogsEnabled, - GossipTopicFilter gossipTopicFilter, - GossipTopicHandlers topicHandlers) { + final GossipConfig gossipConfig, + final boolean gossipLogsEnabled, + final GossipTopicFilter gossipTopicFilter, + final GossipTopicHandlers topicHandlers) { - GossipRouter router = createGossipRouter(gossipConfig, gossipTopicFilter, topicHandlers); + final GossipRouter router = createGossipRouter(gossipConfig, gossipTopicFilter, topicHandlers); if (gossipLogsEnabled) { if (debugGossipHandler != null) { @@ -139,17 +168,17 @@ protected Gossip createGossip( } debugGossipHandler = new LoggingHandler("wire.gossip", LogLevel.DEBUG); } - PubsubApi pubsubApi = PubsubApiKt.createPubsubApi(router); + final PubsubApi pubsubApi = PubsubApiKt.createPubsubApi(router); return new Gossip(router, pubsubApi, debugGossipHandler); } - public LibP2PGossipNetworkBuilder metricsSystem(MetricsSystem metricsSystem) { + public LibP2PGossipNetworkBuilder metricsSystem(final MetricsSystem metricsSystem) { this.metricsSystem = metricsSystem; return this; } - public LibP2PGossipNetworkBuilder gossipConfig(GossipConfig gossipConfig) { + public LibP2PGossipNetworkBuilder gossipConfig(final GossipConfig gossipConfig) { this.gossipConfig = gossipConfig; return this; } @@ -161,23 +190,33 @@ public LibP2PGossipNetworkBuilder networkingSpecConfig( } public LibP2PGossipNetworkBuilder defaultMessageFactory( - PreparedGossipMessageFactory defaultMessageFactory) { + final PreparedGossipMessageFactory defaultMessageFactory) { this.defaultMessageFactory = defaultMessageFactory; return this; } - public LibP2PGossipNetworkBuilder gossipTopicFilter(GossipTopicFilter gossipTopicFilter) { + public LibP2PGossipNetworkBuilder gossipTopicFilter(final GossipTopicFilter gossipTopicFilter) { this.gossipTopicFilter = gossipTopicFilter; return this; } - public LibP2PGossipNetworkBuilder logWireGossip(boolean logWireGossip) { + public LibP2PGossipNetworkBuilder logWireGossip(final boolean logWireGossip) { this.logWireGossip = logWireGossip; return this; } - public LibP2PGossipNetworkBuilder debugGossipHandler(ChannelHandler debugGossipHandler) { + public LibP2PGossipNetworkBuilder debugGossipHandler(final ChannelHandler debugGossipHandler) { this.debugGossipHandler = debugGossipHandler; return this; } + + public LibP2PGossipNetworkBuilder timeProvider(final TimeProvider timeProvider) { + this.timeProvider = timeProvider; + return this; + } + + public LibP2PGossipNetworkBuilder recordArrivalTime(final boolean recordArrivalTime) { + this.recordArrivalTime = recordArrivalTime; + return this; + } } diff --git a/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/DiscoveryNetworkFactory.java b/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/DiscoveryNetworkFactory.java index bb0786cc474..5fef5f9be3a 100644 --- a/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/DiscoveryNetworkFactory.java +++ b/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/DiscoveryNetworkFactory.java @@ -126,10 +126,11 @@ public DiscoveryNetwork buildAndStart() throws Exception { .rpcMethods(Collections.emptyList()) .peerHandlers(Collections.emptyList()) .preparedGossipMessageFactory( - (topic, payload, networkingSpecConfig) -> { + (topic, payload, networkingSpecConfig, arrivalTimestamp) -> { throw new UnsupportedOperationException(); }) .gossipTopicFilter(topic -> true) + .timeProvider(StubTimeProvider.withTimeInMillis(0)) .build()) .peerPools(peerPools) .peerSelectionStrategy(peerSelectionStrategy) diff --git a/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/jvmlibp2p/MockMessageApi.java b/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/jvmlibp2p/MockMessageApi.java index 8981f1732a9..edc886142ad 100644 --- a/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/jvmlibp2p/MockMessageApi.java +++ b/networking/p2p/src/testFixtures/java/tech/pegasys/teku/network/p2p/jvmlibp2p/MockMessageApi.java @@ -20,11 +20,13 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.tuweni.bytes.Bytes; import org.jetbrains.annotations.NotNull; import pubsub.pb.Rpc.Message; import tech.pegasys.teku.infrastructure.crypto.Hash; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessage; import tech.pegasys.teku.networking.p2p.libp2p.gossip.PreparedPubsubMessage; @@ -88,6 +90,11 @@ public DecodedMessageResult getDecodedMessage() { public Bytes getOriginalMessage() { return Bytes.wrapByteBuf(data); } + + @Override + public Optional getArrivalTimestamp() { + return Optional.empty(); + } }; return new PreparedPubsubMessage(protoMessage, preparedMessage); } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 0aa0c3ef45d..b22168acaa6 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -1011,6 +1011,7 @@ protected void initP2PNetwork() { .requiredCheckpoint(weakSubjectivityValidator.getWSCheckpoint()) .specProvider(spec) .kzg(kzg) + .recordMessageArrival(true) .build(); syncCommitteeMessagePool.subscribeOperationAdded(