From efd0328f9e743977cfda2d91d886c81f2959d761 Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Sun, 15 Dec 2024 17:54:42 -0700 Subject: [PATCH] feat: implement ping extensions --- Cargo.lock | 1 + bin/trin/src/cli.rs | 3 +- ethportal-api/Cargo.toml | 1 + ethportal-api/src/types/distance.rs | 4 +- ethportal-api/src/types/mod.rs | 1 + .../src/types/ping_extensions/decode.rs | 74 ++++ .../types/ping_extensions/extensions/mod.rs | 4 + .../ping_extensions/extensions/type_0.rs | 213 ++++++++++ .../ping_extensions/extensions/type_1.rs | 29 ++ .../ping_extensions/extensions/type_2.rs | 33 ++ .../ping_extensions/extensions/type_65535.rs | 52 +++ .../src/types/ping_extensions/mod.rs | 78 ++++ ethportal-api/src/types/portal_wire.rs | 2 +- ethportal-api/src/version.rs | 2 + portalnet/src/overlay/mod.rs | 1 + portalnet/src/overlay/ping_extensions.rs | 33 ++ portalnet/src/overlay/protocol.rs | 65 +-- .../overlay/{service.rs => service/mod.rs} | 288 ++++++-------- portalnet/src/overlay/service/ping.rs | 369 ++++++++++++++++++ portalnet/src/types/kbucket.rs | 5 +- portalnet/src/types/node.rs | 46 ++- portalnet/tests/overlay.rs | 21 +- trin-beacon/src/lib.rs | 1 + trin-beacon/src/network.rs | 17 +- trin-beacon/src/ping_extensions.rs | 41 ++ trin-history/src/lib.rs | 1 + trin-history/src/network.rs | 17 +- trin-history/src/ping_extensions.rs | 41 ++ trin-state/src/lib.rs | 1 + trin-state/src/network.rs | 16 +- trin-state/src/ping_extensions.rs | 41 ++ 31 files changed, 1288 insertions(+), 213 deletions(-) create mode 100644 ethportal-api/src/types/ping_extensions/decode.rs create mode 100644 ethportal-api/src/types/ping_extensions/extensions/mod.rs create mode 100644 ethportal-api/src/types/ping_extensions/extensions/type_0.rs create mode 100644 ethportal-api/src/types/ping_extensions/extensions/type_1.rs create mode 100644 ethportal-api/src/types/ping_extensions/extensions/type_2.rs create mode 100644 ethportal-api/src/types/ping_extensions/extensions/type_65535.rs create mode 100644 ethportal-api/src/types/ping_extensions/mod.rs create mode 100644 portalnet/src/overlay/ping_extensions.rs rename portalnet/src/overlay/{service.rs => service/mod.rs} (95%) create mode 100644 portalnet/src/overlay/service/ping.rs create mode 100644 trin-beacon/src/ping_extensions.rs create mode 100644 trin-history/src/ping_extensions.rs create mode 100644 trin-state/src/ping_extensions.rs diff --git a/Cargo.lock b/Cargo.lock index 4c7a935d6..da377f0e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2233,6 +2233,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "itertools 0.13.0", "jsonrpsee", "keccak-hash", "lazy_static", diff --git a/bin/trin/src/cli.rs b/bin/trin/src/cli.rs index 3c4fbed4b..4aa49f5a8 100644 --- a/bin/trin/src/cli.rs +++ b/bin/trin/src/cli.rs @@ -14,7 +14,7 @@ use ethportal_api::{ network::Subnetwork, portal_wire::{NetworkSpec, MAINNET}, }, - version::VERSION, + version::{APP_NAME, VERSION}, }; use portalnet::{ bootnodes::Bootnodes, @@ -32,7 +32,6 @@ const DEFAULT_SUBNETWORKS: &str = "history"; pub const DEFAULT_STORAGE_CAPACITY_MB: &str = "1000"; pub const DEFAULT_WEB3_TRANSPORT: &str = "ipc"; -const APP_NAME: &str = "trin"; #[derive(Parser, Debug, PartialEq, Clone)] #[command(name = APP_NAME, author = "https://github.com/ethereum/trin/graphs/contributors", diff --git a/ethportal-api/Cargo.toml b/ethportal-api/Cargo.toml index 74d2a2f5c..9756a45b8 100644 --- a/ethportal-api/Cargo.toml +++ b/ethportal-api/Cargo.toml @@ -29,6 +29,7 @@ ethereum_serde_utils = "0.7.0" ethereum_ssz.workspace = true ethereum_ssz_derive.workspace = true hex.workspace = true +itertools.workspace = true jsonrpsee = { workspace = true, features = ["async-client", "client", "macros", "server"]} keccak-hash.workspace = true lazy_static.workspace = true diff --git a/ethportal-api/src/types/distance.rs b/ethportal-api/src/types/distance.rs index 0d1cd03f3..e1115179a 100644 --- a/ethportal-api/src/types/distance.rs +++ b/ethportal-api/src/types/distance.rs @@ -1,11 +1,13 @@ use std::{fmt, ops::Deref}; use alloy::primitives::U256; +use ssz_derive::{Decode, Encode}; pub type DataRadius = U256; /// Represents a distance between two keys in the DHT key space. -#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] +#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug, Encode, Decode)] +#[ssz(struct_behaviour = "transparent")] pub struct Distance(U256); impl fmt::Display for Distance { diff --git a/ethportal-api/src/types/mod.rs b/ethportal-api/src/types/mod.rs index e7172bcc3..1f0b5469b 100644 --- a/ethportal-api/src/types/mod.rs +++ b/ethportal-api/src/types/mod.rs @@ -10,6 +10,7 @@ pub mod execution; pub mod jsonrpc; pub mod network; pub mod node_id; +pub mod ping_extensions; pub mod portal; pub mod portal_wire; pub mod query_trace; diff --git a/ethportal-api/src/types/ping_extensions/decode.rs b/ethportal-api/src/types/ping_extensions/decode.rs new file mode 100644 index 000000000..98ad5b6f0 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/decode.rs @@ -0,0 +1,74 @@ +use anyhow::bail; +use ssz::Decode; + +use super::{ + extensions::{ + type_0::ClientInfoRadiusCapabilities, type_1::BasicRadius, type_2::HistoryRadius, + type_65535::PingError, + }, + CustomPayloadExtensionsFormat, Extensions, +}; +use crate::types::portal_wire::CustomPayload; + +#[derive(Debug, Clone)] +pub enum DecodedExtension { + Capabilities(ClientInfoRadiusCapabilities), + BasicRadius(BasicRadius), + HistoryRadius(HistoryRadius), + Error(PingError), +} + +impl From for Extensions { + fn from(value: DecodedExtension) -> Self { + match value { + DecodedExtension::Capabilities(_) => Extensions::Capabilities, + DecodedExtension::BasicRadius(_) => Extensions::BasicRadius, + DecodedExtension::HistoryRadius(_) => Extensions::HistoryRadius, + DecodedExtension::Error(_) => Extensions::Error, + } + } +} + +impl TryFrom for DecodedExtension { + type Error = anyhow::Error; + + fn try_from(value: CustomPayload) -> Result { + let Ok(ping_custom_payload): anyhow::Result = + value.try_into() + else { + bail!("Failed to decode CustomPayloadExtensionsFormat"); + }; + + let Ok(extension_type) = Extensions::try_from(ping_custom_payload.r#type) else { + bail!("Failed to decode extension type"); + }; + + match extension_type { + Extensions::Capabilities => { + let capabilities = + ClientInfoRadiusCapabilities::from_ssz_bytes(&ping_custom_payload.payload) + .map_err(|err| { + anyhow::anyhow!( + "Failed to decode ClientInfoRadiusCapabilities: {err:?}" + ) + })?; + Ok(DecodedExtension::Capabilities(capabilities)) + } + Extensions::BasicRadius => { + let basic_radius = BasicRadius::from_ssz_bytes(&ping_custom_payload.payload) + .map_err(|err| anyhow::anyhow!("Failed to decode BasicRadius: {err:?}"))?; + Ok(DecodedExtension::BasicRadius(basic_radius)) + } + Extensions::HistoryRadius => { + let history_radius = HistoryRadius::from_ssz_bytes(&ping_custom_payload.payload) + .map_err(|err| anyhow::anyhow!("Failed to decode HistoryRadius: {err:?}"))?; + Ok(DecodedExtension::HistoryRadius(history_radius)) + } + Extensions::Error => { + let error = PingError::from_ssz_bytes(&ping_custom_payload.payload) + .map_err(|err| anyhow::anyhow!("Failed to decode PingError: {err:?}"))?; + Ok(DecodedExtension::Error(error)) + } + } + } +} diff --git a/ethportal-api/src/types/ping_extensions/extensions/mod.rs b/ethportal-api/src/types/ping_extensions/extensions/mod.rs new file mode 100644 index 000000000..75f7d2b72 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/extensions/mod.rs @@ -0,0 +1,4 @@ +pub mod type_0; +pub mod type_1; +pub mod type_2; +pub mod type_65535; diff --git a/ethportal-api/src/types/ping_extensions/extensions/type_0.rs b/ethportal-api/src/types/ping_extensions/extensions/type_0.rs new file mode 100644 index 000000000..3a7d21999 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/extensions/type_0.rs @@ -0,0 +1,213 @@ +use std::str::FromStr; + +use anyhow::bail; +use itertools::Itertools; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use ssz_types::{ + typenum::{U200, U400}, + VariableList, +}; + +use crate::{ + types::{ + distance::Distance, + ping_extensions::{CustomPayloadExtensionsFormat, ExtensionError, Extensions}, + portal_wire::CustomPayload, + }, + version::{ + APP_NAME, BUILD_ARCHITECTURE, BUILD_OPERATING_SYSTEM, PROGRAMMING_LANGUAGE_VERSION, + TRIN_SHORT_COMMIT, TRIN_VERSION, + }, +}; + +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct ClientInfoRadiusCapabilities { + pub client_info: Option, + pub data_radius: Distance, + capabilities: VariableList, +} + +impl ClientInfoRadiusCapabilities { + pub fn new(radius: Distance, capabilities: Vec) -> Self { + Self { + client_info: Some(ClientInfo::trin_client_info()), + data_radius: radius, + capabilities: VariableList::from(capabilities), + } + } + + pub fn capabilities(&self) -> Result, ExtensionError> { + self.capabilities + .iter() + .map(|&value| Extensions::try_from(value)) + .collect::, _>>() + } +} + +impl From for CustomPayload { + fn from(client_info_radius_capacities: ClientInfoRadiusCapabilities) -> Self { + CustomPayload::from( + CustomPayloadExtensionsFormat { + r#type: 0, + payload: client_info_radius_capacities.as_ssz_bytes().into(), + } + .as_ssz_bytes(), + ) + } +} + +/// Information about the client. +/// example: trin/v0.1.1-892ad575/linux-x86_64/rustc1.81.0 +#[derive(PartialEq, Debug, Clone)] +pub struct ClientInfo { + pub client_name: String, + pub client_version: String, + pub short_commit: String, + pub operating_system: String, + pub cpu_architecture: String, + pub programming_language_version: String, +} + +impl ClientInfo { + pub fn trin_client_info() -> Self { + Self { + client_name: APP_NAME.to_string(), + client_version: TRIN_VERSION.to_string(), + short_commit: TRIN_SHORT_COMMIT.to_string(), + operating_system: BUILD_OPERATING_SYSTEM.to_string(), + cpu_architecture: BUILD_ARCHITECTURE.to_string(), + programming_language_version: format!("rustc{PROGRAMMING_LANGUAGE_VERSION}"), + } + } + + pub fn string(&self) -> String { + format!( + "{}/{}-{}/{}-{}/{}", + self.client_name, + self.client_version, + self.short_commit, + self.operating_system, + self.cpu_architecture, + self.programming_language_version + ) + } +} + +impl FromStr for ClientInfo { + type Err = anyhow::Error; + + fn from_str(string: &str) -> Result { + let parts: Vec<&str> = string.split('/').collect(); + + if parts.len() != 4 { + bail!("Invalid client info string: should have 4 /'s {}", string); + } + + let client_name = parts[0]; + + let Some((client_version, short_commit)) = parts[1].split('-').collect_tuple() else { + bail!( + "Invalid client info string: should look like 0.1.1-2b00d730 got {}", + parts[1] + ); + }; + + let Some((operating_system, cpu_architecture)) = parts[2].split('-').collect_tuple() else { + bail!( + "Invalid client info string: should look like linux-x86_64 got {}", + parts[2] + ); + }; + + Ok(Self { + client_name: client_name.to_string(), + client_version: client_version.to_string(), + short_commit: short_commit.to_string(), + operating_system: operating_system.to_string(), + cpu_architecture: cpu_architecture.to_string(), + programming_language_version: parts[3].to_string(), + }) + } +} + +impl Encode for ClientInfo { + fn is_ssz_fixed_len() -> bool { + false + } + + fn ssz_append(&self, buf: &mut Vec) { + let bytes: Vec = self.string().as_bytes().to_vec(); + let byte_list: VariableList = VariableList::from(bytes); + buf.extend_from_slice(&byte_list); + } + + fn ssz_bytes_len(&self) -> usize { + self.as_ssz_bytes().len() + } +} + +impl Decode for ClientInfo { + fn from_ssz_bytes(bytes: &[u8]) -> Result { + let byte_list = VariableList::::from_ssz_bytes(bytes)?; + let string = String::from_utf8(byte_list.to_vec()).map_err(|_| { + ssz::DecodeError::BytesInvalid(format!("Invalid utf8 string: {byte_list:?}")) + })?; + Self::from_str(&string).map_err(|err| { + ssz::DecodeError::BytesInvalid(format!("Failed to parse client info: {err:?}")) + }) + } + + fn is_ssz_fixed_len() -> bool { + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_info_round_trip() { + let client_info = ClientInfo::trin_client_info(); + let bytes = client_info.as_ssz_bytes(); + let decoded = ClientInfo::from_ssz_bytes(&bytes).unwrap(); + assert_eq!(client_info, decoded); + } + + #[test] + fn test_client_info_from_str() { + let client_info = ClientInfo::trin_client_info(); + let string = client_info.string(); + let decoded = ClientInfo::from_str(&string).unwrap(); + assert_eq!(client_info, decoded); + } + + #[test] + fn test_client_info_from_str_invalid() { + let string = "trin/0.1.1-2b00d730/linux-x86_64"; + let decoded = ClientInfo::from_str(string); + assert!(decoded.is_err()); + } + + #[test] + fn test_client_info_from_str_invalid_parts() { + let string = "trin/0.1.1-2b00d730/linux-x86_64/rustc1.81.0/extra"; + let decoded = ClientInfo::from_str(string); + assert!(decoded.is_err()); + } + + #[test] + fn test_client_info_from_str_invalid_version() { + let string = "trin/0.1.1/linux-x86_64/rustc1.81.0"; + let decoded = ClientInfo::from_str(string); + assert!(decoded.is_err()); + } + + #[test] + fn test_client_info_from_str_invalid_os() { + let string = "trin/0.1.1-2b00d730/linux/rustc1.81.0"; + let decoded = ClientInfo::from_str(string); + assert!(decoded.is_err()); + } +} diff --git a/ethportal-api/src/types/ping_extensions/extensions/type_1.rs b/ethportal-api/src/types/ping_extensions/extensions/type_1.rs new file mode 100644 index 000000000..a39bd1789 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/extensions/type_1.rs @@ -0,0 +1,29 @@ +use ssz::Encode; +use ssz_derive::{Decode, Encode}; + +use crate::types::{ + distance::Distance, ping_extensions::CustomPayloadExtensionsFormat, portal_wire::CustomPayload, +}; + +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct BasicRadius { + pub data_radius: Distance, +} + +impl BasicRadius { + pub fn new(data_radius: Distance) -> Self { + Self { data_radius } + } +} + +impl From for CustomPayload { + fn from(basic_radius: BasicRadius) -> Self { + CustomPayload::from( + CustomPayloadExtensionsFormat { + r#type: 1, + payload: basic_radius.as_ssz_bytes().into(), + } + .as_ssz_bytes(), + ) + } +} diff --git a/ethportal-api/src/types/ping_extensions/extensions/type_2.rs b/ethportal-api/src/types/ping_extensions/extensions/type_2.rs new file mode 100644 index 000000000..12e342ba6 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/extensions/type_2.rs @@ -0,0 +1,33 @@ +use ssz::Encode; +use ssz_derive::{Decode, Encode}; + +use crate::types::{ + distance::Distance, ping_extensions::CustomPayloadExtensionsFormat, portal_wire::CustomPayload, +}; + +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct HistoryRadius { + pub data_radius: Distance, + pub ephemeral_header_count: u16, +} + +impl HistoryRadius { + pub fn new(data_radius: Distance, ephemeral_header_count: u16) -> Self { + Self { + data_radius, + ephemeral_header_count, + } + } +} + +impl From for CustomPayload { + fn from(history_radius: HistoryRadius) -> Self { + CustomPayload::from( + CustomPayloadExtensionsFormat { + r#type: 2, + payload: history_radius.as_ssz_bytes().into(), + } + .as_ssz_bytes(), + ) + } +} diff --git a/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs b/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs new file mode 100644 index 000000000..3a39d7851 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs @@ -0,0 +1,52 @@ +use ssz::Encode; +use ssz_derive::{Decode, Encode}; +use ssz_types::{typenum::U300, VariableList}; + +use crate::types::{ping_extensions::CustomPayloadExtensionsFormat, portal_wire::CustomPayload}; + +/// Used to response to pings which the node can't handle +#[derive(PartialEq, Debug, Clone, Encode, Decode)] + +pub struct PingError { + pub error_code: u16, + pub message: VariableList, +} + +impl PingError { + pub fn new(error_code: ErrorCodes) -> Self { + Self { + error_code: error_code.into(), + message: VariableList::empty(), + } + } +} + +impl From for CustomPayload { + fn from(ping_error: PingError) -> Self { + CustomPayload::from( + CustomPayloadExtensionsFormat { + r#type: 65535, + payload: ping_error.as_ssz_bytes().into(), + } + .as_ssz_bytes(), + ) + } +} + +pub enum ErrorCodes { + ExtensionNotSupported, + RequestedDataNotFound, + FailedToDecodePayload, + SystemError, +} + +impl From for u16 { + fn from(error_code: ErrorCodes) -> u16 { + match error_code { + ErrorCodes::ExtensionNotSupported => 0, + ErrorCodes::RequestedDataNotFound => 1, + ErrorCodes::FailedToDecodePayload => 2, + ErrorCodes::SystemError => 3, + } + } +} diff --git a/ethportal-api/src/types/ping_extensions/mod.rs b/ethportal-api/src/types/ping_extensions/mod.rs new file mode 100644 index 000000000..3749c9953 --- /dev/null +++ b/ethportal-api/src/types/ping_extensions/mod.rs @@ -0,0 +1,78 @@ +pub mod decode; +pub mod extensions; + +use ssz::Decode; +use ssz_derive::{Decode, Encode}; +use ssz_types::{ + typenum::{ + bit::{B0, B1}, + UInt, UTerm, + }, + VariableList, +}; + +use super::portal_wire::CustomPayload; + +// 1100 in binary is 10001001100 +pub type U1100 = UInt< + UInt< + UInt< + UInt, B0>, B0>, B0>, B1>, B0>, B0>, B1>, + B1, + >, + B0, + >, + B0, +>; + +#[derive(PartialEq, Debug, Encode, Decode)] +pub struct CustomPayloadExtensionsFormat { + pub r#type: u16, + pub payload: VariableList, +} + +impl TryFrom for CustomPayloadExtensionsFormat { + type Error = anyhow::Error; + + fn try_from(value: CustomPayload) -> Result { + CustomPayloadExtensionsFormat::from_ssz_bytes(&value.payload) + .map_err(|e| anyhow::anyhow!("Failed to decode CustomPayloadExtensionsFormat: {:?}", e)) + } +} + +#[derive(PartialEq, Debug, Clone, Copy, Eq)] +pub enum Extensions { + Capabilities, + BasicRadius, + HistoryRadius, + Error, +} + +impl TryFrom for Extensions { + type Error = ExtensionError; + + fn try_from(value: u16) -> Result { + match value { + 0 => Ok(Extensions::Capabilities), + 1 => Ok(Extensions::BasicRadius), + 2 => Ok(Extensions::HistoryRadius), + 65535 => Ok(Extensions::Error), + _ => Err(ExtensionError::NonSupportedExtension(value)), + } + } +} + +impl From for u16 { + fn from(value: Extensions) -> u16 { + match value { + Extensions::Capabilities => 0, + Extensions::BasicRadius => 1, + Extensions::HistoryRadius => 2, + Extensions::Error => 65535, + } + } +} + +pub enum ExtensionError { + NonSupportedExtension(u16), +} diff --git a/ethportal-api/src/types/portal_wire.rs b/ethportal-api/src/types/portal_wire.rs index 7ebe95285..e76aa5010 100644 --- a/ethportal-api/src/types/portal_wire.rs +++ b/ethportal-api/src/types/portal_wire.rs @@ -74,7 +74,7 @@ pub const MAX_PORTAL_CONTENT_PAYLOAD_SIZE: usize = MAX_DISCV5_TALK_REQ_PAYLOAD_S /// Custom payload element of Ping and Pong overlay messages #[derive(Debug, PartialEq, Clone)] pub struct CustomPayload { - payload: ByteList2048, + pub payload: ByteList2048, } impl TryFrom<&Value> for CustomPayload { diff --git a/ethportal-api/src/version.rs b/ethportal-api/src/version.rs index ce385d21c..c48b9ea65 100644 --- a/ethportal-api/src/version.rs +++ b/ethportal-api/src/version.rs @@ -1,3 +1,5 @@ +pub const APP_NAME: &str = "trin"; + /// The latest git commit hash of the build. pub const TRIN_FULL_COMMIT: &str = env!("VERGEN_GIT_SHA"); pub const TRIN_SHORT_COMMIT: &str = const_format::str_index!(TRIN_FULL_COMMIT, ..8); diff --git a/portalnet/src/overlay/mod.rs b/portalnet/src/overlay/mod.rs index 87b5641fd..918f4cc88 100644 --- a/portalnet/src/overlay/mod.rs +++ b/portalnet/src/overlay/mod.rs @@ -1,6 +1,7 @@ pub mod command; pub mod config; pub mod errors; +pub mod ping_extensions; pub mod protocol; pub mod request; pub mod service; diff --git a/portalnet/src/overlay/ping_extensions.rs b/portalnet/src/overlay/ping_extensions.rs new file mode 100644 index 000000000..5148ae749 --- /dev/null +++ b/portalnet/src/overlay/ping_extensions.rs @@ -0,0 +1,33 @@ +use ethportal_api::types::ping_extensions::Extensions; + +pub trait PingExtension { + fn is_supported(&self, extension: Extensions) -> bool; + + /// Returns the newest extension that is supported by both clients, used for extended ping + /// responses. + fn newest_commonly_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option; + + fn raw_extensions(&self) -> Vec; +} + +pub struct MockPingExtension {} + +impl PingExtension for MockPingExtension { + fn is_supported(&self, _extension: Extensions) -> bool { + true + } + + fn newest_commonly_supported_base_extension( + &self, + _extensions: &[Extensions], + ) -> Option { + Some(Extensions::HistoryRadius) + } + + fn raw_extensions(&self) -> Vec { + vec![0, 1] + } +} diff --git a/portalnet/src/overlay/protocol.rs b/portalnet/src/overlay/protocol.rs index ffb70f672..b3dd3b648 100644 --- a/portalnet/src/overlay/protocol.rs +++ b/portalnet/src/overlay/protocol.rs @@ -20,9 +20,10 @@ use ethportal_api::{ distance::{Distance, Metric}, enr::Enr, network::Subnetwork, + ping_extensions::extensions::type_0::ClientInfoRadiusCapabilities, portal_wire::{ - Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, OfferTrace, - Ping, Pong, PopulatedOffer, PopulatedOfferWithResult, Request, Response, + Accept, Content, FindContent, FindNodes, Message, Nodes, OfferTrace, Ping, Pong, + PopulatedOffer, PopulatedOfferWithResult, Request, Response, }, }, utils::bytes::hex_encode, @@ -30,7 +31,6 @@ use ethportal_api::{ }; use futures::channel::oneshot; use parking_lot::RwLock; -use ssz::Encode; use tokio::sync::{broadcast, mpsc::UnboundedSender}; use tracing::{debug, error, info, warn}; use trin_metrics::{overlay::OverlayMetricsReporter, portalnet::PORTALNET_METRICS}; @@ -38,6 +38,7 @@ use trin_storage::ContentStore; use trin_validation::validator::{ValidationResult, Validator}; use utp_rs::socket::UtpSocket; +use super::ping_extensions::PingExtension; use crate::{ bootnodes::Bootnode, discovery::{Discovery, UtpEnr}, @@ -63,7 +64,7 @@ use crate::{ /// implement the overlay protocol and the overlay protocol is where we can encapsulate the logic /// for handling common network requests/responses. #[derive(Clone)] -pub struct OverlayProtocol { +pub struct OverlayProtocol { /// Reference to the underlying discv5 protocol pub discovery: Arc, /// The data store. @@ -86,6 +87,8 @@ pub struct OverlayProtocol { validator: Arc, /// Runtime telemetry metrics for the overlay network. metrics: OverlayMetricsReporter, + /// Ping extensions for the overlay network. + ping_extensions: Arc, } impl< @@ -93,7 +96,8 @@ impl< TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, TStore: 'static + ContentStore + Send + Sync, - > OverlayProtocol + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayProtocol { pub async fn new( config: OverlayConfig, @@ -102,6 +106,7 @@ impl< store: Arc>, protocol: Subnetwork, validator: Arc, + ping_extensions: Arc, ) -> Self { let local_node_id = discovery.local_enr().node_id(); let kbuckets = SharedKBucketsTable::new(KBucketsTable::new( @@ -121,25 +126,27 @@ impl< utp_socket, metrics.clone(), )); - let command_tx = OverlayService::::spawn( - Arc::clone(&discovery), - Arc::clone(&store), - kbuckets.clone(), - config.bootnode_enrs, - config.ping_queue_interval, - protocol, - Arc::clone(&utp_controller), - metrics.clone(), - Arc::clone(&validator), - config.query_timeout, - config.query_peer_timeout, - config.query_parallelism, - config.query_num_results, - config.findnodes_query_distances_per_peer, - config.disable_poke, - config.gossip_dropped, - ) - .await; + let command_tx = + OverlayService::::spawn( + Arc::clone(&discovery), + Arc::clone(&store), + kbuckets.clone(), + config.bootnode_enrs, + config.ping_queue_interval, + protocol, + Arc::clone(&utp_controller), + metrics.clone(), + Arc::clone(&validator), + config.query_timeout, + config.query_peer_timeout, + config.query_parallelism, + config.query_num_results, + config.findnodes_query_distances_per_peer, + config.disable_poke, + config.gossip_dropped, + Arc::clone(&ping_extensions), + ) + .await; Self { discovery, @@ -152,6 +159,7 @@ impl< _phantom_metric: PhantomData, validator, metrics, + ping_extensions, } } @@ -246,10 +254,7 @@ impl< /// `AddEnr` adds requested `enr` to our kbucket. pub fn add_enr(&self, enr: Enr) -> Result<(), OverlayRequestError> { match self.kbuckets.insert_or_update( - Node { - enr, - data_radius: Distance::MAX, - }, + Node::new(enr.clone(), Distance::MAX), NodeStatus { state: ConnectionState::Connected, direction: ConnectionDirection::Incoming, @@ -337,7 +342,9 @@ impl< // Construct the request. let enr_seq = self.discovery.local_enr().seq(); let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); + let custom_payload = + ClientInfoRadiusCapabilities::new(data_radius, self.ping_extensions.raw_extensions()) + .into(); let request = Ping { enr_seq, custom_payload, diff --git a/portalnet/src/overlay/service.rs b/portalnet/src/overlay/service/mod.rs similarity index 95% rename from portalnet/src/overlay/service.rs rename to portalnet/src/overlay/service/mod.rs index 68978b839..743864784 100644 --- a/portalnet/src/overlay/service.rs +++ b/portalnet/src/overlay/service/mod.rs @@ -1,3 +1,5 @@ +pub mod ping; + use std::{ collections::HashMap, marker::{PhantomData, Sync}, @@ -25,9 +27,9 @@ use ethportal_api::{ enr::{Enr, SszEnr}, network::Subnetwork, portal_wire::{ - Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Offer, - OfferTrace, Ping, Pong, PopulatedOffer, Request, Response, - MAX_PORTAL_CONTENT_PAYLOAD_SIZE, MAX_PORTAL_NODES_ENRS_SIZE, + Accept, Content, FindContent, FindNodes, Message, Nodes, Offer, OfferTrace, + PopulatedOffer, Request, Response, MAX_PORTAL_CONTENT_PAYLOAD_SIZE, + MAX_PORTAL_NODES_ENRS_SIZE, }, query_trace::{QueryFailureKind, QueryTrace}, }, @@ -54,6 +56,7 @@ use trin_storage::{ContentStore, ShouldWeStoreContent}; use trin_validation::validator::Validator; use utp_rs::cid::ConnectionId; +use super::ping_extensions::PingExtension; use crate::{ accept_queue::AcceptQueue, discovery::{Discovery, UtpEnr}, @@ -105,7 +108,7 @@ const BUCKET_REFRESH_INTERVAL_SECS: u64 = 60; const EVENT_STREAM_CHANNEL_CAPACITY: usize = 10; /// The overlay service. -pub struct OverlayService +pub struct OverlayService where TContentKey: OverlayContentKey, { @@ -169,6 +172,8 @@ where gossip_dropped: bool, /// Accept Queue for inbound content keys accept_queue: Arc>>, + /// Ping extensions for the overlay network. + ping_extensions: Arc, } impl< @@ -176,7 +181,8 @@ impl< TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, TStore: 'static + ContentStore + Send + Sync, - > OverlayService + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayService { /// Spawns the overlay network service. /// @@ -201,6 +207,7 @@ impl< findnodes_query_distances_per_peer: usize, disable_poke: bool, gossip_dropped: bool, + ping_extensions: Arc, ) -> UnboundedSender> { let (command_tx, command_rx) = mpsc::unbounded_channel(); let internal_command_tx = command_tx.clone(); @@ -246,6 +253,7 @@ impl< disable_poke, gossip_dropped, accept_queue: Arc::new(RwLock::new(AcceptQueue::default())), + ping_extensions, }; info!(protocol = %protocol, "Starting overlay service"); @@ -394,18 +402,18 @@ impl< } Some(Ok(node_id)) = self.peers_to_ping.next() => { if let Some(node) = self.kbuckets.entry(node_id).present() { - self.ping_node(&node.enr); + self.ping_node(node); self.peers_to_ping.insert(node_id); } } - query_event = OverlayService::::query_event_poll(&mut self.find_node_query_pool) => { + query_event = OverlayService::::query_event_poll(&mut self.find_node_query_pool) => { self.handle_find_nodes_query_event(query_event); } // Handle query events for queries in the find content query pool. - query_event = OverlayService::::query_event_poll(&mut self.find_content_query_pool) => { + query_event = OverlayService::::query_event_poll(&mut self.find_content_query_pool) => { self.handle_find_content_query_event(query_event); } - _ = OverlayService::::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {} + _ = OverlayService::::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {} Some(trace_event) = self.content_query_trace_events_rx.recv() => { self.track_content_query_trace_event(trace_event); } @@ -954,24 +962,6 @@ impl< } } - /// Builds a `Pong` response for a `Ping` request. - fn handle_ping(&self, request: Ping, source: &NodeId, request_id: RequestId) -> Pong { - trace!( - protocol = %self.protocol, - request.source = %source, - request.discv5.id = %request_id, - "Handling Ping message {request}", - ); - - let enr_seq = self.local_enr().seq(); - let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - Pong { - enr_seq, - custom_payload, - } - } - /// Builds a `Nodes` response for a `FindNodes` request. fn handle_find_nodes( &self, @@ -1392,36 +1382,12 @@ impl< // peer. if let Some(node_addr) = self.discovery.cached_node_addr(&source) { // TODO: Decide default data radius, and define a constant. - let node = Node { - enr: node_addr.enr, - data_radius: Distance::MAX, - }; + let node = Node::new(node_addr.enr, Distance::MAX); self.connect_node(node, ConnectionDirection::Incoming); } } } - /// Processes a ping request from some source node. - fn process_ping(&self, ping: Ping, source: NodeId) { - // If the node is in the routing table, then check if we need to update the node. - if let Some(node) = self.kbuckets.entry(source).present_or_pending() { - // TODO: How do we handle data in the custom payload? This is unique to each overlay - // network, so there may need to be some way to parameterize the update for a - // ping/pong. - - // If the ENR sequence number in pong is less than the ENR sequence number for the - // routing table entry, then request the node. - if node.enr().seq() < ping.enr_seq { - self.request_node(&node.enr()); - } - - let data_radius: Distance = ping.custom_payload.into(); - if node.data_radius != data_radius { - self.update_node_radius(node.enr(), data_radius); - } - } - } - /// Processes a failed request intended for some destination node. fn process_request_failure( &mut self, @@ -1798,39 +1764,11 @@ impl< Ok(()) } - /// Processes a Pong response. - /// - /// Refreshes the node if necessary. Attempts to mark the node as connected. - fn process_pong(&self, pong: Pong, source: Enr) { - let node_id = source.node_id(); - trace!( - protocol = %self.protocol, - response.source = %node_id, - "Processing Pong message {pong}" - ); - - // If the ENR sequence number in pong is less than the ENR sequence number for the routing - // table entry, then request the node. - // - // TODO: Perform update on non-ENR node entry state. See note in `process_ping`. - if let Some(node) = self.kbuckets.entry(node_id).present_or_pending() { - if node.enr().seq() < pong.enr_seq { - self.request_node(&node.enr()); - } - - let data_radius: Distance = pong.custom_payload.into(); - if node.data_radius != data_radius { - self.update_node_radius(source, data_radius); - } - } - } - /// Update the recorded radius of a node in our routing table. - fn update_node_radius(&self, enr: Enr, data_radius: Distance) { - let node_id = enr.node_id(); + fn update_node(&self, node: Node) { + let node_id = node.enr.node_id(); - let updated_node = Node::new(enr, data_radius); - if let UpdateResult::Failed(_) = self.kbuckets.update_node(updated_node, None) { + if let UpdateResult::Failed(_) = self.kbuckets.update_node(node, None) { error!( "Failed to update radius of node {}", hex_encode_compact(node_id.raw()) @@ -2179,35 +2117,6 @@ impl< } } - /// Submits a request to ping a destination (target) node. - /// - /// This can block the thread, so make sure you are not holding any lock while calling this. - fn ping_node(&self, destination: &Enr) { - trace!( - protocol = %self.protocol, - request.dest = %destination.node_id(), - "Sending Ping message", - ); - - let enr_seq = self.local_enr().seq(); - let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - let ping = Request::Ping(Ping { - enr_seq, - custom_payload, - }); - let request = OverlayRequest::new( - ping, - RequestDirection::Outgoing { - destination: destination.clone(), - }, - None, - None, - None, - ); - let _ = self.command_tx.send(OverlayCommand::Request(request)); - } - /// Submits a request for the node info of a destination (target) node. fn request_node(&self, destination: &Enr) { let find_nodes = Request::FindNodes(FindNodes { distances: vec![0] }); @@ -2247,7 +2156,7 @@ impl< // currently considered disconnected. This node should be pinged to check // for connectivity. if let Some(node) = self.kbuckets.entry(disconnected.into_preimage()).present() { - self.ping_node(&node.enr); + self.ping_node(node); } } InsertResult::StatusUpdated { @@ -2538,15 +2447,18 @@ where gossip_dropped: bool, } -impl - From<&OverlayService> +impl + From<&OverlayService> for UtpProcessing where TContentKey: OverlayContentKey + Send + Sync, TValidator: Validator, TStore: ContentStore, + TPingExtensions: PingExtension, { - fn from(service: &OverlayService) -> Self { + fn from( + service: &OverlayService, + ) -> Self { Self { validator: Arc::clone(&service.validator), store: Arc::clone(&service.store), @@ -2611,8 +2523,11 @@ mod tests { use alloy::primitives::U256; use discv5::kbucket; use ethportal_api::types::{ - content_key::overlay::IdentityContentKey, distance::XorMetric, - enr::generate_random_remote_enr, portal_wire::MAINNET, + content_key::overlay::IdentityContentKey, + distance::XorMetric, + enr::generate_random_remote_enr, + ping_extensions::extensions::type_0::ClientInfoRadiusCapabilities, + portal_wire::{Ping, Pong, MAINNET}, }; use kbucket::KBucketsTable; use rstest::*; @@ -2631,7 +2546,7 @@ mod tests { config::PortalnetConfig, constants::{DEFAULT_DISCOVERY_PORT, DEFAULT_UTP_TRANSFER_LIMIT}, discovery::{Discovery, NodeAddress}, - overlay::config::OverlayConfig, + overlay::{config::OverlayConfig, ping_extensions::MockPingExtension}, }; macro_rules! poll_command_rx { @@ -2640,8 +2555,13 @@ mod tests { }; } - fn build_service( - ) -> OverlayService { + fn build_service() -> OverlayService< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + > { let portal_config = PortalnetConfig { no_stun: true, no_upnp: true, @@ -2722,6 +2642,7 @@ mod tests { disable_poke: false, gossip_dropped: false, accept_queue, + ping_extensions: Arc::new(MockPingExtension {}), } } @@ -2744,7 +2665,11 @@ mod tests { let ping = Ping { enr_seq: source.seq() + 1, - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + custom_payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_ping(ping, node_id); @@ -2789,7 +2714,11 @@ mod tests { let ping = Ping { enr_seq: source.seq(), - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + custom_payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_ping(ping, node_id); @@ -2850,7 +2779,11 @@ mod tests { let pong = Pong { enr_seq: source.seq() + 1, - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + custom_payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_pong(pong, source.clone()); @@ -2894,7 +2827,11 @@ mod tests { let pong = Pong { enr_seq: source.seq(), - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + custom_payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_pong(pong, source); @@ -3032,7 +2969,13 @@ mod tests { let peer_node_ids: Vec = vec![peer.enr.node_id()]; // Node has maximum radius, so there should be one offer in the channel. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3070,7 +3013,13 @@ mod tests { let peer_node_ids: Vec = peers.iter().map(|p| p.node_id()).collect(); // No nodes in the routing table, so no commands should be in the channel. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3108,7 +3057,13 @@ mod tests { let peer_node_ids: Vec = peers.iter().map(|p| p.enr.node_id()).collect(); // One offer should be in the channel for the maximum radius node. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3173,7 +3128,7 @@ mod tests { let mut service = task::spawn(build_service()); let (_, destination) = generate_random_remote_enr(); - service.ping_node(&destination); + service.ping_node(Node::new(destination.clone(), Distance::MAX)); let command = assert_ready!(poll_command_rx!(service)); assert!(command.is_some()); @@ -3382,6 +3337,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; match event { @@ -3418,6 +3374,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3435,6 +3392,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3459,6 +3417,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3502,6 +3461,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3547,10 +3507,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3617,10 +3574,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3678,10 +3632,7 @@ mod tests { let bootnode_node_id = bootnode_enr.node_id(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3741,10 +3692,7 @@ mod tests { let bootnode_node_id = bootnode_enr.node_id(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3806,10 +3754,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3830,11 +3775,14 @@ mod tests { ); let query_id = query_id.expect("Query ID for new find content query is `None`"); - let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ) - .await; + let query_event = OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool) + .await; // Query event should be `Waiting` variant. assert!(matches!(query_event, QueryEvent::Waiting(_, _, _))); @@ -3869,11 +3817,14 @@ mod tests { content.clone(), ); - let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ) - .await; + let query_event = OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool) + .await; // Query event should be `Validating` variant. assert!(matches!(query_event, QueryEvent::Validating(..))); @@ -3883,13 +3834,18 @@ mod tests { // Poll until content is validated let mut attempts_left = 5; let query_event = loop { - let polled = timeout( - Duration::from_millis(10), - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ), - ) - .await; + let polled = + timeout( + Duration::from_millis(10), + OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool), + ) + .await; if let Ok(query_event) = polled { break query_event; } diff --git a/portalnet/src/overlay/service/ping.rs b/portalnet/src/overlay/service/ping.rs new file mode 100644 index 000000000..8d97ea963 --- /dev/null +++ b/portalnet/src/overlay/service/ping.rs @@ -0,0 +1,369 @@ +use std::marker::Sync; + +use discv5::{enr::NodeId, rpc::RequestId}; +use ethportal_api::{ + types::{ + distance::Metric, + enr::Enr, + ping_extensions::{ + decode::DecodedExtension, + extensions::{ + type_0::ClientInfoRadiusCapabilities, + type_1::BasicRadius, + type_2::HistoryRadius, + type_65535::{ErrorCodes, PingError}, + }, + CustomPayloadExtensionsFormat, Extensions, + }, + portal_wire::{CustomPayload, Ping, Pong, Request}, + }, + OverlayContentKey, +}; +use tracing::{trace, warn}; +use trin_storage::ContentStore; +use trin_validation::validator::Validator; + +use super::OverlayService; +use crate::{ + overlay::{ + command::OverlayCommand, + ping_extensions::PingExtension, + request::{OverlayRequest, RequestDirection}, + }, + types::node::Node, +}; + +impl< + TContentKey: 'static + OverlayContentKey + Send + Sync, + TMetric: Metric + Send + Sync, + TValidator: 'static + Validator + Send + Sync, + TStore: 'static + ContentStore + Send + Sync, + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayService +{ + /// Builds a `Pong` response for a `Ping` request. + pub(super) fn handle_ping( + &self, + request: Ping, + source: &NodeId, + request_id: RequestId, + ) -> Pong { + trace!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Handling Ping message {request}", + ); + + let Ok(ping_custom_payload): anyhow::Result = + request.custom_payload.clone().try_into() + else { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Invalid Ping message: {request}", + ); + return Pong { + enr_seq: self.local_enr().seq(), + custom_payload: PingError::new(ErrorCodes::FailedToDecodePayload).into(), + }; + }; + + let extension_type: Extensions = match ping_custom_payload.r#type.try_into() { + Ok(extension) => extension, + Err(_) => { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Invalid Ping message: {request}", + ); + return Pong { + enr_seq: self.local_enr().seq(), + custom_payload: PingError::new(ErrorCodes::ExtensionNotSupported).into(), + }; + } + }; + + if !self.ping_extensions.is_supported(extension_type) { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Unsupported extension type: {extension_type:?}", + ); + return Pong { + enr_seq: self.local_enr().seq(), + custom_payload: PingError::new(ErrorCodes::ExtensionNotSupported).into(), + }; + } + + match extension_type { + Extensions::Capabilities => Pong { + enr_seq: self.local_enr().seq(), + custom_payload: self.create_capabilities().into(), + }, + Extensions::BasicRadius => { + let data_radius = self.data_radius(); + let basic_payload = BasicRadius { data_radius }; + Pong { + enr_seq: self.local_enr().seq(), + custom_payload: basic_payload.into(), + } + } + Extensions::HistoryRadius => { + let history_payload = HistoryRadius { + data_radius: self.data_radius(), + ephemeral_header_count: 0, + }; + Pong { + enr_seq: self.local_enr().seq(), + custom_payload: history_payload.into(), + } + } + Extensions::Error => { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Invalid Ping message, Error's should only be received from pongs: {request}", + ); + Pong { + enr_seq: self.local_enr().seq(), + custom_payload: PingError::new(ErrorCodes::SystemError).into(), + } + } + } + } + + /// Processes a ping request from some source node. + pub(super) fn process_ping(&self, ping: Ping, source: NodeId) { + // If the node is in the routing table, then check if we need to update the node. + if let Some(mut node) = self.kbuckets.entry(source).present_or_pending() { + // TODO: How do we handle data in the custom payload? This is unique to each overlay + // network, so there may need to be some way to parameterize the update for a + // ping/pong. + + // If the ENR sequence number in pong is less than the ENR sequence number for the + // routing table entry, then request the node. + if node.enr().seq() < ping.enr_seq { + self.request_node(&node.enr()); + } + + let extension = match DecodedExtension::try_from(ping.custom_payload) { + Ok(extension) => extension, + Err(err) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Failed to decode custom payload during process_ping: {err:?}", + ); + return; + } + }; + + if !self.ping_extensions.is_supported(extension.clone().into()) { + warn!( + protocol = %self.protocol, + request.source = %source, + "Extension type isn't supported on this subnetwork: {extension:?}", + ); + return; + } + + match extension { + DecodedExtension::Capabilities(radius_capabilities) => { + let Ok(capabilities) = radius_capabilities.capabilities() else { + warn!( + protocol = %self.protocol, + request.source = %source, + "Capabilities weren't decoded correctly", + ); + return; + }; + if node.data_radius != radius_capabilities.data_radius + || node.capabilities != Some(capabilities.clone()) + { + node.set_data_radius(radius_capabilities.data_radius); + node.set_capabilities(capabilities); + self.update_node(node); + } + } + DecodedExtension::BasicRadius(basic_radius) => { + let data_radius = basic_radius.data_radius; + if node.data_radius != data_radius { + node.set_data_radius(data_radius); + self.update_node(node); + } + } + DecodedExtension::HistoryRadius(history_radius) => { + let data_radius = history_radius.data_radius; + let ephemeral_header_count = history_radius.ephemeral_header_count; + if node.data_radius != data_radius + || node.ephemeral_header_count != Some(ephemeral_header_count) + { + node.set_data_radius(data_radius); + node.set_ephemeral_header_count(ephemeral_header_count); + self.update_node(node); + } + } + DecodedExtension::Error(ping_error) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Received an error response from a ping request: {ping_error:?}", + ); + } + } + } + } + + /// Processes a Pong response. + /// + /// Refreshes the node if necessary. Attempts to mark the node as connected. + pub(super) fn process_pong(&self, pong: Pong, source: Enr) { + let node_id = source.node_id(); + trace!( + protocol = %self.protocol, + response.source = %node_id, + "Processing Pong message {pong}" + ); + + // If the ENR sequence number in pong is less than the ENR sequence number for the routing + // table entry, then request the node. + // + // TODO: Perform update on non-ENR node entry state. See note in `process_ping`. + if let Some(mut node) = self.kbuckets.entry(node_id).present_or_pending() { + if node.enr().seq() < pong.enr_seq { + self.request_node(&node.enr()); + } + + let extension = match DecodedExtension::try_from(pong.custom_payload) { + Ok(extension) => extension, + Err(err) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Failed to decode custom payload during process_ping: {err:?}", + ); + return; + } + }; + + if !self.ping_extensions.is_supported(extension.clone().into()) { + warn!( + protocol = %self.protocol, + request.source = %source, + "Extension type isn't supported on this subnetwork: {extension:?}", + ); + return; + } + + match extension { + DecodedExtension::Capabilities(radius_capabilities) => { + let Ok(capabilities) = radius_capabilities.capabilities() else { + warn!( + protocol = %self.protocol, + request.source = %source, + "Capabilities weren't decoded correctly", + ); + return; + }; + if node.data_radius != radius_capabilities.data_radius + || node.compare_capabilities(&capabilities) + { + node.set_data_radius(radius_capabilities.data_radius); + node.set_capabilities(capabilities); + self.update_node(node); + } + } + DecodedExtension::BasicRadius(basic_radius) => { + let data_radius = basic_radius.data_radius; + if node.data_radius != data_radius { + node.set_data_radius(data_radius); + self.update_node(node); + } + } + DecodedExtension::HistoryRadius(history_radius) => { + let data_radius = history_radius.data_radius; + let ephemeral_header_count = history_radius.ephemeral_header_count; + if node.data_radius != data_radius + || node.ephemeral_header_count != Some(ephemeral_header_count) + { + node.set_data_radius(data_radius); + node.set_ephemeral_header_count(ephemeral_header_count); + self.update_node(node); + } + } + DecodedExtension::Error(ping_error) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Received an error response from a pong request: {ping_error:?}", + ); + } + } + } + } + + fn create_capabilities(&self) -> ClientInfoRadiusCapabilities { + ClientInfoRadiusCapabilities::new(self.data_radius(), self.ping_extensions.raw_extensions()) + } + + fn handle_base_extension(&self, extension: Extensions, node_id: NodeId) -> CustomPayload { + match extension { + Extensions::BasicRadius => BasicRadius { + data_radius: self.data_radius(), + } + .into(), + Extensions::HistoryRadius => HistoryRadius { + data_radius: self.data_radius(), + ephemeral_header_count: 0, + } + .into(), + _ => { + warn!( + protocol = %self.protocol, + request.dest = %node_id, + "Base extension wasn't implemented: {extension:?}, sending Capabilities instead this is a bug", + ); + self.create_capabilities().into() + } + } + } + + /// Submits a request to ping a destination (target) node. + /// + /// This can block the thread, so make sure you are not holding any lock while calling this. + pub(super) fn ping_node(&self, node: Node) { + trace!( + protocol = %self.protocol, + request.dest = %node.enr.node_id(), + "Sending Ping message", + ); + + let custom_payload = match node.capabilities().map(|capabilities| { + self.ping_extensions + .newest_commonly_supported_base_extension(capabilities) + }) { + Some(Some(extension)) => self.handle_base_extension(extension, node.enr.node_id()), + _ => self.create_capabilities().into(), + }; + + let ping = Request::Ping(Ping { + enr_seq: self.local_enr().seq(), + custom_payload, + }); + let request = OverlayRequest::new( + ping, + RequestDirection::Outgoing { + destination: node.enr.clone(), + }, + None, + None, + None, + ); + let _ = self.command_tx.send(OverlayCommand::Request(request)); + } +} diff --git a/portalnet/src/types/kbucket.rs b/portalnet/src/types/kbucket.rs index 66f5481a7..63f3ad595 100644 --- a/portalnet/src/types/kbucket.rs +++ b/portalnet/src/types/kbucket.rs @@ -149,10 +149,9 @@ impl SharedKBucketsTable { // If the node is not in the routing table, then insert the node in a disconnected state // (a subsequent ping will establish connectivity with the node). Ignore insertion // failures. - if let Some(node) = Entry::from(kbuckets.entry(&key)).present_or_pending() { + if let Some(mut node) = Entry::from(kbuckets.entry(&key)).present_or_pending() { if node.enr.seq() < enr.seq() { - let node = Node::new(enr, node.data_radius()); - + node.set_enr(enr); if let UpdateResult::Failed(reason) = kbuckets.update_node(&key, node, None) { // The update removed the node because it would violate the incoming peers // condition or a bucket/table filter. diff --git a/portalnet/src/types/node.rs b/portalnet/src/types/node.rs index 40c0a032e..bf330b171 100644 --- a/portalnet/src/types/node.rs +++ b/portalnet/src/types/node.rs @@ -1,20 +1,32 @@ use std::fmt; -use ethportal_api::types::{distance::Distance, enr::Enr}; +use ethportal_api::types::{distance::Distance, enr::Enr, ping_extensions::Extensions}; /// A node in the overlay network routing table. #[derive(Clone, Debug, Eq, PartialEq)] pub struct Node { /// The node's ENR. pub enr: Enr, + /// The node's data radius. pub data_radius: Distance, + + /// The node's capabilities. + pub capabilities: Option>, + + /// The node's ephemeral header count (only used for History Network) + pub ephemeral_header_count: Option, } impl Node { /// Creates a new node. pub fn new(enr: Enr, data_radius: Distance) -> Node { - Node { enr, data_radius } + Node { + enr, + data_radius, + capabilities: None, + ephemeral_header_count: None, + } } /// Returns the ENR of the node. @@ -27,6 +39,26 @@ impl Node { self.data_radius } + /// Returns the capabilities of the node. + pub fn capabilities(&self) -> Option<&[Extensions]> { + self.capabilities.as_deref() + } + + /// Compares the capabilities of the node with the given capabilities. + /// Returns true if the capabilities are the same. + pub fn compare_capabilities(&self, capabilities: &[Extensions]) -> bool { + if let Some(node_capabilities) = &self.capabilities { + capabilities.iter().all(|c| node_capabilities.contains(c)) + } else { + false + } + } + + /// Returns the ephemeral header count of the node. + pub fn ephemeral_header_count(&self) -> Option { + self.ephemeral_header_count + } + /// Sets the ENR of the node. pub fn set_enr(&mut self, enr: Enr) { self.enr = enr; @@ -36,6 +68,16 @@ impl Node { pub fn set_data_radius(&mut self, radius: Distance) { self.data_radius = radius; } + + /// Sets the capabilities of the node. + pub fn set_capabilities(&mut self, capabilities: Vec) { + self.capabilities = Some(capabilities); + } + + /// Sets the ephemeral header count of the node. + pub fn set_ephemeral_header_count(&mut self, count: u16) { + self.ephemeral_header_count = Some(count); + } } impl fmt::Display for Node { diff --git a/portalnet/tests/overlay.rs b/portalnet/tests/overlay.rs index b14427b72..8ed095ab3 100644 --- a/portalnet/tests/overlay.rs +++ b/portalnet/tests/overlay.rs @@ -21,6 +21,7 @@ use portalnet::{ discovery::{Discovery, Discv5UdpSocket}, overlay::{ config::{FindContentConfig, OverlayConfig}, + ping_extensions::MockPingExtension, protocol::OverlayProtocol, }, }; @@ -35,7 +36,13 @@ use utp_rs::socket::UtpSocket; async fn init_overlay( discovery: Arc, subnetwork: Subnetwork, -) -> OverlayProtocol { +) -> OverlayProtocol< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, +> { let overlay_config = OverlayConfig::default(); let node_id = discovery.local_enr().node_id(); @@ -50,6 +57,7 @@ async fn init_overlay( let utp_socket = Arc::new(UtpSocket::with_socket(discv5_utp)); let validator = Arc::new(MockValidator {}); + let ping_extensions = Arc::new(MockPingExtension {}); OverlayProtocol::new( overlay_config, @@ -58,13 +66,22 @@ async fn init_overlay( store, subnetwork, validator, + ping_extensions, ) .await } async fn spawn_overlay( mut talk_req_rx: mpsc::Receiver, - overlay: Arc>, + overlay: Arc< + OverlayProtocol< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >, + >, ) { let (overlay_tx, mut overlay_rx) = mpsc::unbounded_channel(); diff --git a/trin-beacon/src/lib.rs b/trin-beacon/src/lib.rs index f558e4a76..f0414f845 100644 --- a/trin-beacon/src/lib.rs +++ b/trin-beacon/src/lib.rs @@ -4,6 +4,7 @@ pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; mod sync; #[cfg(test)] diff --git a/trin-beacon/src/network.rs b/trin-beacon/src/network.rs index 7772b414d..f54d7b454 100644 --- a/trin-beacon/src/network.rs +++ b/trin-beacon/src/network.rs @@ -18,13 +18,24 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::BeaconStorage, sync::BeaconSync, validation::BeaconValidator}; +use crate::{ + ping_extensions::BeaconPingExtensions, storage::BeaconStorage, sync::BeaconSync, + validation::BeaconValidator, +}; /// Beacon network layer on top of the overlay protocol. Encapsulates beacon network specific data /// and logic. #[derive(Clone)] pub struct BeaconNetwork { - pub overlay: Arc>, + pub overlay: Arc< + OverlayProtocol< + BeaconContentKey, + XorMetric, + BeaconValidator, + BeaconStorage, + BeaconPingExtensions, + >, + >, pub beacon_client: Arc>>>, } @@ -49,6 +60,7 @@ impl BeaconNetwork { let storage = Arc::new(PLRwLock::new(BeaconStorage::new(storage_config)?)); let storage_clone = Arc::clone(&storage); let validator = Arc::new(BeaconValidator::new(header_oracle)); + let ping_extensions = Arc::new(BeaconPingExtensions {}); let overlay = OverlayProtocol::new( config, discovery, @@ -56,6 +68,7 @@ impl BeaconNetwork { storage, Subnetwork::Beacon, validator, + ping_extensions, ) .await; diff --git a/trin-beacon/src/ping_extensions.rs b/trin-beacon/src/ping_extensions.rs new file mode 100644 index 000000000..829a051c4 --- /dev/null +++ b/trin-beacon/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct BeaconPingExtensions {} + +impl BeaconPingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::BasicRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::BasicRadius]; +} + +impl PingExtension for BeaconPingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn newest_commonly_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +} diff --git a/trin-history/src/lib.rs b/trin-history/src/lib.rs index c8fa2deba..5803c9660 100644 --- a/trin-history/src/lib.rs +++ b/trin-history/src/lib.rs @@ -4,6 +4,7 @@ pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; pub mod validation; diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index 89489c663..15505c047 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -15,7 +15,10 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; +use crate::{ + ping_extensions::HistoryPingExtensions, storage::HistoryStorage, + validation::ChainHistoryValidator, +}; /// Gossip content as it gets dropped from local storage, /// enabled by default for the history network. @@ -25,8 +28,15 @@ const GOSSIP_DROPPED: bool = true; /// and logic. #[derive(Clone)] pub struct HistoryNetwork { - pub overlay: - Arc>, + pub overlay: Arc< + OverlayProtocol< + HistoryContentKey, + XorMetric, + ChainHistoryValidator, + HistoryStorage, + HistoryPingExtensions, + >, + >, } impl HistoryNetwork { @@ -53,6 +63,7 @@ impl HistoryNetwork { storage, Subnetwork::History, validator, + Arc::new(HistoryPingExtensions {}), ) .await; diff --git a/trin-history/src/ping_extensions.rs b/trin-history/src/ping_extensions.rs new file mode 100644 index 000000000..d5759cf4c --- /dev/null +++ b/trin-history/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct HistoryPingExtensions {} + +impl HistoryPingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::HistoryRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::HistoryRadius]; +} + +impl PingExtension for HistoryPingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn newest_commonly_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +} diff --git a/trin-state/src/lib.rs b/trin-state/src/lib.rs index bab5b8e78..53773d9ee 100644 --- a/trin-state/src/lib.rs +++ b/trin-state/src/lib.rs @@ -29,6 +29,7 @@ use crate::{events::StateEvents, jsonrpc::StateRequestHandler}; pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; pub mod validation; diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index 6c67c1ea4..1a9a5f79f 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -16,13 +16,23 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::StateStorage, validation::StateValidator}; +use crate::{ + ping_extensions::StatePingExtensions, storage::StateStorage, validation::StateValidator, +}; /// State network layer on top of the overlay protocol. Encapsulates state network specific data and /// logic. #[derive(Clone)] pub struct StateNetwork { - pub overlay: Arc>, + pub overlay: Arc< + OverlayProtocol< + StateContentKey, + XorMetric, + StateValidator, + StateStorage, + StatePingExtensions, + >, + >, } /// Poke is disabled for state network because Offer/Accept and Find/Found Content are different, @@ -52,6 +62,7 @@ impl StateNetwork { }; let storage = Arc::new(PLRwLock::new(StateStorage::new(storage_config)?)); let validator = Arc::new(StateValidator { header_oracle }); + let ping_extensions = Arc::new(StatePingExtensions {}); let overlay = OverlayProtocol::new( config, discovery, @@ -59,6 +70,7 @@ impl StateNetwork { storage, Subnetwork::State, validator, + ping_extensions, ) .await; diff --git a/trin-state/src/ping_extensions.rs b/trin-state/src/ping_extensions.rs new file mode 100644 index 000000000..ba231200d --- /dev/null +++ b/trin-state/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct StatePingExtensions {} + +impl StatePingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::BasicRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::BasicRadius]; +} + +impl PingExtension for StatePingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn newest_commonly_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +}