From 06329ec2d105fc60c4c7218561cff11fb6b398a3 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Fri, 17 Jan 2025 01:27:08 +0700 Subject: [PATCH] `SingleAttestation` implementation (#6488) * First pass * Add restrictions to RuntimeVariableList api * Use empty_uninitialized and fix warnings * Fix some todos * Merge branch 'unstable' into max-blobs-preset * Fix take impl on RuntimeFixedList * cleanup * Fix test compilations * Fix some more tests * Fix test from unstable * Merge branch 'unstable' into max-blobs-preset * SingleAttestation * Add post attestation v2 endpoint logic to attestation service * Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation * Implement "Bugfix and more withdrawal tests" * Implement "Add missed exit checks to consolidation processing" * Implement "Update initial earliest_exit_epoch calculation" * Implement "Limit consolidating balance by validator.effective_balance" * Implement "Use 16-bit random value in validator filter" * Implement "Do not change creds type on consolidation" * some tests and fixed attestqtion calc * Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation * Rename PendingPartialWithdraw index field to validator_index * Skip slots to get test to pass and add TODO * Implement "Synchronously check all transactions to have non-zero length" * Merge remote-tracking branch 'origin/unstable' into max-blobs-preset * Remove footgun function * Minor simplifications * Move from preset to config * Fix typo * Revert "Remove footgun function" This reverts commit de01f923c7452355c87f50c0e8031ca94fa00d36. * Try fixing tests * Implement "bump minimal preset MAX_BLOB_COMMITMENTS_PER_BLOCK and KZG_COMMITMENT_INCLUSION_PROOF_DEPTH" * Thread through ChainSpec * Fix release tests * Move RuntimeFixedVector into module and rename * Add test * Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation * Added more test coverage, simplified Attestation conversion, and other minor refactors * Removed unusued codepaths * Fix failing test * Implement "Remove post-altair `initialize_beacon_state_from_eth1` from specs" * Update preset YAML * Remove empty RuntimeVarList awefullness * Make max_blobs_per_block a config parameter (#6329) Squashed commit of the following: commit 04b3743ec1e0b650269dd8e58b540c02430d1c0d Author: Michael Sproul Date: Mon Jan 6 17:36:58 2025 +1100 Add test commit 440e85419940d4daba406d910e7908dd1fe78668 Author: Michael Sproul Date: Mon Jan 6 17:24:50 2025 +1100 Move RuntimeFixedVector into module and rename commit f66e179a40c3917eee39a93534ecf75480172699 Author: Michael Sproul Date: Mon Jan 6 17:17:17 2025 +1100 Fix release tests commit e4bfe71cd1f0a2784d0bd57f85b2f5d8cf503ac1 Author: Michael Sproul Date: Mon Jan 6 17:05:30 2025 +1100 Thread through ChainSpec commit 063b79c16abd3f6df47b85efcf3858177bc933b9 Author: Michael Sproul Date: Mon Jan 6 15:32:16 2025 +1100 Try fixing tests commit 88bedf09bc647de66bd1ff944bbc8fb13e2b7590 Author: Michael Sproul Date: Mon Jan 6 15:04:37 2025 +1100 Revert "Remove footgun function" This reverts commit de01f923c7452355c87f50c0e8031ca94fa00d36. commit 32483d385b66f252d50cee5b524e2924157bdcd4 Author: Michael Sproul Date: Mon Jan 6 15:04:32 2025 +1100 Fix typo commit 2e86585b478c012f6e3483989c87e38161227674 Author: Michael Sproul Date: Mon Jan 6 15:04:15 2025 +1100 Move from preset to config commit 1095d60a40be20dd3c229b759fc3c228b51e51e3 Author: Michael Sproul Date: Mon Jan 6 14:38:40 2025 +1100 Minor simplifications commit de01f923c7452355c87f50c0e8031ca94fa00d36 Author: Michael Sproul Date: Mon Jan 6 14:06:57 2025 +1100 Remove footgun function commit 0c2c8c42245c25b8cf17885faf20acd3b81140ec Merge: 21ecb58ff f51a292f7 Author: Michael Sproul Date: Mon Jan 6 14:02:50 2025 +1100 Merge remote-tracking branch 'origin/unstable' into max-blobs-preset commit f51a292f77575a1786af34271fb44954f141c377 Author: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Fri Jan 3 20:27:21 2025 +0100 fully lint only explicitly to avoid unnecessary rebuilds (#6753) * fully lint only explicitly to avoid unnecessary rebuilds commit 7e0cddef321c2a069582c65b58e5f46590d60c49 Author: Akihito Nakano Date: Tue Dec 24 10:38:56 2024 +0900 Make sure we have fanout peers when publish (#6738) * Ensure that `fanout_peers` is always non-empty if it's `Some` commit 21ecb58ff88b86435ab62d9ac227394c10fdcd22 Merge: 2fcb2935e 9aefb5539 Author: Pawan Dhananjay Date: Mon Oct 21 14:46:00 2024 -0700 Merge branch 'unstable' into max-blobs-preset commit 2fcb2935ec7ef4cd18bbdd8aedb7de61fac69e61 Author: Pawan Dhananjay Date: Fri Sep 6 18:28:31 2024 -0700 Fix test from unstable commit 12c6ef118a1a6d910c48d9d4b23004f3609264c7 Author: Pawan Dhananjay Date: Wed Sep 4 16:16:36 2024 -0700 Fix some more tests commit d37733b846ce58e318e976d6503ca394b4901141 Author: Pawan Dhananjay Date: Wed Sep 4 12:47:36 2024 -0700 Fix test compilations commit 52bb581e071d5f474d519366e860a4b3a0b52f78 Author: Pawan Dhananjay Date: Tue Sep 3 18:38:19 2024 -0700 cleanup commit e71020e3e613910e0315f558ead661b490a0ff20 Author: Pawan Dhananjay Date: Tue Sep 3 17:16:10 2024 -0700 Fix take impl on RuntimeFixedList commit 13f9bba6470b2140e5c34f14aed06dab2b062c1c Merge: 60100fc6b 4e675cf5d Author: Pawan Dhananjay Date: Tue Sep 3 16:08:59 2024 -0700 Merge branch 'unstable' into max-blobs-preset commit 60100fc6be72792ff33913d7e5a53434c792aacf Author: Pawan Dhananjay Date: Fri Aug 30 16:04:11 2024 -0700 Fix some todos commit a9cb329a221a809f7dd818984753826f91c2e26b Author: Pawan Dhananjay Date: Fri Aug 30 15:54:00 2024 -0700 Use empty_uninitialized and fix warnings commit 4dc6e6515ecf75cefa4de840edc7b57e76a8fc9e Author: Pawan Dhananjay Date: Fri Aug 30 15:53:18 2024 -0700 Add restrictions to RuntimeVariableList api commit 25feedfde348b530c4fa2348cc71a06b746898ed Author: Pawan Dhananjay Date: Thu Aug 29 16:11:19 2024 -0700 First pass * Fix tests * Implement max_blobs_per_block_electra * Fix config issues * Simplify BlobSidecarListFromRoot * Disable PeerDAS tests * Cleanup single attestation imports * Fix some single attestation network plumbing * Merge remote-tracking branch 'origin/unstable' into max-blobs-preset * Bump quota to account for new target (6) * Remove clone * Fix issue from review * Try to remove ugliness * Merge branch 'unstable' into max-blobs-preset * Merge remote-tracking branch 'origin/unstable' into electra-alpha10 * Merge commit '04b3743ec1e0b650269dd8e58b540c02430d1c0d' into electra-alpha10 * Merge remote-tracking branch 'pawan/max-blobs-preset' into electra-alpha10 * Update tests to v1.5.0-beta.0 * Merge remote-tracking branch 'origin/electra-alpha10' into single_attestation * Fix some tests * Cargo fmt * lint * fmt * Resolve merge conflicts * Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation * lint * Linting * fmt * Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation * Fmt * Fix test and add TODO * Gracefully handle slashed proposers in fork choice tests * Merge remote-tracking branch 'origin/unstable' into electra-alpha10 * Keep latest changes from max_blobs_per_block PR in codec.rs * Revert a few more regressions and add a comment * Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation * Disable more DAS tests * Improve validator monitor test a little * Make test more robust * Fix sync test that didn't understand blobs * Fill out cropped comment * Merge remote-tracking branch 'origin/electra-alpha10' into single_attestation * Merge remote-tracking branch 'origin/unstable' into single_attestation * Merge remote-tracking branch 'origin/unstable' into single_attestation * Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation * publish_attestations should accept Either * log an error when failing to convert to SingleAttestation * Use Cow to avoid clone * Avoid reconverting to SingleAttestation * Tweak VC error message * update comments * update comments * pass in single attestation as ref to subnetid calculation method * Improved API, new error variants and other minor tweaks * Fix single_attestation event topic boilerplate * fix sse event failure * Add single_attestation event topic test coverage --- Cargo.lock | 1 + .../src/attestation_verification.rs | 13 +- beacon_node/beacon_chain/src/beacon_chain.rs | 26 +- beacon_node/beacon_chain/src/events.rs | 15 ++ beacon_node/beacon_chain/src/test_utils.rs | 245 ++++++++++++++++++ beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/lib.rs | 53 +++- .../http_api/src/publish_attestations.rs | 95 +++++-- beacon_node/http_api/tests/fork_tests.rs | 4 - .../http_api/tests/interactive_tests.rs | 59 +++-- beacon_node/http_api/tests/tests.rs | 99 ++++++- .../lighthouse_network/src/types/pubsub.rs | 83 +++--- .../src/network_beacon_processor/mod.rs | 52 ++++ beacon_node/network/src/router.rs | 11 + beacon_node/network/src/service.rs | 18 +- beacon_node/network/src/subnet_service/mod.rs | 6 +- common/eth2/src/lib.rs | 4 +- common/eth2/src/types.rs | 10 + consensus/types/src/attestation.rs | 94 ++++++- consensus/types/src/lib.rs | 2 +- consensus/types/src/subnet_id.rs | 16 ++ .../src/attestation_service.rs | 24 +- 22 files changed, 829 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa9bdd2afc7..29ffdc49bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3951,6 +3951,7 @@ dependencies = [ "bs58 0.4.0", "bytes", "directory", + "either", "eth1", "eth2", "ethereum_serde_utils", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index c3dea3dbb40..ffaf61e41af 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -62,7 +62,7 @@ use tree_hash::TreeHash; use types::{ Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof, - SignedAggregateAndProof, Slot, SubnetId, + SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations}; @@ -317,12 +317,22 @@ pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { attestation: AttestationRef<'a, T::EthSpec>, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, + validator_index: usize, } impl VerifiedUnaggregatedAttestation<'_, T> { pub fn into_indexed_attestation(self) -> IndexedAttestation { self.indexed_attestation } + + pub fn single_attestation(&self) -> Option { + Some(SingleAttestation { + committee_index: self.attestation.committee_index()? as usize, + attester_index: self.validator_index, + data: self.attestation.data().clone(), + signature: self.attestation.signature().clone(), + }) + } } /// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive @@ -1035,6 +1045,7 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { attestation, indexed_attestation, subnet_id, + validator_index: validator_index as usize, }) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a6da610c0e8..d0c294b44ff 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2035,10 +2035,30 @@ impl BeaconChain { |v| { // This method is called for API and gossip attestations, so this covers all unaggregated attestation events if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_single_attestation_subscribers() { + let current_fork = self + .spec + .fork_name_at_slot::(v.attestation().data().slot); + if current_fork.electra_enabled() { + // I don't see a situation where this could return None. The upstream unaggregated attestation checks + // should have already verified that this is an attestation with a single committee bit set. + if let Some(single_attestation) = v.single_attestation() { + event_handler.register(EventKind::SingleAttestation(Box::new( + single_attestation, + ))); + } + } + } + if event_handler.has_attestation_subscribers() { - event_handler.register(EventKind::Attestation(Box::new( - v.attestation().clone_as_attestation(), - ))); + let current_fork = self + .spec + .fork_name_at_slot::(v.attestation().data().slot); + if !current_fork.electra_enabled() { + event_handler.register(EventKind::Attestation(Box::new( + v.attestation().clone_as_attestation(), + ))); + } } } metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 267d56220c9..8c342893ae3 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -8,6 +8,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16; pub struct ServerSentEventHandler { attestation_tx: Sender>, + single_attestation_tx: Sender>, block_tx: Sender>, blob_sidecar_tx: Sender>, finalized_tx: Sender>, @@ -37,6 +38,7 @@ impl ServerSentEventHandler { pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { let (attestation_tx, _) = broadcast::channel(capacity); + let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); @@ -56,6 +58,7 @@ impl ServerSentEventHandler { Self { attestation_tx, + single_attestation_tx, block_tx, blob_sidecar_tx, finalized_tx, @@ -90,6 +93,10 @@ impl ServerSentEventHandler { .attestation_tx .send(kind) .map(|count| log_count("attestation", count)), + EventKind::SingleAttestation(_) => self + .single_attestation_tx + .send(kind) + .map(|count| log_count("single_attestation", count)), EventKind::Block(_) => self .block_tx .send(kind) @@ -164,6 +171,10 @@ impl ServerSentEventHandler { self.attestation_tx.subscribe() } + pub fn subscribe_single_attestation(&self) -> Receiver> { + self.single_attestation_tx.subscribe() + } + pub fn subscribe_block(&self) -> Receiver> { self.block_tx.subscribe() } @@ -232,6 +243,10 @@ impl ServerSentEventHandler { self.attestation_tx.receiver_count() > 0 } + pub fn has_single_attestation_subscribers(&self) -> bool { + self.single_attestation_tx.receiver_count() > 0 + } + pub fn has_block_subscribers(&self) -> bool { self.block_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index fd3cc496260..443cc686ebe 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -669,10 +669,16 @@ pub struct BeaconChainHarness { pub rng: Mutex, } +pub type CommitteeSingleAttestations = Vec<(SingleAttestation, SubnetId)>; pub type CommitteeAttestations = Vec<(Attestation, SubnetId)>; pub type HarnessAttestations = Vec<(CommitteeAttestations, Option>)>; +pub type HarnessSingleAttestations = Vec<( + CommitteeSingleAttestations, + Option>, +)>; + pub type HarnessSyncContributions = Vec<( Vec<(SyncCommitteeMessage, usize)>, Option>, @@ -1024,6 +1030,99 @@ where ) } + #[allow(clippy::too_many_arguments)] + pub fn produce_single_attestation_for_block( + &self, + slot: Slot, + index: CommitteeIndex, + beacon_block_root: Hash256, + mut state: Cow>, + state_root: Hash256, + aggregation_bit_index: usize, + validator_index: usize, + ) -> Result { + let epoch = slot.epoch(E::slots_per_epoch()); + + if state.slot() > slot { + return Err(BeaconChainError::CannotAttestToFutureState); + } else if state.current_epoch() < epoch { + let mut_state = state.to_mut(); + complete_state_advance( + mut_state, + Some(state_root), + epoch.start_slot(E::slots_per_epoch()), + &self.spec, + )?; + mut_state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; + } + + let committee_len = state.get_beacon_committee(slot, index)?.committee.len(); + + let target_slot = epoch.start_slot(E::slots_per_epoch()); + let target_root = if state.slot() <= target_slot { + beacon_block_root + } else { + *state.get_block_root(target_slot)? + }; + + let attestation: Attestation = Attestation::empty_for_signing( + index, + committee_len, + slot, + beacon_block_root, + state.current_justified_checkpoint(), + Checkpoint { + epoch, + root: target_root, + }, + &self.spec, + )?; + + let attestation = match attestation { + Attestation::Electra(mut attn) => { + attn.aggregation_bits + .set(aggregation_bit_index, true) + .unwrap(); + attn + } + Attestation::Base(_) => panic!("Must be an Electra attestation"), + }; + + let aggregation_bits = attestation.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state.get_beacon_committee(slot, index).unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + + let single_attestation = + attestation.to_single_attestation_with_attester_index(attester_index)?; + + let attestation: Attestation = single_attestation.to_attestation(committee.committee)?; + + assert_eq!( + single_attestation.committee_index, + attestation.committee_index().unwrap() as usize + ); + assert_eq!(single_attestation.attester_index, validator_index); + Ok(single_attestation) + } + /// Produces an "unaggregated" attestation for the given `slot` and `index` that attests to /// `beacon_block_root`. The provided `state` should match the `block.state_root` for the /// `block` identified by `beacon_block_root`. @@ -1081,6 +1180,33 @@ where )?) } + /// A list of attestations for each committee for the given slot. + /// + /// The first layer of the Vec is organised per committee. For example, if the return value is + /// called `all_attestations`, then all attestations in `all_attestations[0]` will be for + /// committee 0, whilst all in `all_attestations[1]` will be for committee 1. + pub fn make_single_attestations( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + head_block_root: SignedBeaconBlockHash, + attestation_slot: Slot, + ) -> Vec { + let fork = self + .spec + .fork_at_epoch(attestation_slot.epoch(E::slots_per_epoch())); + self.make_single_attestations_with_opts( + attesting_validators, + state, + state_root, + head_block_root, + attestation_slot, + MakeAttestationOptions { limit: None, fork }, + ) + .0 + } + /// A list of attestations for each committee for the given slot. /// /// The first layer of the Vec is organised per committee. For example, if the return value is @@ -1108,6 +1234,99 @@ where .0 } + pub fn make_single_attestations_with_opts( + &self, + attesting_validators: &[usize], + state: &BeaconState, + state_root: Hash256, + head_block_root: SignedBeaconBlockHash, + attestation_slot: Slot, + opts: MakeAttestationOptions, + ) -> (Vec, Vec) { + let MakeAttestationOptions { limit, fork } = opts; + let committee_count = state.get_committee_count_at_slot(state.slot()).unwrap(); + let num_attesters = AtomicUsize::new(0); + + let (attestations, split_attesters) = state + .get_beacon_committees_at_slot(attestation_slot) + .expect("should get committees") + .iter() + .map(|bc| { + bc.committee + .par_iter() + .enumerate() + .filter_map(|(i, validator_index)| { + if !attesting_validators.contains(validator_index) { + return None; + } + + if let Some(limit) = limit { + // This atomics stuff is necessary because we're under a par_iter, + // and Rayon will deadlock if we use a mutex. + if num_attesters.fetch_add(1, Ordering::Relaxed) >= limit { + num_attesters.fetch_sub(1, Ordering::Relaxed); + return None; + } + } + + let mut attestation = self + .produce_single_attestation_for_block( + attestation_slot, + bc.index, + head_block_root.into(), + Cow::Borrowed(state), + state_root, + i, + *validator_index, + ) + .unwrap(); + + attestation.signature = { + let domain = self.spec.get_domain( + attestation.data.target.epoch, + Domain::BeaconAttester, + &fork, + state.genesis_validators_root(), + ); + + let message = attestation.data.signing_root(domain); + + let mut agg_sig = AggregateSignature::infinity(); + + agg_sig.add_assign( + &self.validator_keypairs[*validator_index].sk.sign(message), + ); + + agg_sig + }; + + let subnet_id = SubnetId::compute_subnet_for_single_attestation::( + &attestation, + committee_count, + &self.chain.spec, + ) + .unwrap(); + + Some(((attestation, subnet_id), validator_index)) + }) + .unzip::<_, _, Vec<_>, Vec<_>>() + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + // Flatten attesters. + let attesters = split_attesters.into_iter().flatten().collect::>(); + + if let Some(limit) = limit { + assert_eq!(limit, num_attesters.load(Ordering::Relaxed)); + assert_eq!( + limit, + attesters.len(), + "failed to generate `limit` attestations" + ); + } + (attestations, attesters) + } + pub fn make_unaggregated_attestations_with_opts( &self, attesting_validators: &[usize], @@ -1288,6 +1507,32 @@ where ) } + /// A list of attestations for each committee for the given slot. + /// + /// The first layer of the Vec is organised per committee. For example, if the return value is + /// called `all_attestations`, then all attestations in `all_attestations[0]` will be for + /// committee 0, whilst all in `all_attestations[1]` will be for committee 1. + pub fn get_single_attestations( + &self, + attestation_strategy: &AttestationStrategy, + state: &BeaconState, + state_root: Hash256, + head_block_root: Hash256, + attestation_slot: Slot, + ) -> Vec> { + let validators: Vec = match attestation_strategy { + AttestationStrategy::AllValidators => self.get_all_validators(), + AttestationStrategy::SomeValidators(vals) => vals.clone(), + }; + self.make_single_attestations( + &validators, + state, + state_root, + head_block_root.into(), + attestation_slot, + ) + } + pub fn make_attestations( &self, attesting_validators: &[usize], diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0ced27e4464..61f3370c702 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -11,6 +11,7 @@ beacon_processor = { workspace = true } bs58 = "0.4.0" bytes = { workspace = true } directory = { workspace = true } +either = { workspace = true } eth1 = { workspace = true } eth2 = { workspace = true } ethereum_serde_utils = { workspace = true } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d5c6c115670..5dc9055c6ca 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -44,6 +44,7 @@ pub use block_id::BlockId; use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; +use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, @@ -86,8 +87,8 @@ use types::{ AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, - SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, - SyncCommitteeMessage, SyncContributionData, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, + SingleAttestation, Slot, SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -1832,26 +1833,62 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); + let beacon_pool_path_v2 = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("pool")) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()); + // POST beacon/pool/attestations - let post_beacon_pool_attestations = beacon_pool_path_any + let post_beacon_pool_attestations = beacon_pool_path .clone() .and(warp::path("attestations")) .and(warp::path::end()) .and(warp_utils::json::json()) .and(network_tx_filter.clone()) - .and(reprocess_send_filter) + .and(reprocess_send_filter.clone()) .and(log_filter.clone()) .then( // V1 and V2 are identical except V2 has a consensus version header in the request. // We only require this header for SSZ deserialization, which isn't supported for // this endpoint presently. - |_endpoint_version: EndpointVersion, - task_spawner: TaskSpawner, + |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, network_tx: UnboundedSender>, reprocess_tx: Option>, log: Logger| async move { + let attestations = attestations.into_iter().map(Either::Left).collect(); + let result = crate::publish_attestations::publish_attestations( + task_spawner, + chain, + attestations, + network_tx, + reprocess_tx, + log, + ) + .await + .map(|()| warp::reply::json(&())); + convert_rejection(result).await + }, + ); + + let post_beacon_pool_attestations_v2 = beacon_pool_path_v2 + .clone() + .and(warp::path("attestations")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(network_tx_filter.clone()) + .and(reprocess_send_filter) + .and(log_filter.clone()) + .then( + |task_spawner: TaskSpawner, + chain: Arc>, + attestations: Vec, + network_tx: UnboundedSender>, + reprocess_tx: Option>, + log: Logger| async move { + let attestations = attestations.into_iter().map(Either::Right).collect(); let result = crate::publish_attestations::publish_attestations( task_spawner, chain, @@ -4509,6 +4546,9 @@ pub fn serve( api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } + api_types::EventTopic::SingleAttestation => { + event_handler.subscribe_single_attestation() + } api_types::EventTopic::VoluntaryExit => { event_handler.subscribe_exit() } @@ -4736,6 +4776,7 @@ pub fn serve( .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) .uor(post_beacon_pool_attestations) + .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) .uor(post_beacon_pool_voluntary_exits) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index 00654765325..111dee3cffb 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -40,17 +40,19 @@ use beacon_chain::{ BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; +use either::Either; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; use slog::{debug, error, warn, Logger}; +use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use types::Attestation; +use types::{Attestation, EthSpec, SingleAttestation}; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] @@ -62,6 +64,7 @@ enum Error { ReprocessDisabled, ReprocessFull, ReprocessTimeout, + FailedConversion(#[allow(dead_code)] BeaconChainError), } enum PublishAttestationResult { @@ -73,24 +76,39 @@ enum PublishAttestationResult { fn verify_and_publish_attestation( chain: &Arc>, - attestation: &Attestation, + either_attestation: &Either, SingleAttestation>, seen_timestamp: Duration, network_tx: &UnboundedSender>, log: &Logger, ) -> Result<(), Error> { - let attestation = chain - .verify_unaggregated_attestation_for_gossip(attestation, None) + let attestation = convert_to_attestation(chain, either_attestation)?; + let verified_attestation = chain + .verify_unaggregated_attestation_for_gossip(&attestation, None) .map_err(Error::Validation)?; - // Publish. - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::Attestation(Box::new(( - attestation.subnet_id(), - attestation.attestation().clone_as_attestation(), - )))], - }) - .map_err(|_| Error::Publication)?; + match either_attestation { + Either::Left(attestation) => { + // Publish. + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::Attestation(Box::new(( + verified_attestation.subnet_id(), + attestation.clone(), + )))], + }) + .map_err(|_| Error::Publication)?; + } + Either::Right(single_attestation) => { + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::SingleAttestation(Box::new(( + verified_attestation.subnet_id(), + single_attestation.clone(), + )))], + }) + .map_err(|_| Error::Publication)?; + } + } // Notify the validator monitor. chain @@ -98,12 +116,12 @@ fn verify_and_publish_attestation( .read() .register_api_unaggregated_attestation( seen_timestamp, - attestation.indexed_attestation(), + verified_attestation.indexed_attestation(), &chain.slot_clock, ); - let fc_result = chain.apply_attestation_to_fork_choice(&attestation); - let naive_aggregation_result = chain.add_to_naive_aggregation_pool(&attestation); + let fc_result = chain.apply_attestation_to_fork_choice(&verified_attestation); + let naive_aggregation_result = chain.add_to_naive_aggregation_pool(&verified_attestation); if let Err(e) = &fc_result { warn!( @@ -129,10 +147,48 @@ fn verify_and_publish_attestation( } } +fn convert_to_attestation<'a, T: BeaconChainTypes>( + chain: &Arc>, + attestation: &'a Either, SingleAttestation>, +) -> Result>, Error> { + let a = match attestation { + Either::Left(a) => Cow::Borrowed(a), + Either::Right(single_attestation) => chain + .with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let Some(committee) = committee_cache.get_beacon_committee( + single_attestation.data.slot, + single_attestation.committee_index as u64, + ) else { + return Err(BeaconChainError::AttestationError( + types::AttestationError::NoCommitteeForSlotAndIndex { + slot: single_attestation.data.slot, + index: single_attestation.committee_index as u64, + }, + )); + }; + + let attestation = + single_attestation.to_attestation::(committee.committee)?; + + Ok(Cow::Owned(attestation)) + }, + ) + .map_err(Error::FailedConversion)?, + }; + + Ok(a) +} + pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, - attestations: Vec>, + attestations: Vec, SingleAttestation>>, network_tx: UnboundedSender>, reprocess_send: Option>, log: Logger, @@ -141,7 +197,10 @@ pub async fn publish_attestations( // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. let attestation_metadata = attestations .iter() - .map(|att| (att.data().slot, att.committee_index())) + .map(|att| match att { + Either::Left(att) => (att.data().slot, att.committee_index()), + Either::Right(att) => (att.data.slot, Some(att.committee_index as u64)), + }) .collect::>(); // Gossip validate and publish attestations that can be immediately processed. diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 8cb6053e9ff..d6b8df33b3f 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -155,10 +155,6 @@ async fn attestations_across_fork_with_skip_slots() { .post_beacon_pool_attestations_v1(&unaggregated_attestations) .await .unwrap(); - client - .post_beacon_pool_attestations_v2(&unaggregated_attestations, fork_name) - .await - .unwrap(); let signed_aggregates = attestations .into_iter() diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 8cfcf5d93e9..60a4c507832 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -890,27 +890,48 @@ async fn queue_attestations_from_http() { let pre_state = harness.get_current_state(); let (block, post_state) = harness.make_block(pre_state, attestation_slot).await; let block_root = block.0.canonical_root(); + let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); // Make attestations to the block and POST them to the beacon node on a background thread. - let attestations = harness - .make_unaggregated_attestations( - &all_validators, - &post_state, - block.0.state_root(), - block_root.into(), - attestation_slot, - ) - .into_iter() - .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) - .collect::>(); - - let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); - let attestation_future = tokio::spawn(async move { - client - .post_beacon_pool_attestations_v2(&attestations, fork_name) - .await - .expect("attestations should be processed successfully") - }); + let attestation_future = if fork_name.electra_enabled() { + let single_attestations = harness + .make_single_attestations( + &all_validators, + &post_state, + block.0.state_root(), + block_root.into(), + attestation_slot, + ) + .into_iter() + .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) + .collect::>(); + + tokio::spawn(async move { + client + .post_beacon_pool_attestations_v2(&single_attestations, fork_name) + .await + .expect("attestations should be processed successfully") + }) + } else { + let attestations = harness + .make_unaggregated_attestations( + &all_validators, + &post_state, + block.0.state_root(), + block_root.into(), + attestation_slot, + ) + .into_iter() + .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) + .collect::>(); + + tokio::spawn(async move { + client + .post_beacon_pool_attestations_v1(&attestations) + .await + .expect("attestations should be processed successfully") + }) + }; // In parallel, apply the block. We need to manually notify the reprocess queue, because the // `beacon_chain` does not know about the queue and will not update it for us. diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 85d3b4e9bae..dd6a92603aa 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -40,7 +40,8 @@ use tree_hash::TreeHash; use types::application_domain::ApplicationDomain; use types::{ attestation::AttestationBase, AggregateSignature, BitList, Domain, EthSpec, ExecutionBlockHash, - Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, Slot, + Hash256, Keypair, MainnetEthSpec, RelativeEpoch, SelectionProof, SignedRoot, SingleAttestation, + Slot, }; type E = MainnetEthSpec; @@ -71,6 +72,7 @@ struct ApiTester { next_block: PublishBlockRequest, reorg_block: PublishBlockRequest, attestations: Vec>, + single_attestations: Vec, contribution_and_proofs: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, @@ -203,6 +205,27 @@ impl ApiTester { "precondition: attestations for testing" ); + let fork_name = harness + .chain + .spec + .fork_name_at_slot::(harness.chain.slot().unwrap()); + + let single_attestations = if fork_name.electra_enabled() { + harness + .get_single_attestations( + &AttestationStrategy::AllValidators, + &head.beacon_state, + head_state_root, + head.beacon_block_root, + harness.chain.slot().unwrap(), + ) + .into_iter() + .flat_map(|vec| vec.into_iter().map(|(attestation, _subnet_id)| attestation)) + .collect::>() + } else { + vec![] + }; + let current_epoch = harness .chain .slot() @@ -294,6 +317,7 @@ impl ApiTester { next_block, reorg_block, attestations, + single_attestations, contribution_and_proofs, attester_slashing, proposer_slashing, @@ -381,6 +405,7 @@ impl ApiTester { next_block, reorg_block, attestations, + single_attestations: vec![], contribution_and_proofs: vec![], attester_slashing, proposer_slashing, @@ -1800,13 +1825,16 @@ impl ApiTester { } pub async fn test_post_beacon_pool_attestations_valid_v2(mut self) -> Self { + if self.single_attestations.is_empty() { + return self; + } let fork_name = self - .attestations + .single_attestations .first() - .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .map(|att| self.chain.spec.fork_name_at_slot::(att.data.slot)) .unwrap(); self.client - .post_beacon_pool_attestations_v2(self.attestations.as_slice(), fork_name) + .post_beacon_pool_attestations_v2(self.single_attestations.as_slice(), fork_name) .await .unwrap(); assert!( @@ -1854,10 +1882,13 @@ impl ApiTester { self } pub async fn test_post_beacon_pool_attestations_invalid_v2(mut self) -> Self { + if self.single_attestations.is_empty() { + return self; + } let mut attestations = Vec::new(); - for attestation in &self.attestations { + for attestation in &self.single_attestations { let mut invalid_attestation = attestation.clone(); - invalid_attestation.data_mut().slot += 1; + invalid_attestation.data.slot += 1; // add both to ensure we only fail on invalid attestations attestations.push(attestation.clone()); @@ -6011,6 +6042,48 @@ impl ApiTester { self } + pub async fn test_get_events_electra(self) -> Self { + let topics = vec![EventTopic::SingleAttestation]; + let mut events_future = self + .client + .get_events::(topics.as_slice()) + .await + .unwrap(); + + let expected_attestation_len = self.single_attestations.len(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(self.chain.slot().unwrap()); + + self.client + .post_beacon_pool_attestations_v2(&self.single_attestations, fork_name) + .await + .unwrap(); + + let attestation_events = poll_events( + &mut events_future, + expected_attestation_len, + Duration::from_millis(10000), + ) + .await; + + assert_eq!( + attestation_events.as_slice(), + self.single_attestations + .clone() + .into_iter() + .map(|single_attestation| EventKind::SingleAttestation(Box::new( + single_attestation + ))) + .collect::>() + .as_slice() + ); + + self + } + pub async fn test_get_events_altair(self) -> Self { let topics = vec![EventTopic::ContributionAndProof]; let mut events_future = self @@ -6158,6 +6231,20 @@ async fn get_events_altair() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_events_electra() { + let mut config = ApiTesterConfig::default(); + config.spec.altair_fork_epoch = Some(Epoch::new(0)); + config.spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + config.spec.capella_fork_epoch = Some(Epoch::new(0)); + config.spec.deneb_fork_epoch = Some(Epoch::new(0)); + config.spec.electra_fork_epoch = Some(Epoch::new(0)); + ApiTester::new_from_config(config) + .await + .test_get_events_electra() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events_from_genesis() { ApiTester::new_from_genesis() diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index c9769594708..1e1f3efa18c 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -7,15 +7,14 @@ use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ - Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase, - AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, - ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, - ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, - SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, - SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, + BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, + SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, + SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, + SignedBeaconBlockFulu, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedVoluntaryExit, SingleAttestation, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -28,8 +27,10 @@ pub enum PubsubMessage { DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), - /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. + /// Gossipsub message providing notification of a raw un-aggregated attestation with its subnet id. Attestation(Box<(SubnetId, Attestation)>), + /// Gossipsub message providing notification of a `SingleAttestation`` with its subnet id. + SingleAttestation(Box<(SubnetId, SingleAttestation)>), /// Gossipsub message providing notification of a voluntary exit. VoluntaryExit(Box), /// Gossipsub message providing notification of a new proposer slashing. @@ -129,6 +130,9 @@ impl PubsubMessage { PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) } + PubsubMessage::SingleAttestation(attestation_data) => { + GossipKind::Attestation(attestation_data.0) + } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, @@ -189,32 +193,32 @@ impl PubsubMessage { ))) } GossipKind::Attestation(subnet_id) => { - let attestation = - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(&fork_name) => { - if fork_name.electra_enabled() { - Attestation::Electra( - AttestationElectra::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } else { - Attestation::Base( - AttestationBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ) - } - } - None => { - return Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )) + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(&fork_name) => { + if fork_name.electra_enabled() { + let single_attestation = + SingleAttestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::SingleAttestation(Box::new(( + *subnet_id, + single_attestation, + )))) + } else { + let attestation = Attestation::Base( + AttestationBase::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } - }; - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) + } + None => Err(format!( + "Unknown gossipsub fork digest: {:?}", + gossip_topic.fork_digest + )), + } } GossipKind::BeaconBlock => { let beacon_block = @@ -416,6 +420,7 @@ impl PubsubMessage { PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), + PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(), PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(), PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(), PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(), @@ -460,6 +465,14 @@ impl std::fmt::Display for PubsubMessage { data.1.data().slot, data.1.committee_index(), ), + PubsubMessage::SingleAttestation(data) => write!( + f, + "SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}", + *data.0, + data.1.data.slot, + data.1.committee_index, + data.1.attester_index, + ), PubsubMessage::VoluntaryExit(_data) => write!(f, "Voluntary Exit"), PubsubMessage::ProposerSlashing(_data) => write!(f, "Proposer Slashing"), PubsubMessage::AttesterSlashing(_data) => write!(f, "Attester Slashing"), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 2d15d39c6fc..4a3fb28e102 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -84,6 +84,58 @@ impl NetworkBeaconProcessor { .map_err(Into::into) } + /// Create a new `Work` event for some `SingleAttestation`. + pub fn send_single_attestation( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + single_attestation: SingleAttestation, + subnet_id: SubnetId, + should_import: bool, + seen_timestamp: Duration, + ) -> Result<(), Error> { + let result = self.chain.with_committee_cache( + single_attestation.data.target.root, + single_attestation + .data + .slot + .epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let Some(committee) = committee_cache.get_beacon_committee( + single_attestation.data.slot, + single_attestation.committee_index as u64, + ) else { + warn!( + self.log, + "No beacon committee for slot and index"; + "slot" => single_attestation.data.slot, + "index" => single_attestation.committee_index + ); + return Ok(Ok(())); + }; + + let attestation = single_attestation.to_attestation(committee.committee)?; + + Ok(self.send_unaggregated_attestation( + message_id.clone(), + peer_id, + attestation, + subnet_id, + should_import, + seen_timestamp, + )) + }, + ); + + match result { + Ok(result) => result, + Err(e) => { + warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e); + Ok(()) + } + } + } + /// Create a new `Work` event for some unaggregated attestation. pub fn send_unaggregated_attestation( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 0a99b6af0cf..d3da341e1c8 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -398,6 +398,17 @@ impl Router { timestamp_now(), ), ), + PubsubMessage::SingleAttestation(subnet_attestation) => self + .handle_beacon_processor_send_result( + self.network_beacon_processor.send_single_attestation( + message_id, + peer_id, + subnet_attestation.1, + subnet_attestation.0, + should_process, + timestamp_now(), + ), + ), PubsubMessage::BeaconBlock(block) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_gossip_beacon_block( message_id, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7826807e035..f89241b4ae6 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -549,7 +549,23 @@ impl NetworkService { // the attestation, else we just just propagate the Attestation. let should_process = self.subnet_service.should_process_attestation( Subnet::Attestation(subnet_id), - attestation, + attestation.data(), + ); + self.send_to_router(RouterMessage::PubsubMessage( + id, + source, + message, + should_process, + )); + } + PubsubMessage::SingleAttestation(ref subnet_and_attestation) => { + let subnet_id = subnet_and_attestation.0; + let single_attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we should process + // the attestation, else we just just propagate the Attestation. + let should_process = self.subnet_service.should_process_attestation( + Subnet::Attestation(subnet_id), + &single_attestation.data, ); self.send_to_router(RouterMessage::PubsubMessage( id, diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index da1f220f042..33ae567eb3f 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -17,7 +17,7 @@ use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDisco use slog::{debug, error, o, warn}; use slot_clock::SlotClock; use types::{ - Attestation, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + AttestationData, EthSpec, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, }; @@ -363,7 +363,7 @@ impl SubnetService { pub fn should_process_attestation( &self, subnet: Subnet, - attestation: &Attestation, + attestation_data: &AttestationData, ) -> bool { // Proposer-only mode does not need to process attestations if self.proposer_only { @@ -374,7 +374,7 @@ impl SubnetService { .map(|tracked_vals| { tracked_vals.contains_key(&ExactSubnet { subnet, - slot: attestation.data().slot, + slot: attestation_data.slot, }) }) .unwrap_or(true) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 12b1538984e..af8573a5789 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1324,9 +1324,9 @@ impl BeaconNodeHttpClient { } /// `POST v2/beacon/pool/attestations` - pub async fn post_beacon_pool_attestations_v2( + pub async fn post_beacon_pool_attestations_v2( &self, - attestations: &[Attestation], + attestations: &[SingleAttestation], fork_name: ForkName, ) -> Result<(), Error> { let mut path = self.eth_path(V2)?; diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 695d536944b..6d76101cb62 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1113,6 +1113,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes { #[serde(bound = "E: EthSpec", untagged)] pub enum EventKind { Attestation(Box>), + SingleAttestation(Box), Block(SseBlock), BlobSidecar(SseBlobSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), @@ -1139,6 +1140,7 @@ impl EventKind { EventKind::Block(_) => "block", EventKind::BlobSidecar(_) => "blob_sidecar", EventKind::Attestation(_) => "attestation", + EventKind::SingleAttestation(_) => "single_attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", @@ -1161,6 +1163,11 @@ impl EventKind { "attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)), )?)), + "single_attestation" => Ok(EventKind::SingleAttestation( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("SingleAttestation: {:?}", e)) + })?, + )), "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), @@ -1255,6 +1262,7 @@ pub enum EventTopic { Block, BlobSidecar, Attestation, + SingleAttestation, VoluntaryExit, FinalizedCheckpoint, ChainReorg, @@ -1280,6 +1288,7 @@ impl FromStr for EventTopic { "block" => Ok(EventTopic::Block), "blob_sidecar" => Ok(EventTopic::BlobSidecar), "attestation" => Ok(EventTopic::Attestation), + "single_attestation" => Ok(EventTopic::SingleAttestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), @@ -1306,6 +1315,7 @@ impl fmt::Display for EventTopic { EventTopic::Block => write!(f, "block"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), EventTopic::Attestation => write!(f, "attestation"), + EventTopic::SingleAttestation => write!(f, "single_attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 190964736fe..47e41acb5b1 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -12,8 +12,8 @@ use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; use super::{ - AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, - Signature, SignedRoot, + AggregateSignature, AttestationData, BitList, ChainSpec, CommitteeIndex, Domain, EthSpec, Fork, + SecretKey, Signature, SignedRoot, }; #[derive(Debug, PartialEq)] @@ -24,6 +24,10 @@ pub enum Error { IncorrectStateVariant, InvalidCommitteeLength, InvalidCommitteeIndex, + AttesterNotInCommittee(usize), + InvalidCommittee, + MissingCommittee, + NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex }, } impl From for Error { @@ -231,6 +235,16 @@ impl Attestation { Attestation::Electra(att) => att.aggregation_bits.get(index), } } + + pub fn to_single_attestation_with_attester_index( + &self, + attester_index: usize, + ) -> Result { + match self { + Self::Base(_) => Err(Error::IncorrectStateVariant), + Self::Electra(attn) => attn.to_single_attestation_with_attester_index(attester_index), + } + } } impl AttestationRef<'_, E> { @@ -287,6 +301,14 @@ impl AttestationElectra { self.get_committee_indices().first().cloned() } + pub fn get_aggregation_bits(&self) -> Vec { + self.aggregation_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } + pub fn get_committee_indices(&self) -> Vec { self.committee_bits .iter() @@ -350,6 +372,22 @@ impl AttestationElectra { Ok(()) } } + + pub fn to_single_attestation_with_attester_index( + &self, + attester_index: usize, + ) -> Result { + let Some(committee_index) = self.committee_index() else { + return Err(Error::InvalidCommitteeIndex); + }; + + Ok(SingleAttestation { + committee_index: committee_index as usize, + attester_index, + data: self.data.clone(), + signature: self.signature.clone(), + }) + } } impl AttestationBase { @@ -527,6 +565,58 @@ impl ForkVersionDeserialize for Vec> { } } +#[derive( + Debug, + Clone, + Serialize, + Deserialize, + Decode, + Encode, + TestRandom, + Derivative, + arbitrary::Arbitrary, + TreeHash, + PartialEq, +)] +pub struct SingleAttestation { + pub committee_index: usize, + pub attester_index: usize, + pub data: AttestationData, + pub signature: AggregateSignature, +} + +impl SingleAttestation { + pub fn to_attestation(&self, committee: &[usize]) -> Result, Error> { + let aggregation_bit = committee + .iter() + .enumerate() + .find_map(|(i, &validator_index)| { + if self.attester_index == validator_index { + return Some(i); + } + None + }) + .ok_or(Error::AttesterNotInCommittee(self.attester_index))?; + + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(self.committee_index, true) + .map_err(|_| Error::InvalidCommitteeIndex)?; + + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|_| Error::InvalidCommitteeLength)?; + + aggregation_bits.set(aggregation_bit, true)?; + + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: self.data.clone(), + signature: self.signature.clone(), + })) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index dcfa918146a..11d1f5271b7 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -118,7 +118,7 @@ pub use crate::aggregate_and_proof::{ }; pub use crate::attestation::{ Attestation, AttestationBase, AttestationElectra, AttestationRef, AttestationRefMut, - Error as AttestationError, + Error as AttestationError, SingleAttestation, }; pub use crate::attestation_data::AttestationData; pub use crate::attestation_duty::AttestationDuty; diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 187b070d29f..981d6d5653d 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,4 +1,5 @@ //! Identifies each shard by an integer identifier. +use crate::SingleAttestation; use crate::{AttestationRef, ChainSpec, CommitteeIndex, EthSpec, Slot}; use alloy_primitives::{bytes::Buf, U256}; use safe_arith::{ArithError, SafeArith}; @@ -57,6 +58,21 @@ impl SubnetId { ) } + /// Compute the subnet for an attestation where each slot in the + /// attestation epoch contains `committee_count_per_slot` committees. + pub fn compute_subnet_for_single_attestation( + attestation: &SingleAttestation, + committee_count_per_slot: u64, + spec: &ChainSpec, + ) -> Result { + Self::compute_subnet::( + attestation.data.slot, + attestation.committee_index as u64, + committee_count_per_slot, + spec, + ) + } + /// Compute the subnet for an attestation with `attestation.data.slot == slot` and /// `attestation.data.index == committee_index` where each slot in the attestation epoch /// contains `committee_count_at_slot` committees. diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index e31ad4f661b..58c6ea32988 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -457,8 +457,30 @@ impl AttestationService { &[validator_metrics::ATTESTATIONS_HTTP_POST], ); if fork_name.electra_enabled() { + let single_attestations = attestations + .iter() + .zip(validator_indices) + .filter_map(|(a, i)| { + match a.to_single_attestation_with_attester_index(*i as usize) { + Ok(a) => Some(a), + Err(e) => { + // This shouldn't happen unless BN and VC are out of sync with + // respect to the Electra fork. + error!( + log, + "Unable to convert to SingleAttestation"; + "error" => ?e, + "committee_index" => attestation_data.index, + "slot" => slot.as_u64(), + "type" => "unaggregated", + ); + None + } + } + }) + .collect::>(); beacon_node - .post_beacon_pool_attestations_v2(attestations, fork_name) + .post_beacon_pool_attestations_v2(&single_attestations, fork_name) .await } else { beacon_node