diff --git a/Cargo.lock b/Cargo.lock index 35514ee65..d3546e9ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2202,6 +2202,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "itertools 0.13.0", "jsonrpsee", "keccak-hash", "lazy_static", diff --git a/bin/trin/src/cli.rs b/bin/trin/src/cli.rs index bb6dcbabf..82ec8a1ab 100644 --- a/bin/trin/src/cli.rs +++ b/bin/trin/src/cli.rs @@ -12,7 +12,7 @@ use ethportal_api::{ network::Subnetwork, portal_wire::{NetworkSpec, MAINNET}, }, - version::FULL_VERSION, + version::{APP_NAME, FULL_VERSION}, }; use portalnet::{ bootnodes::Bootnodes, @@ -33,7 +33,6 @@ const DEFAULT_SUBNETWORKS: &str = "history"; pub const DEFAULT_STORAGE_CAPACITY_MB: &str = "1000"; pub const DEFAULT_WEB3_TRANSPORT: &str = "ipc"; -const APP_NAME: &str = "trin"; #[derive(Parser, Debug, PartialEq, Clone)] #[command(name = APP_NAME, author = "https://github.com/ethereum/trin/graphs/contributors", @@ -442,7 +441,7 @@ mod tests { #[test_log::test] fn test_default_args() { let expected_config = TrinConfig::default(); - let actual_config = TrinConfig::new_from(["trin"]).unwrap(); + let actual_config = TrinConfig::new_from([APP_NAME]).unwrap(); assert_eq!(actual_config.web3_transport, expected_config.web3_transport); assert_eq!( actual_config.web3_http_address, @@ -456,7 +455,7 @@ mod tests { #[test_log::test] fn test_help() { - TrinConfig::new_from(["trin", "-h"]).expect_err("Should be an error to exit early"); + TrinConfig::new_from([APP_NAME, "-h"]).expect_err("Should be an error to exit early"); } #[test_log::test] @@ -467,7 +466,7 @@ mod tests { ..Default::default() }; let actual_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--web3-transport", "http", "--web3-http-address", @@ -499,7 +498,7 @@ mod tests { #[test_log::test] fn test_ipc_with_custom_path() { let actual_config = - TrinConfig::new_from(["trin", "--web3-ipc-path", "/path/test.ipc"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--web3-ipc-path", "/path/test.ipc"]).unwrap(); let expected_config = TrinConfig { web3_http_address: Url::parse(DEFAULT_WEB3_HTTP_ADDRESS).unwrap(), web3_ipc_path: PathBuf::from("/path/test.ipc"), @@ -519,7 +518,7 @@ mod tests { fn test_http_protocol_rejects_custom_web3_ipc_path() { TrinConfig::new_from([ - "trin", + APP_NAME, "--web3-transport", "http", "--web3-ipc-path", @@ -531,7 +530,7 @@ mod tests { #[test_log::test] #[should_panic(expected = "Must not supply an http address when using ipc")] fn test_ipc_protocol_rejects_custom_web3_http_address() { - TrinConfig::new_from(["trin", "--web3-http-address", "http://127.0.0.1:1234/"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--web3-http-address", "http://127.0.0.1:1234/"]).unwrap(); } #[test_log::test] @@ -540,14 +539,14 @@ mod tests { discovery_port: 999, ..Default::default() }; - let actual_config = TrinConfig::new_from(["trin", "--discovery-port", "999"]).unwrap(); + let actual_config = TrinConfig::new_from([APP_NAME, "--discovery-port", "999"]).unwrap(); assert_eq!(actual_config.discovery_port, expected_config.discovery_port); } #[test_log::test] fn test_manual_external_addr_v4() { let actual_config = - TrinConfig::new_from(["trin", "--external-address", "127.0.0.1:1234"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--external-address", "127.0.0.1:1234"]).unwrap(); assert_eq!( actual_config.external_addr, Some(SocketAddr::from(([127, 0, 0, 1], 1234))) @@ -557,7 +556,7 @@ mod tests { #[test_log::test] fn test_manual_external_addr_v6() { let actual_config = - TrinConfig::new_from(["trin", "--external-address", "[::1]:1234"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--external-address", "[::1]:1234"]).unwrap(); assert_eq!( actual_config.external_addr, Some(SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 1], 1234))) @@ -571,7 +570,7 @@ mod tests { ..Default::default() }; let actual_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--unsafe-private-key", "0x0101010101010101010101010101010101010101010101010101010101010101", ]) @@ -585,7 +584,7 @@ mod tests { ephemeral: true, ..Default::default() }; - let actual_config = TrinConfig::new_from(["trin", "--ephemeral"]).unwrap(); + let actual_config = TrinConfig::new_from([APP_NAME, "--ephemeral"]).unwrap(); assert_eq!(actual_config.ephemeral, expected_config.ephemeral); } @@ -599,7 +598,8 @@ mod tests { ..Default::default() }; let actual_config = - TrinConfig::new_from(["trin", "--enable-metrics-with-url", "127.0.0.1:1234"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--enable-metrics-with-url", "127.0.0.1:1234"]) + .unwrap(); assert_eq!( actual_config.enable_metrics_with_url, expected_config.enable_metrics_with_url @@ -612,7 +612,7 @@ mod tests { )] fn test_custom_private_key_odd_length() { TrinConfig::new_from([ - "trin", + APP_NAME, "--unsafe-private-key", "0x010101010101010101010101010101010101010101010101010101010101010", ]) @@ -625,7 +625,7 @@ mod tests { )] fn test_custom_private_key_requires_32_bytes() { TrinConfig::new_from([ - "trin", + APP_NAME, "--unsafe-private-key", "0x01010101010101010101010101010101010101010101010101010101010101", ]) @@ -638,7 +638,7 @@ mod tests { )] fn test_trusted_block_root_requires_32_bytes() { TrinConfig::new_from([ - "trin", + APP_NAME, "--trusted-block-root", "0x01010101010101010101010101010101010101010101010101010101010101", ]) @@ -649,7 +649,7 @@ mod tests { #[should_panic(expected = "Trusted block root must be prefixed with 0x")] fn test_trusted_block_root_starts_with_0x() { TrinConfig::new_from([ - "trin", + APP_NAME, "--trusted-block-root", "010101010101010101010101010101010101010101010101010101010101010101", ]) @@ -659,7 +659,7 @@ mod tests { #[test_log::test] #[should_panic(expected = "Invalid web3-transport arg. Expected either 'http' or 'ipc'")] fn test_invalid_web3_transport_argument() { - TrinConfig::new_from(["trin", "--web3-transport", "invalid"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--web3-transport", "invalid"]).unwrap(); } mod storage_config { @@ -667,7 +667,7 @@ mod tests { #[test_log::test] fn no_flags() { - let config = TrinConfig::new_from(["trin"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME]).unwrap(); assert_eq!( config.storage_capacity_config(), StorageCapacityConfig::Combined { @@ -680,7 +680,7 @@ mod tests { #[test_log::test] fn with_subnetworks() { let config = - TrinConfig::new_from(["trin", "--portal-subnetworks", "history,state"]).unwrap(); + TrinConfig::new_from([APP_NAME, "--portal-subnetworks", "history,state"]).unwrap(); assert_eq!( config.storage_capacity_config(), StorageCapacityConfig::Combined { @@ -692,7 +692,7 @@ mod tests { #[test_log::test] fn with_total() { - let config = TrinConfig::new_from(["trin", "--storage.total", "200"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--storage.total", "200"]).unwrap(); assert_eq!( config.storage_capacity_config(), StorageCapacityConfig::Combined { @@ -705,7 +705,7 @@ mod tests { #[test_log::test] fn with_total_and_subnetworks() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.total", "200", "--portal-subnetworks", @@ -723,7 +723,7 @@ mod tests { #[test_log::test] fn with_mb() { - let config = TrinConfig::new_from(["trin", "--mb", "200"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--mb", "200"]).unwrap(); assert_eq!( config.storage_capacity_config(), StorageCapacityConfig::Combined { @@ -736,7 +736,7 @@ mod tests { #[test_log::test] fn with_mb_and_subnetworks() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--mb", "200", "--portal-subnetworks", @@ -755,7 +755,7 @@ mod tests { #[test_log::test] fn with_total_and_all_subnetworks() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.total", "200", "--portal-subnetworks", @@ -776,8 +776,14 @@ mod tests { expected = "--storage.total and --storage.beacon can't be set at the same time" )] fn with_total_and_beacon() { - TrinConfig::new_from(["trin", "--storage.total", "200", "--storage.beacon", "100"]) - .unwrap(); + TrinConfig::new_from([ + APP_NAME, + "--storage.total", + "200", + "--storage.beacon", + "100", + ]) + .unwrap(); } #[test_log::test] @@ -785,8 +791,14 @@ mod tests { expected = "--storage.total and --storage.history can't be set at the same time" )] fn with_total_and_history() { - TrinConfig::new_from(["trin", "--storage.total", "200", "--storage.history", "100"]) - .unwrap(); + TrinConfig::new_from([ + APP_NAME, + "--storage.total", + "200", + "--storage.history", + "100", + ]) + .unwrap(); } #[test_log::test] @@ -794,13 +806,13 @@ mod tests { expected = "--storage.total and --storage.state can't be set at the same time" )] fn with_total_and_state() { - TrinConfig::new_from(["trin", "--storage.total", "200", "--storage.state", "100"]) + TrinConfig::new_from([APP_NAME, "--storage.total", "200", "--storage.state", "100"]) .unwrap(); } #[test_log::test] fn with_history() { - let config = TrinConfig::new_from(["trin", "--storage.history", "200"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--storage.history", "200"]).unwrap(); assert_eq!( config.storage_capacity_config(), StorageCapacityConfig::Specific { @@ -814,7 +826,7 @@ mod tests { #[test_log::test] fn with_history_and_state() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--storage.state", @@ -836,7 +848,7 @@ mod tests { #[test_log::test] fn with_history_and_state_and_beacon() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--storage.state", @@ -860,15 +872,21 @@ mod tests { #[test_log::test] #[should_panic(expected = "--storage.state is set but State subnetwork is not enabled")] fn with_history_and_state_without_subnetworks() { - TrinConfig::new_from(["trin", "--storage.history", "200", "--storage.state", "300"]) - .unwrap(); + TrinConfig::new_from([ + APP_NAME, + "--storage.history", + "200", + "--storage.state", + "300", + ]) + .unwrap(); } #[test_log::test] #[should_panic(expected = "--storage.beacon is set but Beacon subnetwork is not enabled")] fn with_history_and_beacon_without_subnetworks() { TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--storage.beacon", @@ -881,7 +899,7 @@ mod tests { #[should_panic(expected = "--storage.history is set but History subnetwork is not enabled")] fn with_history_and_beacon_without_history_subnetwork() { TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--storage.beacon", @@ -896,7 +914,7 @@ mod tests { #[should_panic(expected = "History subnetwork enabled but --storage.history is not set")] fn specific_without_history_with_subnetwork() { TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.state", "200", "--portal-subnetworks", @@ -909,7 +927,7 @@ mod tests { #[should_panic(expected = "State subnetwork enabled but --storage.state is not set")] fn specific_without_state_with_subnetwork() { TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--portal-subnetworks", @@ -922,7 +940,7 @@ mod tests { #[should_panic(expected = "Beacon subnetwork enabled but --storage.beacon is not set")] fn specific_without_beacon_with_subnetwork() { TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "200", "--portal-subnetworks", @@ -934,7 +952,7 @@ mod tests { #[test_log::test] fn with_total_zero() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.total", "0", "--portal-subnetworks", @@ -953,7 +971,7 @@ mod tests { #[test_log::test] fn with_zero_per_subnetwork() { let config = TrinConfig::new_from([ - "trin", + APP_NAME, "--storage.history", "0", "--storage.state", @@ -983,7 +1001,7 @@ mod tests { #[test_log::test] fn test_bootnodes_default_with_default_bootnodes() { - let config = TrinConfig::new_from(["trin"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME]).unwrap(); assert_eq!(config.bootnodes, Bootnodes::Default); let bootnodes: Vec = config.bootnodes.to_enrs(Network::Mainnet); assert_eq!(bootnodes.len(), 11); @@ -991,7 +1009,7 @@ mod tests { #[test_log::test] fn test_bootnodes_default_with_explicit_default_bootnodes() { - let config = TrinConfig::new_from(["trin", "--bootnodes", "default"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--bootnodes", "default"]).unwrap(); assert_eq!(config.bootnodes, Bootnodes::Default); let bootnodes: Vec = config.bootnodes.to_enrs(Network::Mainnet); assert_eq!(bootnodes.len(), 11); @@ -999,7 +1017,7 @@ mod tests { #[test_log::test] fn test_bootnodes_default_with_no_bootnodes() { - let config = TrinConfig::new_from(["trin", "--bootnodes", "none"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--bootnodes", "none"]).unwrap(); assert_eq!(config.bootnodes, Bootnodes::None); let bootnodes: Vec = config.bootnodes.to_enrs(Network::Mainnet); assert_eq!(bootnodes.len(), 0); @@ -1010,7 +1028,7 @@ mod tests { #[case("enr:-IS4QBISSFfBzsBrjq61iSIxPMfp5ShBTW6KQUglzH_tj8_SJaehXdlnZI-NAkTGeoclwnTB-pU544BQA44BiDZ2rkMBgmlkgnY0gmlwhKEjVaWJc2VjcDI1NmsxoQOSGugH1jSdiE_fRK1FIBe9oLxaWH8D_7xXSnaOVBe-SYN1ZHCCIyg,invalid")] #[should_panic] fn test_bootnodes_invalid_enr(#[case] bootnode: &str) { - TrinConfig::new_from(["trin", "--bootnodes", bootnode]).unwrap(); + TrinConfig::new_from([APP_NAME, "--bootnodes", bootnode]).unwrap(); } #[rstest::rstest] @@ -1020,7 +1038,7 @@ mod tests { fn test_bootnodes_valid_enrs(#[case] bootnode: &str, #[case] expected_length: usize) { use ethportal_api::types::network::Network; - let config = TrinConfig::new_from(["trin", "--bootnodes", bootnode]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--bootnodes", bootnode]).unwrap(); match config.bootnodes.clone() { Bootnodes::Custom(bootnodes) => { assert_eq!(bootnodes.len(), expected_length); @@ -1033,7 +1051,7 @@ mod tests { #[rstest::rstest] fn test_angelfood_network_defaults_to_correct_bootnodes() { - let config = TrinConfig::new_from(["trin", "--network", "angelfood"]).unwrap(); + let config = TrinConfig::new_from([APP_NAME, "--network", "angelfood"]).unwrap(); assert_eq!(config.bootnodes, Bootnodes::Default); let bootnodes: Vec = config.bootnodes.to_enrs(Network::Angelfood); assert_eq!(bootnodes.len(), 1); @@ -1043,7 +1061,7 @@ mod tests { fn test_custom_bootnodes_override_angelfood_default() { let enr = "enr:-IS4QBISSFfBzsBrjq61iSIxPMfp5ShBTW6KQUglzH_tj8_SJaehXdlnZI-NAkTGeoclwnTB-pU544BQA44BiDZ2rkMBgmlkgnY0gmlwhKEjVaWJc2VjcDI1NmsxoQOSGugH1jSdiE_fRK1FIBe9oLxaWH8D_7xXSnaOVBe-SYN1ZHCCIyg"; let config = - TrinConfig::new_from(["trin", "--network", "angelfood", "--bootnodes", enr]) + TrinConfig::new_from([APP_NAME, "--network", "angelfood", "--bootnodes", enr]) .unwrap(); assert_eq!( config.bootnodes, diff --git a/crates/ethportal-api/Cargo.toml b/crates/ethportal-api/Cargo.toml index 51b9b2778..6e5e69be5 100644 --- a/crates/ethportal-api/Cargo.toml +++ b/crates/ethportal-api/Cargo.toml @@ -27,6 +27,7 @@ ethereum_serde_utils.workspace = true ethereum_ssz.workspace = true ethereum_ssz_derive.workspace = true hex.workspace = true +itertools.workspace = true jsonrpsee = { workspace = true, features = ["async-client", "client", "macros", "server"]} keccak-hash.workspace = true lazy_static.workspace = true diff --git a/crates/ethportal-api/src/types/bytes.rs b/crates/ethportal-api/src/types/bytes.rs index fba49eb8f..e28171fba 100644 --- a/crates/ethportal-api/src/types/bytes.rs +++ b/crates/ethportal-api/src/types/bytes.rs @@ -1,7 +1,23 @@ -use ssz_types::{typenum, VariableList}; +use ssz_types::{ + typenum::{self, UInt, UTerm, B0, B1}, + VariableList, +}; + +// 1100 in binary is 10001001100 +pub type U1100 = UInt< + UInt< + UInt< + UInt, B0>, B0>, B0>, B1>, B0>, B0>, B1>, + B1, + >, + B0, + >, + B0, +>; pub type ByteList32 = VariableList; pub type ByteList1024 = VariableList; +pub type ByteList1100 = VariableList; pub type ByteList2048 = VariableList; pub type ByteList32K = VariableList; pub type ByteList1G = VariableList; diff --git a/crates/ethportal-api/src/types/distance.rs b/crates/ethportal-api/src/types/distance.rs index 0d1cd03f3..e1115179a 100644 --- a/crates/ethportal-api/src/types/distance.rs +++ b/crates/ethportal-api/src/types/distance.rs @@ -1,11 +1,13 @@ use std::{fmt, ops::Deref}; use alloy::primitives::U256; +use ssz_derive::{Decode, Encode}; pub type DataRadius = U256; /// Represents a distance between two keys in the DHT key space. -#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)] +#[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug, Encode, Decode)] +#[ssz(struct_behaviour = "transparent")] pub struct Distance(U256); impl fmt::Display for Distance { diff --git a/crates/ethportal-api/src/types/mod.rs b/crates/ethportal-api/src/types/mod.rs index b99b126cd..dfca03132 100644 --- a/crates/ethportal-api/src/types/mod.rs +++ b/crates/ethportal-api/src/types/mod.rs @@ -9,6 +9,7 @@ pub mod execution; pub mod jsonrpc; pub mod network; pub mod node_id; +pub mod ping_extensions; pub mod portal; pub mod portal_wire; pub mod query_trace; diff --git a/crates/ethportal-api/src/types/ping_extensions/decode.rs b/crates/ethportal-api/src/types/ping_extensions/decode.rs new file mode 100644 index 000000000..e52a2f1b8 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/decode.rs @@ -0,0 +1,63 @@ +use anyhow::{anyhow, bail}; +use ssz::Decode; + +use super::{ + extension_types::Extensions, + extensions::{ + type_0::ClientInfoRadiusCapabilities, type_1::BasicRadius, type_2::HistoryRadius, + type_65535::PingError, + }, +}; +use crate::types::portal_wire::CustomPayload; + +#[derive(Debug, Clone)] +pub enum DecodedExtension { + Capabilities(ClientInfoRadiusCapabilities), + BasicRadius(BasicRadius), + HistoryRadius(HistoryRadius), + Error(PingError), +} + +impl From for Extensions { + fn from(value: DecodedExtension) -> Self { + match value { + DecodedExtension::Capabilities(_) => Extensions::Capabilities, + DecodedExtension::BasicRadius(_) => Extensions::BasicRadius, + DecodedExtension::HistoryRadius(_) => Extensions::HistoryRadius, + DecodedExtension::Error(_) => Extensions::Error, + } + } +} + +impl DecodedExtension { + pub fn decode_extension(payload_type: u16, payload: CustomPayload) -> anyhow::Result { + let Ok(extension_type) = Extensions::try_from(payload_type) else { + bail!("Failed to decode extension type {payload_type}"); + }; + + match extension_type { + Extensions::Capabilities => { + let capabilities = ClientInfoRadiusCapabilities::from_ssz_bytes(&payload.payload) + .map_err(|err| { + anyhow!("Failed to decode ClientInfoRadiusCapabilities: {err:?}") + })?; + Ok(DecodedExtension::Capabilities(capabilities)) + } + Extensions::BasicRadius => { + let basic_radius = BasicRadius::from_ssz_bytes(&payload.payload) + .map_err(|err| anyhow!("Failed to decode BasicRadius: {err:?}"))?; + Ok(DecodedExtension::BasicRadius(basic_radius)) + } + Extensions::HistoryRadius => { + let history_radius = HistoryRadius::from_ssz_bytes(&payload.payload) + .map_err(|err| anyhow!("Failed to decode HistoryRadius: {err:?}"))?; + Ok(DecodedExtension::HistoryRadius(history_radius)) + } + Extensions::Error => { + let error = PingError::from_ssz_bytes(&payload.payload) + .map_err(|err| anyhow!("Failed to decode PingError: {err:?}"))?; + Ok(DecodedExtension::Error(error)) + } + } + } +} diff --git a/crates/ethportal-api/src/types/ping_extensions/extension_types.rs b/crates/ethportal-api/src/types/ping_extensions/extension_types.rs new file mode 100644 index 000000000..62cf22ad6 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extension_types.rs @@ -0,0 +1,37 @@ +#[derive(PartialEq, Debug, Clone, Copy, Eq)] +pub enum Extensions { + Capabilities, + BasicRadius, + HistoryRadius, + Error, +} + +impl TryFrom for Extensions { + type Error = ExtensionError; + + fn try_from(value: u16) -> Result { + match value { + 0 => Ok(Extensions::Capabilities), + 1 => Ok(Extensions::BasicRadius), + 2 => Ok(Extensions::HistoryRadius), + 65535 => Ok(Extensions::Error), + _ => Err(ExtensionError::NonSupportedExtension(value)), + } + } +} + +impl From for u16 { + fn from(value: Extensions) -> u16 { + match value { + Extensions::Capabilities => 0, + Extensions::BasicRadius => 1, + Extensions::HistoryRadius => 2, + Extensions::Error => 65535, + } + } +} + +#[derive(Debug)] +pub enum ExtensionError { + NonSupportedExtension(u16), +} diff --git a/crates/ethportal-api/src/types/ping_extensions/extensions/mod.rs b/crates/ethportal-api/src/types/ping_extensions/extensions/mod.rs new file mode 100644 index 000000000..75f7d2b72 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extensions/mod.rs @@ -0,0 +1,4 @@ +pub mod type_0; +pub mod type_1; +pub mod type_2; +pub mod type_65535; diff --git a/crates/ethportal-api/src/types/ping_extensions/extensions/type_0.rs b/crates/ethportal-api/src/types/ping_extensions/extensions/type_0.rs new file mode 100644 index 000000000..2599c612f --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extensions/type_0.rs @@ -0,0 +1,370 @@ +use std::str::FromStr; + +use alloy::primitives::U256; +use anyhow::{bail, ensure}; +use itertools::Itertools; +use ssz::{Decode, Encode, SszDecoderBuilder, SszEncoder}; +use ssz_types::{ + typenum::{U200, U400}, + VariableList, +}; + +use crate::{ + types::{ + distance::Distance, + ping_extensions::extension_types::{ExtensionError, Extensions}, + portal_wire::CustomPayload, + }, + version::{ + APP_NAME, BUILD_ARCHITECTURE, BUILD_OPERATING_SYSTEM, PROGRAMMING_LANGUAGE_VERSION, + TRIN_SHORT_COMMIT, TRIN_VERSION, + }, +}; + +#[derive(PartialEq, Debug, Clone)] +pub struct ClientInfoRadiusCapabilities { + pub client_info: Option, + pub data_radius: Distance, + capabilities: VariableList, +} + +impl ClientInfoRadiusCapabilities { + pub fn new(radius: Distance, capabilities: Vec) -> Self { + Self { + client_info: Some(ClientInfo::trin_client_info()), + data_radius: radius, + capabilities: VariableList::from(capabilities), + } + } + + pub fn new_with_client_info( + client_info: Option, + radius: Distance, + capabilities: Vec, + ) -> Self { + Self { + client_info, + data_radius: radius, + capabilities: VariableList::from(capabilities), + } + } + + pub fn capabilities(&self) -> Result, ExtensionError> { + self.capabilities + .iter() + .map(|&value| Extensions::try_from(value)) + .collect::, _>>() + } +} + +impl From for CustomPayload { + fn from(client_info_radius_capacities: ClientInfoRadiusCapabilities) -> Self { + CustomPayload::from(client_info_radius_capacities.as_ssz_bytes()) + } +} + +impl Encode for ClientInfoRadiusCapabilities { + fn is_ssz_fixed_len() -> bool { + false + } + + fn ssz_append(&self, buf: &mut Vec) { + let offset = as Encode>::ssz_fixed_len() + + ::ssz_fixed_len() + + as Encode>::ssz_fixed_len(); + let mut encoder = SszEncoder::container(buf, offset); + let client_info = match &self.client_info { + Some(client_info) => client_info.string(), + None => "".to_string(), + }; + let bytes: Vec = client_info.as_bytes().to_vec(); + let client_info: VariableList = VariableList::from(bytes); + + encoder.append(&client_info); + encoder.append(&self.data_radius); + encoder.append(&self.capabilities); + encoder.finalize(); + } + + fn ssz_bytes_len(&self) -> usize { + self.as_ssz_bytes().len() + } +} + +impl Decode for ClientInfoRadiusCapabilities { + fn from_ssz_bytes(bytes: &[u8]) -> Result { + let mut builder = SszDecoderBuilder::new(bytes); + builder.register_type::>()?; + builder.register_type::()?; + builder.register_type::>()?; + let mut decoder = builder.build()?; + let client_info: VariableList = decoder.decode_next()?; + let data_radius: U256 = decoder.decode_next()?; + let capabilities: VariableList = decoder.decode_next()?; + + let string = String::from_utf8(client_info.to_vec()).map_err(|_| { + ssz::DecodeError::BytesInvalid(format!("Invalid utf8 string: {client_info:?}")) + })?; + let client_info = match string.as_str() { + "" => None, + _ => Some(ClientInfo::from_str(&string).map_err(|err| { + ssz::DecodeError::BytesInvalid(format!("Failed to parse client info: {err:?}")) + })?), + }; + + Ok(Self { + client_info, + data_radius: Distance::from(data_radius), + capabilities, + }) + } + + fn is_ssz_fixed_len() -> bool { + false + } +} + +/// Information about the client. +/// example: trin/v0.1.1-892ad575/linux-x86_64/rustc1.81.0 +#[derive(PartialEq, Debug, Clone)] +pub struct ClientInfo { + pub client_name: String, + pub client_version: String, + pub short_commit: String, + pub operating_system: String, + pub cpu_architecture: String, + pub programming_language_version: String, +} + +impl ClientInfo { + pub fn trin_client_info() -> Self { + Self { + client_name: APP_NAME.to_string(), + client_version: TRIN_VERSION.to_string(), + short_commit: TRIN_SHORT_COMMIT.to_string(), + operating_system: BUILD_OPERATING_SYSTEM.to_string(), + cpu_architecture: BUILD_ARCHITECTURE.to_string(), + programming_language_version: format!("rustc{PROGRAMMING_LANGUAGE_VERSION}"), + } + } + + pub fn string(&self) -> String { + format!( + "{}/{}-{}/{}-{}/{}", + self.client_name, + self.client_version, + self.short_commit, + self.operating_system, + self.cpu_architecture, + self.programming_language_version + ) + } +} + +impl FromStr for ClientInfo { + type Err = anyhow::Error; + + fn from_str(string: &str) -> Result { + ensure!( + string.as_bytes().len() <= 200, + "Client info string is too long" + ); + let parts: Vec<&str> = string.split('/').collect(); + + if parts.len() != 4 { + bail!( + "Invalid client info string: should have 4 /'s instead got {} | {}", + parts.len(), + string + ); + } + + let client_name = parts[0]; + + let Some((client_version, short_commit)) = parts[1].rsplit_once('-') else { + bail!( + "Invalid client info string: should look like 0.1.1-2b00d730 got {}", + parts[1] + ); + }; + + let Some((operating_system, cpu_architecture)) = parts[2].split('-').collect_tuple() else { + bail!( + "Invalid client info string: should look like linux-x86_64 got {}", + parts[2] + ); + }; + + Ok(Self { + client_name: client_name.to_string(), + client_version: client_version.to_string(), + short_commit: short_commit.to_string(), + operating_system: operating_system.to_string(), + cpu_architecture: cpu_architecture.to_string(), + programming_language_version: parts[3].to_string(), + }) + } +} + +#[cfg(test)] +mod tests { + use alloy::primitives::U256; + use rstest::rstest; + + use super::*; + use crate::{ + types::{ + ping_extensions::decode::DecodedExtension, + portal_wire::{Message, Ping, Pong}, + }, + utils::bytes::{hex_decode, hex_encode}, + }; + + #[test] + fn test_client_info_radius_capabilities() { + let radius = Distance::from(U256::from(42)); + let capabilities = vec![0, 1, 2, 3]; + let client_info_radius_capabilities = + ClientInfoRadiusCapabilities::new(radius, capabilities); + let custom_payload = CustomPayload::from(client_info_radius_capabilities.clone()); + + let decoded_extension = DecodedExtension::decode_extension(0, custom_payload).unwrap(); + + if let DecodedExtension::Capabilities(decoded_client_info_radius_capabilities) = + decoded_extension + { + assert_eq!( + client_info_radius_capabilities, + decoded_client_info_radius_capabilities + ); + } else { + panic!("Decoded extension is not ClientInfoRadiusCapabilities"); + } + } + + #[test] + fn test_client_info_from_str() { + let client_info = ClientInfo::trin_client_info(); + let string = client_info.string(); + let decoded = ClientInfo::from_str(&string).unwrap(); + assert_eq!(client_info, decoded); + } + + #[rstest] + /// Fails because there are not enough parts + #[case("trin/0.1.1-2b00d730/linux-x86_64")] + /// Fails because there are too many parts + #[case("trin/0.1.1-2b00d730/linux-x86_64/rustc1.81.0/extra")] + /// Fails because the short commit is missing + #[case("trin/0.1.1/linux-x86_64/rustc1.81.0")] + /// Fails because the CPU architecture is missing + #[case("trin/0.1.1-2b00d730/linux/rustc1.81.0")] + /// Fails because client string is too long + #[case(&"t".repeat(201))] + #[should_panic] + fn test_client_info_from_str_invalid(#[case] string: &str) { + ClientInfo::from_str(string).unwrap(); + } + + #[test] + fn message_encoding_ping_capabilities_with_client_info() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let client_info = + ClientInfo::from_str("trin/v0.1.1-b61fdc5c/linux-x86_64/rustc1.81.0").unwrap(); + let capabilities = vec![0, 1, 65535]; + let capabilities_payload = ClientInfoRadiusCapabilities::new_with_client_info( + Some(client_info), + data_radius, + capabilities, + ); + let payload = CustomPayload::from(capabilities_payload); + let ping = Ping { + enr_seq: 1, + payload_type: 0, + payload, + }; + let ping = Message::Ping(ping); + + let encoded: Vec = ping.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x00010000000000000000000e00000028000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff550000007472696e2f76302e312e312d62363166646335632f6c696e75782d7838365f36342f7275737463312e38312e3000000100ffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, ping); + } + + #[test] + fn message_encoding_ping_capabilities_without_client_info() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let capabilities = vec![0, 1, 65535]; + let capabilities_payload = + ClientInfoRadiusCapabilities::new_with_client_info(None, data_radius, capabilities); + let payload = CustomPayload::from(capabilities_payload); + let ping = Ping { + enr_seq: 1, + payload_type: 0, + payload, + }; + let ping = Message::Ping(ping); + + let encoded: Vec = ping.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x00010000000000000000000e00000028000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff2800000000000100ffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, ping); + } + + #[test] + fn message_encoding_pong_capabilities_with_client_info() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let client_info = + ClientInfo::from_str("trin/v0.1.1-b61fdc5c/linux-x86_64/rustc1.81.0").unwrap(); + let capabilities = vec![0, 1, 65535]; + let capabilities_payload = ClientInfoRadiusCapabilities::new_with_client_info( + Some(client_info), + data_radius, + capabilities, + ); + let payload = CustomPayload::from(capabilities_payload); + let pong = Pong { + enr_seq: 1, + payload_type: 0, + payload, + }; + let pong = Message::Pong(pong); + + let encoded: Vec = pong.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x01010000000000000000000e00000028000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff550000007472696e2f76302e312e312d62363166646335632f6c696e75782d7838365f36342f7275737463312e38312e3000000100ffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, pong); + } + + #[test] + fn message_encoding_pong_capabilities_without_client_info() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let capabilities = vec![0, 1, 65535]; + let capabilities_payload = + ClientInfoRadiusCapabilities::new_with_client_info(None, data_radius, capabilities); + let payload = CustomPayload::from(capabilities_payload); + let pong = Pong { + enr_seq: 1, + payload_type: 0, + payload, + }; + let pong = Message::Pong(pong); + + let encoded: Vec = pong.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x01010000000000000000000e00000028000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff2800000000000100ffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, pong); + } +} diff --git a/crates/ethportal-api/src/types/ping_extensions/extensions/type_1.rs b/crates/ethportal-api/src/types/ping_extensions/extensions/type_1.rs new file mode 100644 index 000000000..0989a2bf8 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extensions/type_1.rs @@ -0,0 +1,104 @@ +use ssz::Encode; +use ssz_derive::{Decode, Encode}; + +use crate::types::{distance::Distance, portal_wire::CustomPayload}; + +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct BasicRadius { + pub data_radius: Distance, +} + +impl BasicRadius { + pub fn new(data_radius: Distance) -> Self { + Self { data_radius } + } +} + +impl From for CustomPayload { + fn from(basic_radius: BasicRadius) -> Self { + CustomPayload::from(basic_radius.as_ssz_bytes()) + } +} + +#[cfg(test)] +mod tests { + use alloy::primitives::U256; + use ssz::Decode; + + use super::*; + use crate::{ + types::{ + distance::Distance, + ping_extensions::decode::DecodedExtension, + portal_wire::{Message, Ping, Pong}, + }, + utils::bytes::{hex_decode, hex_encode}, + }; + + #[test] + fn test_basic_radius() { + let data_radius = Distance::from(U256::from(42)); + let basic_radius = BasicRadius::new(data_radius); + let custom_payload = CustomPayload::from(basic_radius.clone()); + + let decoded_extension = DecodedExtension::decode_extension(1, custom_payload).unwrap(); + + if let DecodedExtension::BasicRadius(decoded_basic_radius) = decoded_extension { + assert_eq!(basic_radius, decoded_basic_radius); + } else { + panic!("Decoded extension is not BasicRadius"); + } + } + + #[test] + fn test_basic_radius_ssz_round_trip() { + let data_radius = Distance::from(U256::from(42)); + let basic_radius = BasicRadius::new(data_radius); + let bytes = basic_radius.as_ssz_bytes(); + let decoded = BasicRadius::from_ssz_bytes(&bytes).unwrap(); + assert_eq!(bytes.len(), 32); + assert_eq!(basic_radius, decoded); + } + + #[test] + fn message_encoding_ping_basic_radius() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let basic_radius = BasicRadius::new(data_radius); + let payload = CustomPayload::from(basic_radius); + let ping = Ping { + enr_seq: 1, + payload_type: 1, + payload, + }; + let ping = Message::Ping(ping); + + let encoded: Vec = ping.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x00010000000000000001000e000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, ping); + } + + #[test] + fn message_encoding_pong_basic_radius() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let basic_radius = BasicRadius::new(data_radius); + let payload = CustomPayload::from(basic_radius); + let pong = Pong { + enr_seq: 1, + payload_type: 1, + payload, + }; + let pong = Message::Pong(pong); + + let encoded: Vec = pong.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x01010000000000000001000e000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, pong); + } +} diff --git a/crates/ethportal-api/src/types/ping_extensions/extensions/type_2.rs b/crates/ethportal-api/src/types/ping_extensions/extensions/type_2.rs new file mode 100644 index 000000000..77cb67505 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extensions/type_2.rs @@ -0,0 +1,110 @@ +use ssz::Encode; +use ssz_derive::{Decode, Encode}; + +use crate::types::{distance::Distance, portal_wire::CustomPayload}; + +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct HistoryRadius { + pub data_radius: Distance, + pub ephemeral_header_count: u16, +} + +impl HistoryRadius { + pub fn new(data_radius: Distance, ephemeral_header_count: u16) -> Self { + Self { + data_radius, + ephemeral_header_count, + } + } +} + +impl From for CustomPayload { + fn from(history_radius: HistoryRadius) -> Self { + CustomPayload::from(history_radius.as_ssz_bytes()) + } +} + +#[cfg(test)] +mod tests { + use alloy::primitives::U256; + use ssz::Decode; + + use super::*; + use crate::{ + types::{ + distance::Distance, + ping_extensions::decode::DecodedExtension, + portal_wire::{Message, Ping, Pong}, + }, + utils::bytes::{hex_decode, hex_encode}, + }; + + #[test] + fn test_history_radius() { + let data_radius = Distance::from(U256::from(42)); + let history_radius = HistoryRadius::new(data_radius, 42); + let custom_payload = CustomPayload::from(history_radius.clone()); + + let decoded_extension = DecodedExtension::decode_extension(2, custom_payload).unwrap(); + + if let DecodedExtension::HistoryRadius(decoded_history_radius) = decoded_extension { + assert_eq!(history_radius, decoded_history_radius); + } else { + panic!("Decoded extension is not HistoryRadius"); + } + } + + #[test] + fn test_history_radius_ssz_round_trip() { + let data_radius = Distance::from(U256::from(42)); + let history_radius = HistoryRadius::new(data_radius, 42); + let bytes = history_radius.as_ssz_bytes(); + let decoded = HistoryRadius::from_ssz_bytes(&bytes).unwrap(); + assert_eq!(bytes.len(), 34); + assert_eq!(history_radius, decoded); + } + + #[test] + fn message_encoding_ping_history_radius() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let ephemeral_header_count = 4242; + let history_radius = HistoryRadius::new(data_radius, ephemeral_header_count); + let payload = CustomPayload::from(history_radius); + let ping = Ping { + enr_seq: 1, + payload_type: 2, + payload, + }; + let ping = Message::Ping(ping); + + let encoded: Vec = ping.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x00010000000000000002000e000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff9210"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, ping); + } + + #[test] + fn message_encoding_pong_history_radius() { + let data_radius = Distance::from(U256::MAX - U256::from(1)); + let ephemeral_header_count = 4242; + let history_radius = HistoryRadius::new(data_radius, ephemeral_header_count); + let payload = CustomPayload::from(history_radius); + let pong = Pong { + enr_seq: 1, + payload_type: 2, + payload, + }; + let pong = Message::Pong(pong); + + let encoded: Vec = pong.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x01010000000000000002000e000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff9210"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, pong); + } +} diff --git a/crates/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs b/crates/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs new file mode 100644 index 000000000..907a9eb06 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/extensions/type_65535.rs @@ -0,0 +1,129 @@ +use anyhow::anyhow; +use ssz::Encode; +use ssz_derive::{Decode, Encode}; +use ssz_types::{typenum::U300, VariableList}; + +use crate::types::portal_wire::CustomPayload; + +/// Used to respond to pings which the node can't handle +#[derive(PartialEq, Debug, Clone, Encode, Decode)] +pub struct PingError { + pub error_code: u16, + pub message: VariableList, +} + +impl PingError { + pub fn new(error_code: ErrorCodes) -> Self { + Self { + error_code: error_code.into(), + message: VariableList::empty(), + } + } + + pub fn new_with_message(error_code: ErrorCodes, message: Vec) -> anyhow::Result { + Ok(Self { + error_code: error_code.into(), + message: VariableList::new(message).map_err(|err| { + anyhow!("PingError can only handle messages up to 300 bytes, received {err:?}") + })?, + }) + } +} + +impl From for CustomPayload { + fn from(ping_error: PingError) -> Self { + CustomPayload::from(ping_error.as_ssz_bytes()) + } +} + +pub enum ErrorCodes { + ExtensionNotSupported, + RequestedDataNotFound, + FailedToDecodePayload, + SystemError, +} + +impl From for u16 { + fn from(error_code: ErrorCodes) -> u16 { + match error_code { + ErrorCodes::ExtensionNotSupported => 0, + ErrorCodes::RequestedDataNotFound => 1, + ErrorCodes::FailedToDecodePayload => 2, + ErrorCodes::SystemError => 3, + } + } +} + +#[cfg(test)] +mod tests { + use ssz::Decode; + + use super::*; + use crate::{ + types::{ + ping_extensions::decode::DecodedExtension, + portal_wire::{Message, Pong}, + }, + utils::bytes::{hex_decode, hex_encode}, + }; + + #[test] + fn test_ping_error() { + let error_code = ErrorCodes::ExtensionNotSupported; + let ping_error = PingError::new(error_code); + let custom_payload = CustomPayload::from(ping_error.clone()); + + let decoded_extension = DecodedExtension::decode_extension(65535, custom_payload).unwrap(); + + if let DecodedExtension::Error(decoded_ping_error) = decoded_extension { + assert_eq!(ping_error, decoded_ping_error); + } else { + panic!("Decoded extension is not PingError"); + } + } + + #[test] + fn test_ping_error_ssz_round_trip() { + let error_code = ErrorCodes::ExtensionNotSupported; + let ping_error = PingError::new(error_code); + let bytes = ping_error.as_ssz_bytes(); + let decoded = PingError::from_ssz_bytes(&bytes).unwrap(); + assert_eq!(bytes.len(), 6); + assert_eq!(ping_error, decoded); + } + + #[rstest::rstest] + #[case(301, true)] + #[case(300, false)] + fn test_ping_error_message_too_long(#[case] message_length: usize, #[case] expected: bool) { + let error_code = ErrorCodes::FailedToDecodePayload; + let message = vec![0; message_length]; + assert_eq!( + PingError::new_with_message(error_code, message).is_err(), + expected + ); + } + + #[test] + fn message_encoding_pong_basic_radius() { + let error_code = ErrorCodes::FailedToDecodePayload; + let message = "hello world"; + let basic_radius = + PingError::new_with_message(error_code, message.as_bytes().to_vec()).unwrap(); + let payload = CustomPayload::from(basic_radius); + let pong = Pong { + enr_seq: 1, + payload_type: 65535, + payload, + }; + let pong = Message::Pong(pong); + + let encoded: Vec = pong.clone().into(); + let encoded = hex_encode(encoded); + let expected_encoded = "0x010100000000000000ffff0e00000002000600000068656c6c6f20776f726c64"; + assert_eq!(encoded, expected_encoded); + + let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); + assert_eq!(decoded, pong); + } +} diff --git a/crates/ethportal-api/src/types/ping_extensions/mod.rs b/crates/ethportal-api/src/types/ping_extensions/mod.rs new file mode 100644 index 000000000..b07fc94a6 --- /dev/null +++ b/crates/ethportal-api/src/types/ping_extensions/mod.rs @@ -0,0 +1,3 @@ +pub mod decode; +pub mod extension_types; +pub mod extensions; diff --git a/crates/ethportal-api/src/types/portal_wire.rs b/crates/ethportal-api/src/types/portal_wire.rs index cc275fcd8..e1b0e767e 100644 --- a/crates/ethportal-api/src/types/portal_wire.rs +++ b/crates/ethportal-api/src/types/portal_wire.rs @@ -1,7 +1,6 @@ use std::{ convert::{TryFrom, TryInto}, fmt, - ops::Deref, sync::Arc, }; @@ -18,10 +17,9 @@ use ssz_types::{typenum, BitList}; use thiserror::Error; use validator::ValidationError; +use super::bytes::ByteList1100; use crate::{ types::{ - bytes::ByteList2048, - distance::Distance, enr::{Enr, SszEnr}, network::{Network, Subnetwork}, }, @@ -74,7 +72,7 @@ pub const MAX_PORTAL_CONTENT_PAYLOAD_SIZE: usize = MAX_DISCV5_TALK_REQ_PAYLOAD_S /// Custom payload element of Ping and Pong overlay messages #[derive(Debug, PartialEq, Clone)] pub struct CustomPayload { - payload: ByteList2048, + pub payload: ByteList1100, } impl TryFrom<&Value> for CustomPayload { @@ -91,7 +89,7 @@ impl TryFrom<&Value> for CustomPayload { ))?, }; Ok(Self { - payload: ByteList2048::from(payload), + payload: ByteList1100::from(payload), }) } } @@ -99,18 +97,11 @@ impl TryFrom<&Value> for CustomPayload { impl From> for CustomPayload { fn from(ssz_bytes: Vec) -> Self { Self { - payload: ByteList2048::from(ssz_bytes), + payload: ByteList1100::from(ssz_bytes), } } } -impl From for Distance { - fn from(val: CustomPayload) -> Self { - let bytes = val.payload; - U256::from_le_slice(bytes.deref()).into() - } -} - impl ssz::Decode for CustomPayload { fn is_ssz_fixed_len() -> bool { false @@ -118,7 +109,7 @@ impl ssz::Decode for CustomPayload { fn from_ssz_bytes(bytes: &[u8]) -> Result { Ok(Self { - payload: ByteList2048::from(bytes.to_vec()), + payload: ByteList1100::from(bytes.to_vec()), }) } } @@ -347,16 +338,18 @@ impl TryFrom for Response { #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct Ping { pub enr_seq: u64, - pub custom_payload: CustomPayload, + pub payload_type: u16, + pub payload: CustomPayload, } impl fmt::Display for Ping { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Ping(enr_seq={}, radius={})", + "Ping(enr_seq={}, payload_type={}, payload={})", self.enr_seq, - hex_encode(self.custom_payload.as_ssz_bytes()) + self.payload_type, + hex_encode(self.payload.as_ssz_bytes()) ) } } @@ -364,16 +357,18 @@ impl fmt::Display for Ping { #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct Pong { pub enr_seq: u64, - pub custom_payload: CustomPayload, + pub payload_type: u16, + pub payload: CustomPayload, } impl fmt::Display for Pong { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Pong(enr_seq={}, radius={})", + "Pong(enr_seq={}, payload_type={}, payload={})", self.enr_seq, - hex_encode(self.custom_payload.as_ssz_bytes()) + self.payload_type, + hex_encode(self.payload.as_ssz_bytes()) ) } } @@ -381,7 +376,7 @@ impl fmt::Display for Pong { /// Convert to JSON Value from Pong ssz bytes impl From for Value { fn from(val: Pong) -> Self { - match U256::from_ssz_bytes(&val.custom_payload.payload.as_ssz_bytes()) { + match U256::from_ssz_bytes(&val.payload.payload.as_ssz_bytes()) { Ok(data_radius) => { let mut result = Map::new(); result.insert("enrSeq".to_owned(), Value::String(val.enr_seq.to_string())); @@ -611,46 +606,6 @@ mod test { assert_eq!(hex, expected_hex); } - // Wire message test vectors available in Ethereum Portal Network specs repo: - // github.com/ethereum/portal-network-specs - #[test] - fn message_encoding_ping() { - let data_radius: U256 = U256::MAX - U256::from(1u8); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - let ping = Ping { - enr_seq: 1, - custom_payload, - }; - let ping = Message::Ping(ping); - - let encoded: Vec = ping.clone().into(); - let encoded = hex_encode(encoded); - let expected_encoded = "0x0001000000000000000c000000feffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"; - assert_eq!(encoded, expected_encoded); - - let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); - assert_eq!(decoded, ping); - } - - #[test] - fn message_encoding_pong() { - let data_radius: U256 = U256::MAX / U256::from(2u8); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - let pong = Pong { - enr_seq: 1, - custom_payload, - }; - let pong = Message::Pong(pong); - - let encoded: Vec = pong.clone().into(); - let encoded = hex_encode(encoded); - let expected_encoded = "0x0101000000000000000c000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff7f"; - assert_eq!(encoded, expected_encoded); - - let decoded = Message::try_from(hex_decode(&encoded).unwrap()).unwrap(); - assert_eq!(decoded, pong); - } - #[test] fn message_encoding_find_nodes() { let distances = vec![256, 255]; diff --git a/crates/ethportal-api/src/version.rs b/crates/ethportal-api/src/version.rs index 994835284..fdb579553 100644 --- a/crates/ethportal-api/src/version.rs +++ b/crates/ethportal-api/src/version.rs @@ -1,4 +1,4 @@ -use std::env; +pub const APP_NAME: &str = "trin"; /// The latest git commit hash of the build. pub const TRIN_FULL_COMMIT: &str = env!("VERGEN_GIT_SHA"); diff --git a/crates/portalnet/src/overlay/mod.rs b/crates/portalnet/src/overlay/mod.rs index 87b5641fd..918f4cc88 100644 --- a/crates/portalnet/src/overlay/mod.rs +++ b/crates/portalnet/src/overlay/mod.rs @@ -1,6 +1,7 @@ pub mod command; pub mod config; pub mod errors; +pub mod ping_extensions; pub mod protocol; pub mod request; pub mod service; diff --git a/crates/portalnet/src/overlay/ping_extensions.rs b/crates/portalnet/src/overlay/ping_extensions.rs new file mode 100644 index 000000000..cb291148f --- /dev/null +++ b/crates/portalnet/src/overlay/ping_extensions.rs @@ -0,0 +1,35 @@ +use ethportal_api::types::ping_extensions::extension_types::Extensions; + +pub trait PingExtension { + /// Returns true if the extension is supported by the clients subnetwork. + fn is_supported(&self, extension: Extensions) -> bool; + + /// Returns the newest extension that is supported by both clients, used for extended ping + /// responses. + fn latest_mutually_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option; + + /// Returns the extensions by their u16 type id that are supported by the clients subnetwork. + fn raw_extensions(&self) -> Vec; +} + +pub struct MockPingExtension {} + +impl PingExtension for MockPingExtension { + fn is_supported(&self, _extension: Extensions) -> bool { + true + } + + fn latest_mutually_supported_base_extension( + &self, + _extensions: &[Extensions], + ) -> Option { + Some(Extensions::HistoryRadius) + } + + fn raw_extensions(&self) -> Vec { + vec![0, 1] + } +} diff --git a/crates/portalnet/src/overlay/protocol.rs b/crates/portalnet/src/overlay/protocol.rs index 1a2b3003c..3a3d23ad5 100644 --- a/crates/portalnet/src/overlay/protocol.rs +++ b/crates/portalnet/src/overlay/protocol.rs @@ -20,10 +20,11 @@ use ethportal_api::{ distance::{Distance, Metric}, enr::Enr, network::Subnetwork, + ping_extensions::extensions::type_0::ClientInfoRadiusCapabilities, portal::PutContentInfo, portal_wire::{ - Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, OfferTrace, - Ping, Pong, PopulatedOffer, PopulatedOfferWithResult, Request, Response, + Accept, Content, FindContent, FindNodes, Message, Nodes, OfferTrace, Ping, Pong, + PopulatedOffer, PopulatedOfferWithResult, Request, Response, }, }, utils::bytes::hex_encode, @@ -31,7 +32,6 @@ use ethportal_api::{ }; use futures::channel::oneshot; use parking_lot::RwLock; -use ssz::Encode; use tokio::sync::{broadcast, mpsc::UnboundedSender}; use tracing::{debug, error, info, warn}; use trin_metrics::{overlay::OverlayMetricsReporter, portalnet::PORTALNET_METRICS}; @@ -39,6 +39,7 @@ use trin_storage::{ContentStore, ShouldWeStoreContent}; use trin_validation::validator::{ValidationResult, Validator}; use utp_rs::socket::UtpSocket; +use super::{ping_extensions::PingExtension, service::OverlayService}; use crate::{ bootnodes::Bootnode, discovery::{Discovery, UtpEnr}, @@ -49,7 +50,6 @@ use crate::{ config::{FindContentConfig, OverlayConfig}, errors::OverlayRequestError, request::{OverlayRequest, RequestDirection}, - service::OverlayService, }, put_content::{ propagate_put_content_cross_thread, trace_propagate_put_content_cross_thread, @@ -67,7 +67,7 @@ use crate::{ /// implement the overlay protocol and the overlay protocol is where we can encapsulate the logic /// for handling common network requests/responses. #[derive(Clone)] -pub struct OverlayProtocol { +pub struct OverlayProtocol { /// Reference to the underlying discv5 protocol pub discovery: Arc, /// The data store. @@ -90,6 +90,8 @@ pub struct OverlayProtocol { validator: Arc, /// Runtime telemetry metrics for the overlay network. metrics: OverlayMetricsReporter, + /// Ping extensions for the overlay network. + ping_extensions: Arc, } impl< @@ -97,7 +99,8 @@ impl< TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, TStore: 'static + ContentStore + Send + Sync, - > OverlayProtocol + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayProtocol { pub async fn new( config: OverlayConfig, @@ -106,6 +109,7 @@ impl< store: Arc>, protocol: Subnetwork, validator: Arc, + ping_extensions: Arc, ) -> Self { let local_node_id = discovery.local_enr().node_id(); let kbuckets = SharedKBucketsTable::new(KBucketsTable::new( @@ -125,25 +129,27 @@ impl< utp_socket, metrics.clone(), )); - let command_tx = OverlayService::::spawn( - Arc::clone(&discovery), - Arc::clone(&store), - kbuckets.clone(), - config.bootnode_enrs, - config.ping_queue_interval, - protocol, - Arc::clone(&utp_controller), - metrics.clone(), - Arc::clone(&validator), - config.query_timeout, - config.query_peer_timeout, - config.query_parallelism, - config.query_num_results, - config.findnodes_query_distances_per_peer, - config.disable_poke, - config.gossip_dropped, - ) - .await; + let command_tx = + OverlayService::::spawn( + Arc::clone(&discovery), + Arc::clone(&store), + kbuckets.clone(), + config.bootnode_enrs, + config.ping_queue_interval, + protocol, + Arc::clone(&utp_controller), + metrics.clone(), + Arc::clone(&validator), + config.query_timeout, + config.query_peer_timeout, + config.query_parallelism, + config.query_num_results, + config.findnodes_query_distances_per_peer, + config.disable_poke, + config.gossip_dropped, + Arc::clone(&ping_extensions), + ) + .await; Self { discovery, @@ -156,6 +162,7 @@ impl< _phantom_metric: PhantomData, validator, metrics, + ping_extensions, } } @@ -280,10 +287,7 @@ impl< /// `AddEnr` adds requested `enr` to our kbucket. pub fn add_enr(&self, enr: Enr) -> Result<(), OverlayRequestError> { match self.kbuckets.insert_or_update( - Node { - enr, - data_radius: Distance::MAX, - }, + Node::new(enr.clone(), Distance::MAX), NodeStatus { state: ConnectionState::Connected, direction: ConnectionDirection::Incoming, @@ -371,10 +375,13 @@ impl< // Construct the request. let enr_seq = self.discovery.local_enr().seq(); let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); + let payload = + ClientInfoRadiusCapabilities::new(data_radius, self.ping_extensions.raw_extensions()) + .into(); let request = Ping { enr_seq, - custom_payload, + payload_type: 0, + payload, }; let direction = RequestDirection::Outgoing { destination: enr }; diff --git a/crates/portalnet/src/overlay/service.rs b/crates/portalnet/src/overlay/service/manager.rs similarity index 93% rename from crates/portalnet/src/overlay/service.rs rename to crates/portalnet/src/overlay/service/manager.rs index dbc1ba2cb..254b890b4 100644 --- a/crates/portalnet/src/overlay/service.rs +++ b/crates/portalnet/src/overlay/service/manager.rs @@ -25,9 +25,9 @@ use ethportal_api::{ enr::{Enr, SszEnr}, network::Subnetwork, portal_wire::{ - Accept, Content, CustomPayload, FindContent, FindNodes, Message, Nodes, Offer, - OfferTrace, Ping, Pong, PopulatedOffer, Request, Response, - MAX_PORTAL_CONTENT_PAYLOAD_SIZE, MAX_PORTAL_NODES_ENRS_SIZE, + Accept, Content, FindContent, FindNodes, Message, Nodes, Offer, OfferTrace, + PopulatedOffer, Request, Response, MAX_PORTAL_CONTENT_PAYLOAD_SIZE, + MAX_PORTAL_NODES_ENRS_SIZE, }, query_trace::{QueryFailureKind, QueryTrace}, }, @@ -43,7 +43,7 @@ use ssz_types::BitList; use tokio::{ sync::{ broadcast, - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, UnboundedSender}, }, task::JoinHandle, }; @@ -53,6 +53,7 @@ use trin_storage::{ContentStore, ShouldWeStoreContent}; use trin_validation::validator::Validator; use utp_rs::cid::ConnectionId; +use super::OverlayService; use crate::{ accept_queue::AcceptQueue, discovery::{Discovery, UtpEnr}, @@ -73,6 +74,7 @@ use crate::{ command::OverlayCommand, config::FindContentConfig, errors::OverlayRequestError, + ping_extensions::PingExtension, request::{ ActiveOutgoingRequest, OverlayRequest, OverlayRequestId, OverlayResponse, RequestDirection, @@ -103,79 +105,13 @@ const BUCKET_REFRESH_INTERVAL_SECS: u64 = 60; /// The capacity of the event-stream's broadcast channel. const EVENT_STREAM_CHANNEL_CAPACITY: usize = 10; -/// The overlay service. -pub struct OverlayService -where - TContentKey: OverlayContentKey, -{ - /// The underlying Discovery v5 protocol. - discovery: Arc, - /// The content database of the local node. - store: Arc>, - /// The routing table of the local node. - kbuckets: SharedKBucketsTable, - /// The protocol identifier. - protocol: Subnetwork, - /// A queue of peers that require regular ping to check connectivity. - /// Inserted entries expire after a fixed time. Nodes to be pinged are inserted with a timeout - /// duration equal to some ping interval, and we continuously poll the queue to check for - /// expired entries. - peers_to_ping: HashSetDelay, - // TODO: This should probably be a bounded channel. - /// The receiver half of the service command channel. - command_rx: UnboundedReceiver>, - /// The sender half of the service command channel. - /// This is used internally to submit requests (e.g. maintenance ping requests). - command_tx: UnboundedSender>, - /// A map of active outgoing requests. - active_outgoing_requests: Arc>>, - /// A query pool that manages find node queries. - find_node_query_pool: QueryPool, TContentKey>, - /// A query pool that manages find content queries. - find_content_query_pool: QueryPool, TContentKey>, - /// A channel for recording events related to content queries. - content_query_trace_events_tx: UnboundedSender, - content_query_trace_events_rx: UnboundedReceiver, - /// Timeout after which a peer in an ongoing query is marked unresponsive. - query_peer_timeout: Duration, - /// Timeout for each complete query - query_timeout: Duration, - /// Number of peers to request data from in parallel for a single query. - query_parallelism: usize, - /// Number of new peers to discover before considering a FINDNODES query complete. - query_num_results: usize, - /// The number of buckets we simultaneously request from each peer in a FINDNODES query. - findnodes_query_distances_per_peer: usize, - /// The receiver half of a channel for responses to outgoing requests. - response_rx: UnboundedReceiver, - /// The sender half of a channel for responses to outgoing requests. - response_tx: UnboundedSender, - /// uTP controller. - utp_controller: Arc, - /// Phantom content key. - _phantom_content_key: PhantomData, - /// Phantom metric (distance function). - _phantom_metric: PhantomData, - /// Metrics reporting component - metrics: OverlayMetricsReporter, - /// Validator for overlay network content. - validator: Arc, - /// A channel that the overlay service emits events on. - event_stream: broadcast::Sender, - /// Disable poke mechanism - disable_poke: bool, - /// Gossip content as it gets dropped from local storage - gossip_dropped: bool, - /// Accept Queue for inbound content keys - accept_queue: Arc>>, -} - impl< TContentKey: 'static + OverlayContentKey + Send + Sync, TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, TStore: 'static + ContentStore + Send + Sync, - > OverlayService + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayService { /// Spawns the overlay network service. /// @@ -200,6 +136,7 @@ impl< findnodes_query_distances_per_peer: usize, disable_poke: bool, gossip_dropped: bool, + ping_extensions: Arc, ) -> UnboundedSender> { let (command_tx, command_rx) = mpsc::unbounded_channel(); let internal_command_tx = command_tx.clone(); @@ -245,6 +182,7 @@ impl< disable_poke, gossip_dropped, accept_queue: Arc::new(RwLock::new(AcceptQueue::default())), + ping_extensions, }; info!(protocol = %protocol, "Starting overlay service"); @@ -380,18 +318,18 @@ impl< } Some(Ok(node_id)) = self.peers_to_ping.next() => { if let Some(node) = self.kbuckets.entry(node_id).present() { - self.ping_node(&node.enr); + self.ping_node(node); self.peers_to_ping.insert(node_id); } } - query_event = OverlayService::::query_event_poll(&mut self.find_node_query_pool) => { + query_event = OverlayService::::query_event_poll(&mut self.find_node_query_pool) => { self.handle_find_nodes_query_event(query_event); } // Handle query events for queries in the find content query pool. - query_event = OverlayService::::query_event_poll(&mut self.find_content_query_pool) => { + query_event = OverlayService::::query_event_poll(&mut self.find_content_query_pool) => { self.handle_find_content_query_event(query_event); } - _ = OverlayService::::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {} + _ = OverlayService::::bucket_maintenance_poll(self.protocol, &self.kbuckets) => {} Some(trace_event) = self.content_query_trace_events_rx.recv() => { self.track_content_query_trace_event(trace_event); } @@ -429,7 +367,7 @@ impl< } /// Returns the local ENR of the node. - fn local_enr(&self) -> Enr { + pub(super) fn local_enr(&self) -> Enr { self.discovery.local_enr() } @@ -437,7 +375,7 @@ impl< /// /// This requires store lock and can block the thread, so it shouldn't be called other lock is /// already held. - fn data_radius(&self) -> Distance { + pub(super) fn data_radius(&self) -> Distance { self.store.read().radius() } @@ -936,24 +874,6 @@ impl< } } - /// Builds a `Pong` response for a `Ping` request. - fn handle_ping(&self, request: Ping, source: &NodeId, request_id: RequestId) -> Pong { - trace!( - protocol = %self.protocol, - request.source = %source, - request.discv5.id = %request_id, - "Handling Ping message {request}", - ); - - let enr_seq = self.local_enr().seq(); - let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - Pong { - enr_seq, - custom_payload, - } - } - /// Builds a `Nodes` response for a `FindNodes` request. fn handle_find_nodes( &self, @@ -1373,36 +1293,12 @@ impl< // peer. if let Some(node_addr) = self.discovery.cached_node_addr(&source) { // TODO: Decide default data radius, and define a constant. - let node = Node { - enr: node_addr.enr, - data_radius: Distance::MAX, - }; + let node = Node::new(node_addr.enr, Distance::MAX); self.connect_node(node, ConnectionDirection::Incoming); } } } - /// Processes a ping request from some source node. - fn process_ping(&self, ping: Ping, source: NodeId) { - // If the node is in the routing table, then check if we need to update the node. - if let Some(node) = self.kbuckets.entry(source).present_or_pending() { - // TODO: How do we handle data in the custom payload? This is unique to each overlay - // network, so there may need to be some way to parameterize the update for a - // ping/pong. - - // If the ENR sequence number in pong is less than the ENR sequence number for the - // routing table entry, then request the node. - if node.enr().seq() < ping.enr_seq { - self.request_node(&node.enr()); - } - - let data_radius: Distance = ping.custom_payload.into(); - if node.data_radius != data_radius { - self.update_node_radius(node.enr(), data_radius); - } - } - } - /// Processes a failed request intended for some destination node. fn process_request_failure( &mut self, @@ -1779,39 +1675,11 @@ impl< Ok(()) } - /// Processes a Pong response. - /// - /// Refreshes the node if necessary. Attempts to mark the node as connected. - fn process_pong(&self, pong: Pong, source: Enr) { - let node_id = source.node_id(); - trace!( - protocol = %self.protocol, - response.source = %node_id, - "Processing Pong message {pong}" - ); - - // If the ENR sequence number in pong is less than the ENR sequence number for the routing - // table entry, then request the node. - // - // TODO: Perform update on non-ENR node entry state. See note in `process_ping`. - if let Some(node) = self.kbuckets.entry(node_id).present_or_pending() { - if node.enr().seq() < pong.enr_seq { - self.request_node(&node.enr()); - } - - let data_radius: Distance = pong.custom_payload.into(); - if node.data_radius != data_radius { - self.update_node_radius(source, data_radius); - } - } - } - /// Update the recorded radius of a node in our routing table. - fn update_node_radius(&self, enr: Enr, data_radius: Distance) { - let node_id = enr.node_id(); + pub(super) fn update_node(&self, node: Node) { + let node_id = node.enr.node_id(); - let updated_node = Node::new(enr, data_radius); - if let UpdateResult::Failed(_) = self.kbuckets.update_node(updated_node, None) { + if let UpdateResult::Failed(_) = self.kbuckets.update_node(node, None) { error!( "Failed to update radius of node {}", hex_encode_compact(node_id.raw()) @@ -2160,37 +2028,8 @@ impl< } } - /// Submits a request to ping a destination (target) node. - /// - /// This can block the thread, so make sure you are not holding any lock while calling this. - fn ping_node(&self, destination: &Enr) { - trace!( - protocol = %self.protocol, - request.dest = %destination.node_id(), - "Sending Ping message", - ); - - let enr_seq = self.local_enr().seq(); - let data_radius = self.data_radius(); - let custom_payload = CustomPayload::from(data_radius.as_ssz_bytes()); - let ping = Request::Ping(Ping { - enr_seq, - custom_payload, - }); - let request = OverlayRequest::new( - ping, - RequestDirection::Outgoing { - destination: destination.clone(), - }, - None, - None, - None, - ); - let _ = self.command_tx.send(OverlayCommand::Request(request)); - } - /// Submits a request for the node info of a destination (target) node. - fn request_node(&self, destination: &Enr) { + pub(super) fn request_node(&self, destination: &Enr) { let find_nodes = Request::FindNodes(FindNodes { distances: vec![0] }); let request = OverlayRequest::new( find_nodes, @@ -2228,7 +2067,7 @@ impl< // currently considered disconnected. This node should be pinged to check // for connectivity. if let Some(node) = self.kbuckets.entry(disconnected.into_preimage()).present() { - self.ping_node(&node.enr); + self.ping_node(node); } } InsertResult::StatusUpdated { @@ -2543,15 +2382,18 @@ where gossip_dropped: bool, } -impl - From<&OverlayService> +impl + From<&OverlayService> for UtpProcessing where TContentKey: OverlayContentKey + Send + Sync, TValidator: Validator, TStore: ContentStore, + TPingExtensions: PingExtension, { - fn from(service: &OverlayService) -> Self { + fn from( + service: &OverlayService, + ) -> Self { Self { validator: Arc::clone(&service.validator), store: Arc::clone(&service.store), @@ -2616,8 +2458,11 @@ mod tests { use alloy::primitives::U256; use discv5::kbucket; use ethportal_api::types::{ - content_key::overlay::IdentityContentKey, distance::XorMetric, - enr::generate_random_remote_enr, portal_wire::MAINNET, + content_key::overlay::IdentityContentKey, + distance::XorMetric, + enr::generate_random_remote_enr, + ping_extensions::extensions::type_0::ClientInfoRadiusCapabilities, + portal_wire::{Ping, Pong, MAINNET}, }; use kbucket::KBucketsTable; use rstest::*; @@ -2636,7 +2481,7 @@ mod tests { config::PortalnetConfig, constants::{DEFAULT_DISCOVERY_PORT, DEFAULT_UTP_TRANSFER_LIMIT}, discovery::{Discovery, NodeAddress}, - overlay::config::OverlayConfig, + overlay::{config::OverlayConfig, ping_extensions::MockPingExtension}, }; macro_rules! poll_command_rx { @@ -2645,8 +2490,13 @@ mod tests { }; } - fn build_service( - ) -> OverlayService { + fn build_service() -> OverlayService< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + > { let portal_config = PortalnetConfig { no_stun: true, no_upnp: true, @@ -2727,6 +2577,7 @@ mod tests { disable_poke: false, gossip_dropped: false, accept_queue, + ping_extensions: Arc::new(MockPingExtension {}), } } @@ -2749,7 +2600,12 @@ mod tests { let ping = Ping { enr_seq: source.seq() + 1, - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + payload_type: 0, + payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_ping(ping, node_id); @@ -2794,7 +2650,12 @@ mod tests { let ping = Ping { enr_seq: source.seq(), - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + payload_type: 0, + payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_ping(ping, node_id); @@ -2855,7 +2716,12 @@ mod tests { let pong = Pong { enr_seq: source.seq() + 1, - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + payload_type: 0, + payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_pong(pong, source.clone()); @@ -2899,7 +2765,12 @@ mod tests { let pong = Pong { enr_seq: source.seq(), - custom_payload: CustomPayload::from(data_radius.as_ssz_bytes()), + payload_type: 0, + payload: ClientInfoRadiusCapabilities::new( + data_radius, + service.ping_extensions.raw_extensions(), + ) + .into(), }; service.process_pong(pong, source); @@ -3037,7 +2908,13 @@ mod tests { let peer_node_ids: Vec = vec![peer.enr.node_id()]; // Node has maximum radius, so there should be one offer in the channel. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3075,7 +2952,13 @@ mod tests { let peer_node_ids: Vec = peers.iter().map(|p| p.node_id()).collect(); // No nodes in the routing table, so no commands should be in the channel. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3113,7 +2996,13 @@ mod tests { let peer_node_ids: Vec = peers.iter().map(|p| p.enr.node_id()).collect(); // One offer should be in the channel for the maximum radius node. - OverlayService::::poke_content( + OverlayService::< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::poke_content( &service.kbuckets, service.command_tx.clone(), content_key, @@ -3178,7 +3067,7 @@ mod tests { let mut service = task::spawn(build_service()); let (_, destination) = generate_random_remote_enr(); - service.ping_node(&destination); + service.ping_node(Node::new(destination.clone(), Distance::MAX)); let command = assert_ready!(poll_command_rx!(service)); assert!(command.is_some()); @@ -3387,6 +3276,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; match event { @@ -3423,6 +3313,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3440,6 +3331,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3464,6 +3356,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3507,6 +3400,7 @@ mod tests { XorMetric, MockValidator, MemoryContentStore, + MockPingExtension, >::query_event_poll(&mut service.find_node_query_pool) .await; @@ -3552,10 +3446,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3624,10 +3515,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3685,10 +3573,7 @@ mod tests { let bootnode_node_id = bootnode_enr.node_id(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3748,10 +3633,7 @@ mod tests { let bootnode_node_id = bootnode_enr.node_id(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3813,10 +3695,7 @@ mod tests { let (_, bootnode_enr) = generate_random_remote_enr(); let data_radius = Distance::MAX; - let bootnode = Node { - enr: bootnode_enr.clone(), - data_radius, - }; + let bootnode = Node::new(bootnode_enr.clone(), data_radius); let connection_direction = ConnectionDirection::Outgoing; let status = NodeStatus { @@ -3837,11 +3716,14 @@ mod tests { ); let query_id = QueryId(0); // default query id for all initial queries - let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ) - .await; + let query_event = OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool) + .await; // Query event should be `Waiting` variant. assert!(matches!(query_event, QueryEvent::Waiting(_, _, _))); @@ -3876,11 +3758,14 @@ mod tests { content.clone(), ); - let query_event = - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ) - .await; + let query_event = OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool) + .await; // Query event should be `Validating` variant. assert!(matches!(query_event, QueryEvent::Validating(..))); @@ -3890,13 +3775,18 @@ mod tests { // Poll until content is validated let mut attempts_left = 5; let query_event = loop { - let polled = timeout( - Duration::from_millis(10), - OverlayService::<_, XorMetric, MockValidator, MemoryContentStore>::query_event_poll( - &mut service.find_content_query_pool, - ), - ) - .await; + let polled = + timeout( + Duration::from_millis(10), + OverlayService::< + _, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >::query_event_poll(&mut service.find_content_query_pool), + ) + .await; if let Ok(query_event) = polled { break query_event; } diff --git a/crates/portalnet/src/overlay/service/mod.rs b/crates/portalnet/src/overlay/service/mod.rs new file mode 100644 index 000000000..e7d456242 --- /dev/null +++ b/crates/portalnet/src/overlay/service/mod.rs @@ -0,0 +1,100 @@ +pub mod manager; +pub mod ping; + +use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration}; + +use delay_map::HashSetDelay; +use discv5::enr::NodeId; +use ethportal_api::{types::network::Subnetwork, OverlayContentKey}; +use manager::QueryTraceEvent; +use parking_lot::RwLock; +use tokio::sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender}, +}; +use trin_metrics::overlay::OverlayMetricsReporter; + +use crate::{ + accept_queue::AcceptQueue, + discovery::Discovery, + events::EventEnvelope, + find::{ + iterators::{findcontent::FindContentQuery, findnodes::FindNodeQuery}, + query_pool::QueryPool, + }, + overlay::{ + command::OverlayCommand, + request::{ActiveOutgoingRequest, OverlayRequestId, OverlayResponse}, + }, + types::kbucket::SharedKBucketsTable, + utp::controller::UtpController, +}; + +/// The overlay service. +pub struct OverlayService +where + TContentKey: OverlayContentKey, +{ + /// The underlying Discovery v5 protocol. + discovery: Arc, + /// The content database of the local node. + store: Arc>, + /// The routing table of the local node. + kbuckets: SharedKBucketsTable, + /// The protocol identifier. + protocol: Subnetwork, + /// A queue of peers that require regular ping to check connectivity. + /// Inserted entries expire after a fixed time. Nodes to be pinged are inserted with a timeout + /// duration equal to some ping interval, and we continuously poll the queue to check for + /// expired entries. + peers_to_ping: HashSetDelay, + // TODO: This should probably be a bounded channel. + /// The receiver half of the service command channel. + command_rx: UnboundedReceiver>, + /// The sender half of the service command channel. + /// This is used internally to submit requests (e.g. maintenance ping requests). + command_tx: UnboundedSender>, + /// A map of active outgoing requests. + active_outgoing_requests: Arc>>, + /// A query pool that manages find node queries. + find_node_query_pool: QueryPool, TContentKey>, + /// A query pool that manages find content queries. + find_content_query_pool: QueryPool, TContentKey>, + /// A channel for recording events related to content queries. + content_query_trace_events_tx: UnboundedSender, + content_query_trace_events_rx: UnboundedReceiver, + /// Timeout after which a peer in an ongoing query is marked unresponsive. + query_peer_timeout: Duration, + /// Timeout for each complete query + query_timeout: Duration, + /// Number of peers to request data from in parallel for a single query. + query_parallelism: usize, + /// Number of new peers to discover before considering a FINDNODES query complete. + query_num_results: usize, + /// The number of buckets we simultaneously request from each peer in a FINDNODES query. + findnodes_query_distances_per_peer: usize, + /// The receiver half of a channel for responses to outgoing requests. + response_rx: UnboundedReceiver, + /// The sender half of a channel for responses to outgoing requests. + response_tx: UnboundedSender, + /// uTP controller. + utp_controller: Arc, + /// Phantom content key. + _phantom_content_key: PhantomData, + /// Phantom metric (distance function). + _phantom_metric: PhantomData, + /// Metrics reporting component + metrics: OverlayMetricsReporter, + /// Validator for overlay network content. + validator: Arc, + /// A channel that the overlay service emits events on. + event_stream: broadcast::Sender, + /// Disable poke mechanism + disable_poke: bool, + /// Gossip content as it gets dropped from local storage + gossip_dropped: bool, + /// Accept Queue for inbound content keys + accept_queue: Arc>>, + /// Ping extensions for the overlay network. + ping_extensions: Arc, +} diff --git a/crates/portalnet/src/overlay/service/ping/handlers.rs b/crates/portalnet/src/overlay/service/ping/handlers.rs new file mode 100644 index 000000000..b7911856d --- /dev/null +++ b/crates/portalnet/src/overlay/service/ping/handlers.rs @@ -0,0 +1,54 @@ +use ethportal_api::types::{ + network::Subnetwork, + ping_extensions::extensions::{ + type_0::ClientInfoRadiusCapabilities, type_1::BasicRadius, type_2::HistoryRadius, + }, +}; +use tracing::warn; + +use crate::types::node::Node; + +pub fn handle_capabilities( + radius_capabilities: ClientInfoRadiusCapabilities, + mut node: Node, + protocol: Subnetwork, +) -> Option { + let Ok(capabilities) = radius_capabilities.capabilities() else { + warn!( + protocol = %protocol, + request.source = %node.enr.node_id(), + "Capabilities weren't decoded correctly", + ); + return None; + }; + if node.data_radius != radius_capabilities.data_radius + || node.compare_capabilities(&capabilities) + { + node.set_data_radius(radius_capabilities.data_radius); + node.set_capabilities(capabilities); + return Some(node); + } + None +} + +pub fn handle_basic_radius(basic_radius: BasicRadius, mut node: Node) -> Option { + let data_radius = basic_radius.data_radius; + if node.data_radius != data_radius { + node.set_data_radius(data_radius); + return Some(node); + } + None +} + +pub fn handle_history_radius(history_radius: HistoryRadius, mut node: Node) -> Option { + let data_radius = history_radius.data_radius; + let ephemeral_header_count = history_radius.ephemeral_header_count; + if node.data_radius != data_radius + || node.ephemeral_header_count != Some(ephemeral_header_count) + { + node.set_data_radius(data_radius); + node.set_ephemeral_header_count(ephemeral_header_count); + return Some(node); + } + None +} diff --git a/crates/portalnet/src/overlay/service/ping/mod.rs b/crates/portalnet/src/overlay/service/ping/mod.rs new file mode 100644 index 000000000..3b19eb2c8 --- /dev/null +++ b/crates/portalnet/src/overlay/service/ping/mod.rs @@ -0,0 +1,333 @@ +pub mod handlers; + +use std::marker::Sync; + +use discv5::{enr::NodeId, rpc::RequestId}; +use ethportal_api::{ + types::{ + distance::Metric, + enr::Enr, + ping_extensions::{ + decode::DecodedExtension, + extension_types::Extensions, + extensions::{ + type_0::ClientInfoRadiusCapabilities, + type_1::BasicRadius, + type_2::HistoryRadius, + type_65535::{ErrorCodes, PingError}, + }, + }, + portal_wire::{CustomPayload, Ping, Pong, Request}, + }, + OverlayContentKey, +}; +use handlers::{handle_basic_radius, handle_capabilities, handle_history_radius}; +use tracing::{trace, warn}; +use trin_storage::ContentStore; +use trin_validation::validator::Validator; + +use super::OverlayService; +use crate::{ + overlay::{ + command::OverlayCommand, + ping_extensions::PingExtension, + request::{OverlayRequest, RequestDirection}, + }, + types::node::Node, +}; + +/// Implementation of the `OverlayService` for handling Ping and Pong Extensions. +impl< + TContentKey: 'static + OverlayContentKey + Send + Sync, + TMetric: Metric + Send + Sync, + TValidator: 'static + Validator + Send + Sync, + TStore: 'static + ContentStore + Send + Sync, + TPingExtensions: 'static + PingExtension + Send + Sync, + > OverlayService +{ + fn create_pong(&self, payload_type: u16, payload: CustomPayload) -> Pong { + Pong { + enr_seq: self.local_enr().seq(), + payload_type, + payload, + } + } + + /// Builds a `Pong` response for a `Ping` request. + pub(super) fn handle_ping( + &self, + request: Ping, + source: &NodeId, + request_id: RequestId, + ) -> Pong { + trace!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Handling Ping message {request}", + ); + + let extension_type = match Extensions::try_from(request.payload_type) { + Ok(extension) => extension, + Err(err) => { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Received non-supported extension type in ping message: {err:?}", + ); + return self.create_pong( + Extensions::Error.into(), + PingError::new(ErrorCodes::ExtensionNotSupported).into(), + ); + } + }; + + if !self.ping_extensions.is_supported(extension_type) { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Received non-supported ping extension on this portal subnetwork: {extension_type:?}", + ); + return self.create_pong( + Extensions::Error.into(), + PingError::new(ErrorCodes::ExtensionNotSupported).into(), + ); + } + + match extension_type { + Extensions::Capabilities => { + self.create_pong(extension_type.into(), self.create_capabilities().into()) + } + Extensions::BasicRadius => self.create_pong( + extension_type.into(), + BasicRadius { + data_radius: self.data_radius(), + } + .into(), + ), + Extensions::HistoryRadius => self.create_pong( + extension_type.into(), + HistoryRadius { + data_radius: self.data_radius(), + ephemeral_header_count: 0, + } + .into(), + ), + Extensions::Error => { + warn!( + protocol = %self.protocol, + request.source = %source, + request.discv5.id = %request_id, + "Received invalid Ping message, Errors should only be received from pong", + ); + self.create_pong( + extension_type.into(), + PingError::new(ErrorCodes::SystemError).into(), + ) + } + } + } + + /// Processes a ping request from some source node. + pub(super) fn process_ping(&self, ping: Ping, source: NodeId) { + // If the node is in the routing table, then check if we need to update the node. + if let Some(node) = self.kbuckets.entry(source).present_or_pending() { + // TODO: How do we handle data in the custom payload? This is unique to each overlay + // network, so there may need to be some way to parameterize the update for a + // ping/pong. + + // If the ENR sequence number in pong is less than the ENR sequence number for the + // routing table entry, then request the node. + if node.enr().seq() < ping.enr_seq { + self.request_node(&node.enr()); + } + + let extension = + match DecodedExtension::decode_extension(ping.payload_type, ping.payload) { + Ok(extension) => extension, + Err(err) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Failed to decode custom payload during process_ping: {err:?}", + ); + return; + } + }; + + if !self.ping_extensions.is_supported(extension.clone().into()) { + warn!( + protocol = %self.protocol, + request.source = %source, + "Extension type isn't supported on this subnetwork: {extension:?}", + ); + return; + } + + let node = match extension { + DecodedExtension::Capabilities(radius_capabilities) => { + handle_capabilities(radius_capabilities, node, self.protocol) + } + DecodedExtension::BasicRadius(basic_radius) => { + handle_basic_radius(basic_radius, node) + } + DecodedExtension::HistoryRadius(history_radius) => { + handle_history_radius(history_radius, node) + } + DecodedExtension::Error(ping_error) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Received an error response from a ping request: {ping_error:?}", + ); + return; + } + }; + + if let Some(node) = node { + self.update_node(node); + } + } + } + + /// Processes a Pong response. + /// + /// Refreshes the node if necessary. Attempts to mark the node as connected. + pub(super) fn process_pong(&self, pong: Pong, source: Enr) { + let node_id = source.node_id(); + trace!( + protocol = %self.protocol, + response.source = %node_id, + "Processing Pong message {pong}" + ); + + // If the ENR sequence number in pong is less than the ENR sequence number for the routing + // table entry, then request the node. + // + // TODO: Perform update on non-ENR node entry state. See note in `process_ping`. + if let Some(node) = self.kbuckets.entry(node_id).present_or_pending() { + if node.enr().seq() < pong.enr_seq { + self.request_node(&node.enr()); + } + + let extension = + match DecodedExtension::decode_extension(pong.payload_type, pong.payload) { + Ok(extension) => extension, + Err(err) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Failed to decode custom payload during process_ping: {err:?}", + ); + return; + } + }; + + if !self.ping_extensions.is_supported(extension.clone().into()) { + warn!( + protocol = %self.protocol, + request.source = %source, + "Extension type isn't supported on this subnetwork: {extension:?}", + ); + return; + } + + let node = match extension { + DecodedExtension::Capabilities(radius_capabilities) => { + handle_capabilities(radius_capabilities, node, self.protocol) + } + DecodedExtension::BasicRadius(basic_radius) => { + handle_basic_radius(basic_radius, node) + } + DecodedExtension::HistoryRadius(history_radius) => { + handle_history_radius(history_radius, node) + } + DecodedExtension::Error(ping_error) => { + warn!( + protocol = %self.protocol, + request.source = %source, + "Received an error response from a pong request: {ping_error:?}", + ); + return; + } + }; + + if let Some(node) = node { + self.update_node(node); + } + } + } + + fn create_capabilities(&self) -> ClientInfoRadiusCapabilities { + ClientInfoRadiusCapabilities::new(self.data_radius(), self.ping_extensions.raw_extensions()) + } + + fn handle_base_extension( + &self, + extension: Extensions, + node_id: NodeId, + ) -> (u16, CustomPayload) { + match extension { + Extensions::BasicRadius => ( + extension.into(), + BasicRadius { + data_radius: self.data_radius(), + } + .into(), + ), + Extensions::HistoryRadius => ( + extension.into(), + HistoryRadius { + data_radius: self.data_radius(), + ephemeral_header_count: 0, + } + .into(), + ), + _ => { + warn!( + protocol = %self.protocol, + request.dest = %node_id, + "Base extension wasn't implemented: {extension:?}, sending Capabilities instead. This is a bug!", + ); + (0, self.create_capabilities().into()) + } + } + } + + /// Submits a request to ping a destination (target) node. + /// + /// This can block the thread, so make sure you are not holding any lock while calling this. + pub(super) fn ping_node(&self, node: Node) { + trace!( + protocol = %self.protocol, + request.dest = %node.enr.node_id(), + "Sending Ping message", + ); + + let (payload_type, payload) = match node.capabilities().map(|capabilities| { + self.ping_extensions + .latest_mutually_supported_base_extension(capabilities) + }) { + Some(Some(extension)) => self.handle_base_extension(extension, node.enr.node_id()), + _ => (0, self.create_capabilities().into()), + }; + + let ping = Request::Ping(Ping { + enr_seq: self.local_enr().seq(), + payload_type, + payload, + }); + let request = OverlayRequest::new( + ping, + RequestDirection::Outgoing { + destination: node.enr.clone(), + }, + None, + None, + None, + ); + let _ = self.command_tx.send(OverlayCommand::Request(request)); + } +} diff --git a/crates/portalnet/src/types/kbucket.rs b/crates/portalnet/src/types/kbucket.rs index 66f5481a7..63f3ad595 100644 --- a/crates/portalnet/src/types/kbucket.rs +++ b/crates/portalnet/src/types/kbucket.rs @@ -149,10 +149,9 @@ impl SharedKBucketsTable { // If the node is not in the routing table, then insert the node in a disconnected state // (a subsequent ping will establish connectivity with the node). Ignore insertion // failures. - if let Some(node) = Entry::from(kbuckets.entry(&key)).present_or_pending() { + if let Some(mut node) = Entry::from(kbuckets.entry(&key)).present_or_pending() { if node.enr.seq() < enr.seq() { - let node = Node::new(enr, node.data_radius()); - + node.set_enr(enr); if let UpdateResult::Failed(reason) = kbuckets.update_node(&key, node, None) { // The update removed the node because it would violate the incoming peers // condition or a bucket/table filter. diff --git a/crates/portalnet/src/types/node.rs b/crates/portalnet/src/types/node.rs index 40c0a032e..c11e50e74 100644 --- a/crates/portalnet/src/types/node.rs +++ b/crates/portalnet/src/types/node.rs @@ -1,20 +1,34 @@ use std::fmt; -use ethportal_api::types::{distance::Distance, enr::Enr}; +use ethportal_api::types::{ + distance::Distance, enr::Enr, ping_extensions::extension_types::Extensions, +}; /// A node in the overlay network routing table. #[derive(Clone, Debug, Eq, PartialEq)] pub struct Node { /// The node's ENR. pub enr: Enr, + /// The node's data radius. pub data_radius: Distance, + + /// The node's capabilities. + pub capabilities: Option>, + + /// The node's ephemeral header count (only used for History Network) + pub ephemeral_header_count: Option, } impl Node { /// Creates a new node. pub fn new(enr: Enr, data_radius: Distance) -> Node { - Node { enr, data_radius } + Node { + enr, + data_radius, + capabilities: None, + ephemeral_header_count: None, + } } /// Returns the ENR of the node. @@ -27,6 +41,26 @@ impl Node { self.data_radius } + /// Returns the capabilities of the node. + pub fn capabilities(&self) -> Option<&[Extensions]> { + self.capabilities.as_deref() + } + + /// Compares the capabilities of the node with the given capabilities. + /// Returns true if the capabilities are the same. + pub fn compare_capabilities(&self, capabilities: &[Extensions]) -> bool { + if let Some(node_capabilities) = &self.capabilities { + capabilities.iter().all(|c| node_capabilities.contains(c)) + } else { + false + } + } + + /// Returns the ephemeral header count of the node. + pub fn ephemeral_header_count(&self) -> Option { + self.ephemeral_header_count + } + /// Sets the ENR of the node. pub fn set_enr(&mut self, enr: Enr) { self.enr = enr; @@ -36,6 +70,16 @@ impl Node { pub fn set_data_radius(&mut self, radius: Distance) { self.data_radius = radius; } + + /// Sets the capabilities of the node. + pub fn set_capabilities(&mut self, capabilities: Vec) { + self.capabilities = Some(capabilities); + } + + /// Sets the ephemeral header count of the node. + pub fn set_ephemeral_header_count(&mut self, count: u16) { + self.ephemeral_header_count = Some(count); + } } impl fmt::Display for Node { diff --git a/crates/portalnet/src/utils/db.rs b/crates/portalnet/src/utils/db.rs index 727e4ddbc..ccb201de7 100644 --- a/crates/portalnet/src/utils/db.rs +++ b/crates/portalnet/src/utils/db.rs @@ -9,10 +9,10 @@ use discv5::enr::{CombinedKey, Enr, NodeId}; use ethportal_api::{ types::network::Network, utils::bytes::{hex_decode, hex_encode}, + version::APP_NAME, }; use trin_utils::dir::setup_data_dir; -const APP_NAME: &str = "trin"; const TRIN_DATA_ENV_VAR: &str = "TRIN_DATA_PATH"; const UNSAFE_PRIVATE_KEY_FILE_NAME: &str = "unsafe_private_key.hex"; diff --git a/crates/portalnet/tests/overlay.rs b/crates/portalnet/tests/overlay.rs index b14427b72..8ed095ab3 100644 --- a/crates/portalnet/tests/overlay.rs +++ b/crates/portalnet/tests/overlay.rs @@ -21,6 +21,7 @@ use portalnet::{ discovery::{Discovery, Discv5UdpSocket}, overlay::{ config::{FindContentConfig, OverlayConfig}, + ping_extensions::MockPingExtension, protocol::OverlayProtocol, }, }; @@ -35,7 +36,13 @@ use utp_rs::socket::UtpSocket; async fn init_overlay( discovery: Arc, subnetwork: Subnetwork, -) -> OverlayProtocol { +) -> OverlayProtocol< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, +> { let overlay_config = OverlayConfig::default(); let node_id = discovery.local_enr().node_id(); @@ -50,6 +57,7 @@ async fn init_overlay( let utp_socket = Arc::new(UtpSocket::with_socket(discv5_utp)); let validator = Arc::new(MockValidator {}); + let ping_extensions = Arc::new(MockPingExtension {}); OverlayProtocol::new( overlay_config, @@ -58,13 +66,22 @@ async fn init_overlay( store, subnetwork, validator, + ping_extensions, ) .await } async fn spawn_overlay( mut talk_req_rx: mpsc::Receiver, - overlay: Arc>, + overlay: Arc< + OverlayProtocol< + IdentityContentKey, + XorMetric, + MockValidator, + MemoryContentStore, + MockPingExtension, + >, + >, ) { let (overlay_tx, mut overlay_rx) = mpsc::unbounded_channel(); diff --git a/crates/subnetworks/beacon/src/jsonrpc.rs b/crates/subnetworks/beacon/src/jsonrpc.rs index a25f7b19e..31b3e87e9 100644 --- a/crates/subnetworks/beacon/src/jsonrpc.rs +++ b/crates/subnetworks/beacon/src/jsonrpc.rs @@ -4,8 +4,8 @@ use discv5::enr::NodeId; use ethportal_api::{ types::{ content_value::ContentValue, - distance::Distance, jsonrpc::{endpoints::BeaconEndpoint, request::BeaconJsonRpcRequest}, + ping_extensions::decode::DecodedExtension, portal::{AcceptInfo, FindNodesInfo, GetContentInfo, PongInfo, TraceContentInfo}, portal_wire::Content, }, @@ -399,10 +399,18 @@ async fn ping( enr: discv5::enr::Enr, ) -> Result { match network.overlay.send_ping(enr).await { - Ok(pong) => Ok(json!(PongInfo { - enr_seq: pong.enr_seq, - data_radius: *Distance::from(pong.custom_payload), - })), + Ok(pong) => { + let data_radius = + match DecodedExtension::decode_extension(pong.payload_type, pong.payload) { + Ok(DecodedExtension::Capabilities(capabilities)) => *capabilities.data_radius, + err => return Err(format!("Failed to decode capabilities: {err:?}")), + }; + + Ok(json!(PongInfo { + enr_seq: pong.enr_seq, + data_radius, + })) + } Err(msg) => Err(format!("Ping request timeout: {msg:?}")), } } diff --git a/crates/subnetworks/beacon/src/lib.rs b/crates/subnetworks/beacon/src/lib.rs index f558e4a76..f0414f845 100644 --- a/crates/subnetworks/beacon/src/lib.rs +++ b/crates/subnetworks/beacon/src/lib.rs @@ -4,6 +4,7 @@ pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; mod sync; #[cfg(test)] diff --git a/crates/subnetworks/beacon/src/network.rs b/crates/subnetworks/beacon/src/network.rs index 7772b414d..f54d7b454 100644 --- a/crates/subnetworks/beacon/src/network.rs +++ b/crates/subnetworks/beacon/src/network.rs @@ -18,13 +18,24 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::BeaconStorage, sync::BeaconSync, validation::BeaconValidator}; +use crate::{ + ping_extensions::BeaconPingExtensions, storage::BeaconStorage, sync::BeaconSync, + validation::BeaconValidator, +}; /// Beacon network layer on top of the overlay protocol. Encapsulates beacon network specific data /// and logic. #[derive(Clone)] pub struct BeaconNetwork { - pub overlay: Arc>, + pub overlay: Arc< + OverlayProtocol< + BeaconContentKey, + XorMetric, + BeaconValidator, + BeaconStorage, + BeaconPingExtensions, + >, + >, pub beacon_client: Arc>>>, } @@ -49,6 +60,7 @@ impl BeaconNetwork { let storage = Arc::new(PLRwLock::new(BeaconStorage::new(storage_config)?)); let storage_clone = Arc::clone(&storage); let validator = Arc::new(BeaconValidator::new(header_oracle)); + let ping_extensions = Arc::new(BeaconPingExtensions {}); let overlay = OverlayProtocol::new( config, discovery, @@ -56,6 +68,7 @@ impl BeaconNetwork { storage, Subnetwork::Beacon, validator, + ping_extensions, ) .await; diff --git a/crates/subnetworks/beacon/src/ping_extensions.rs b/crates/subnetworks/beacon/src/ping_extensions.rs new file mode 100644 index 000000000..7ccf668dc --- /dev/null +++ b/crates/subnetworks/beacon/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::extension_types::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct BeaconPingExtensions {} + +impl BeaconPingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::BasicRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::BasicRadius]; +} + +impl PingExtension for BeaconPingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn latest_mutually_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +} diff --git a/crates/subnetworks/history/src/jsonrpc.rs b/crates/subnetworks/history/src/jsonrpc.rs index cb91937b8..f48b0548f 100644 --- a/crates/subnetworks/history/src/jsonrpc.rs +++ b/crates/subnetworks/history/src/jsonrpc.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use discv5::enr::NodeId; use ethportal_api::{ types::{ - distance::Distance, jsonrpc::{endpoints::HistoryEndpoint, request::HistoryJsonRpcRequest}, + ping_extensions::decode::DecodedExtension, portal::{AcceptInfo, FindNodesInfo, GetContentInfo, PongInfo, TraceContentInfo}, portal_wire::Content, }, @@ -345,10 +345,18 @@ async fn ping( enr: discv5::enr::Enr, ) -> Result { match network.overlay.send_ping(enr).await { - Ok(pong) => Ok(json!(PongInfo { - enr_seq: pong.enr_seq, - data_radius: *Distance::from(pong.custom_payload), - })), + Ok(pong) => { + let data_radius = + match DecodedExtension::decode_extension(pong.payload_type, pong.payload) { + Ok(DecodedExtension::Capabilities(capabilities)) => *capabilities.data_radius, + err => return Err(format!("Failed to decode capabilities: {err:?}")), + }; + + Ok(json!(PongInfo { + enr_seq: pong.enr_seq, + data_radius, + })) + } Err(msg) => Err(format!("Ping request timeout: {msg:?}")), } } diff --git a/crates/subnetworks/history/src/lib.rs b/crates/subnetworks/history/src/lib.rs index c8fa2deba..5803c9660 100644 --- a/crates/subnetworks/history/src/lib.rs +++ b/crates/subnetworks/history/src/lib.rs @@ -4,6 +4,7 @@ pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; pub mod validation; diff --git a/crates/subnetworks/history/src/network.rs b/crates/subnetworks/history/src/network.rs index 89489c663..f4b96057d 100644 --- a/crates/subnetworks/history/src/network.rs +++ b/crates/subnetworks/history/src/network.rs @@ -15,7 +15,10 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::HistoryStorage, validation::ChainHistoryValidator}; +use crate::{ + ping_extensions::HistoryPingExtensions, storage::HistoryStorage, + validation::ChainHistoryValidator, +}; /// Gossip content as it gets dropped from local storage, /// enabled by default for the history network. @@ -25,8 +28,15 @@ const GOSSIP_DROPPED: bool = true; /// and logic. #[derive(Clone)] pub struct HistoryNetwork { - pub overlay: - Arc>, + pub overlay: Arc< + OverlayProtocol< + HistoryContentKey, + XorMetric, + ChainHistoryValidator, + HistoryStorage, + HistoryPingExtensions, + >, + >, } impl HistoryNetwork { @@ -46,6 +56,7 @@ impl HistoryNetwork { }; let storage = Arc::new(PLRwLock::new(HistoryStorage::new(storage_config)?)); let validator = Arc::new(ChainHistoryValidator { header_oracle }); + let ping_extensions = Arc::new(HistoryPingExtensions {}); let overlay = OverlayProtocol::new( config, discovery, @@ -53,6 +64,7 @@ impl HistoryNetwork { storage, Subnetwork::History, validator, + ping_extensions, ) .await; diff --git a/crates/subnetworks/history/src/ping_extensions.rs b/crates/subnetworks/history/src/ping_extensions.rs new file mode 100644 index 000000000..315560b9a --- /dev/null +++ b/crates/subnetworks/history/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::extension_types::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct HistoryPingExtensions {} + +impl HistoryPingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::HistoryRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::HistoryRadius]; +} + +impl PingExtension for HistoryPingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn latest_mutually_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +} diff --git a/crates/subnetworks/state/src/jsonrpc.rs b/crates/subnetworks/state/src/jsonrpc.rs index aababc6a0..0785fe33d 100644 --- a/crates/subnetworks/state/src/jsonrpc.rs +++ b/crates/subnetworks/state/src/jsonrpc.rs @@ -4,8 +4,8 @@ use discv5::{enr::NodeId, Enr}; use ethportal_api::{ jsonrpsee::core::Serialize, types::{ - distance::Distance, jsonrpc::{endpoints::StateEndpoint, request::StateJsonRpcRequest}, + ping_extensions::decode::DecodedExtension, portal::{AcceptInfo, FindNodesInfo, GetContentInfo, PongInfo, TraceContentInfo}, portal_wire::Content, }, @@ -96,13 +96,22 @@ fn routing_table_info(network: Arc) -> Result { } async fn ping(network: Arc, enr: Enr) -> Result { - to_json_result( - "Ping", - network.overlay.send_ping(enr).await.map(|pong| PongInfo { - enr_seq: pong.enr_seq, - data_radius: *Distance::from(pong.custom_payload), - }), - ) + let pong = match network.overlay.send_ping(enr.clone()).await { + Ok(pong) => { + let data_radius = + match DecodedExtension::decode_extension(pong.payload_type, pong.payload) { + Ok(DecodedExtension::Capabilities(capabilities)) => *capabilities.data_radius, + err => return Err(format!("Failed to decode capabilities: {err:?}")), + }; + + Ok(PongInfo { + enr_seq: pong.enr_seq, + data_radius, + }) + } + Err(msg) => Err(format!("Ping request timeout: {msg:?}")), + }; + to_json_result("Ping", pong) } fn add_enr(network: Arc, enr: Enr) -> Result { diff --git a/crates/subnetworks/state/src/lib.rs b/crates/subnetworks/state/src/lib.rs index bab5b8e78..53773d9ee 100644 --- a/crates/subnetworks/state/src/lib.rs +++ b/crates/subnetworks/state/src/lib.rs @@ -29,6 +29,7 @@ use crate::{events::StateEvents, jsonrpc::StateRequestHandler}; pub mod events; mod jsonrpc; pub mod network; +mod ping_extensions; mod storage; pub mod validation; diff --git a/crates/subnetworks/state/src/network.rs b/crates/subnetworks/state/src/network.rs index 6c67c1ea4..1a9a5f79f 100644 --- a/crates/subnetworks/state/src/network.rs +++ b/crates/subnetworks/state/src/network.rs @@ -16,13 +16,23 @@ use trin_storage::PortalStorageConfig; use trin_validation::oracle::HeaderOracle; use utp_rs::socket::UtpSocket; -use crate::{storage::StateStorage, validation::StateValidator}; +use crate::{ + ping_extensions::StatePingExtensions, storage::StateStorage, validation::StateValidator, +}; /// State network layer on top of the overlay protocol. Encapsulates state network specific data and /// logic. #[derive(Clone)] pub struct StateNetwork { - pub overlay: Arc>, + pub overlay: Arc< + OverlayProtocol< + StateContentKey, + XorMetric, + StateValidator, + StateStorage, + StatePingExtensions, + >, + >, } /// Poke is disabled for state network because Offer/Accept and Find/Found Content are different, @@ -52,6 +62,7 @@ impl StateNetwork { }; let storage = Arc::new(PLRwLock::new(StateStorage::new(storage_config)?)); let validator = Arc::new(StateValidator { header_oracle }); + let ping_extensions = Arc::new(StatePingExtensions {}); let overlay = OverlayProtocol::new( config, discovery, @@ -59,6 +70,7 @@ impl StateNetwork { storage, Subnetwork::State, validator, + ping_extensions, ) .await; diff --git a/crates/subnetworks/state/src/ping_extensions.rs b/crates/subnetworks/state/src/ping_extensions.rs new file mode 100644 index 000000000..6c59b3528 --- /dev/null +++ b/crates/subnetworks/state/src/ping_extensions.rs @@ -0,0 +1,41 @@ +use ethportal_api::types::ping_extensions::extension_types::Extensions; +use portalnet::overlay::ping_extensions::PingExtension; + +pub struct StatePingExtensions {} + +impl StatePingExtensions { + pub const SUPPORT_EXTENSIONS: &[Extensions] = &[ + Extensions::Capabilities, + Extensions::BasicRadius, + Extensions::Error, + ]; + + /// Base extensions that are required for the core subnetwork to function. + /// These must be sorted by latest to oldest + pub const BASE_EXTENSIONS: &[Extensions] = &[Extensions::BasicRadius]; +} + +impl PingExtension for StatePingExtensions { + fn is_supported(&self, extension: Extensions) -> bool { + Self::SUPPORT_EXTENSIONS.contains(&extension) + } + + fn latest_mutually_supported_base_extension( + &self, + extensions: &[Extensions], + ) -> Option { + for base_extension in Self::BASE_EXTENSIONS { + if extensions.contains(base_extension) { + return Some(*base_extension); + } + } + None + } + + fn raw_extensions(&self) -> Vec { + Self::SUPPORT_EXTENSIONS + .iter() + .map(|e| u16::from(*e)) + .collect() + } +} diff --git a/testing/ethportal-peertest/src/lib.rs b/testing/ethportal-peertest/src/lib.rs index c2b2e8c26..798826b6c 100644 --- a/testing/ethportal-peertest/src/lib.rs +++ b/testing/ethportal-peertest/src/lib.rs @@ -11,6 +11,7 @@ use ethportal_api::{ network::{Network, Subnetwork}, }, utils::bytes::hex_encode, + version::APP_NAME, Discv5ApiClient, }; use futures::future; @@ -91,7 +92,7 @@ fn generate_trin_config( let network = network.to_string(); let trin_config_args = [ - "trin", + APP_NAME, "--network", &network, "--portal-subnetworks", diff --git a/testing/ethportal-peertest/src/scenarios/basic.rs b/testing/ethportal-peertest/src/scenarios/basic.rs index 27ae75c7e..599187e6e 100644 --- a/testing/ethportal-peertest/src/scenarios/basic.rs +++ b/testing/ethportal-peertest/src/scenarios/basic.rs @@ -129,7 +129,7 @@ pub async fn test_ping(subnetwork: Subnetwork, target: &Client, peertest: &Peert _ => panic!("Unexpected subnetwork: {subnetwork}"), } .await - .unwrap(); + .unwrap_or_else(|_| panic!("Ping failed {subnetwork}")); assert_eq!( result.data_radius, U256::from_be_slice(Distance::MAX.as_ssz_bytes().as_slice()) diff --git a/testing/ethportal-peertest/src/scenarios/put_content.rs b/testing/ethportal-peertest/src/scenarios/put_content.rs index ccfe3ee06..af1737c98 100644 --- a/testing/ethportal-peertest/src/scenarios/put_content.rs +++ b/testing/ethportal-peertest/src/scenarios/put_content.rs @@ -1,7 +1,8 @@ use std::net::{IpAddr, Ipv4Addr}; use ethportal_api::{ - jsonrpsee::async_client::Client, ContentValue, Discv5ApiClient, HistoryNetworkApiClient, + jsonrpsee::async_client::Client, version::APP_NAME, ContentValue, Discv5ApiClient, + HistoryNetworkApiClient, }; use tracing::info; use trin::cli::TrinConfig; @@ -350,7 +351,7 @@ fn fresh_node_config() -> (String, TrinConfig) { let external_addr = format!("{test_ip_addr}:{test_discovery_port}"); let fresh_ipc_path = format!("/tmp/trin-jsonrpc-{test_discovery_port}.ipc"); let trin_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--portal-subnetworks", "history", "--external-address", diff --git a/testing/ethportal-peertest/tests/rpc_server.rs b/testing/ethportal-peertest/tests/rpc_server.rs index 64b400268..841548459 100644 --- a/testing/ethportal-peertest/tests/rpc_server.rs +++ b/testing/ethportal-peertest/tests/rpc_server.rs @@ -16,6 +16,7 @@ use alloy::{ use ethportal_api::{ types::execution::{block_body::BlockBody, header_with_proof::HeaderWithProof}, utils::bytes::{hex_decode, hex_encode}, + version::APP_NAME, ContentValue, Header, HistoryContentKey, HistoryContentValue, HistoryNetworkApiClient, }; use jsonrpsee::async_client::Client; @@ -40,7 +41,7 @@ async fn setup_web3_server() -> (RpcServerHandle, RootProvider, // Run a client, to be tested let trin_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--external-address", external_addr.as_str(), "--web3-ipc-path", diff --git a/testing/ethportal-peertest/tests/self_peertest.rs b/testing/ethportal-peertest/tests/self_peertest.rs index e010dd153..cc7f08ccc 100644 --- a/testing/ethportal-peertest/tests/self_peertest.rs +++ b/testing/ethportal-peertest/tests/self_peertest.rs @@ -4,7 +4,10 @@ use std::{ net::{IpAddr, Ipv4Addr}, }; -use ethportal_api::types::network::{Network, Subnetwork}; +use ethportal_api::{ + types::network::{Network, Subnetwork}, + version::APP_NAME, +}; use ethportal_peertest as peertest; use ethportal_peertest::Peertest; use jsonrpsee::{async_client::Client, http_client::HttpClient}; @@ -325,7 +328,7 @@ async fn peertest_ping_cross_discv5_protocol_id() { let external_addr = format!("{}:{test_discovery_port}", Ipv4Addr::new(127, 0, 0, 1)); let trin_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--network", &Network::Mainnet.to_string(), "--portal-subnetworks", @@ -374,7 +377,7 @@ async fn setup_peertest( // Run a client, to be tested let trin_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--network", &network.to_string(), "--portal-subnetworks", @@ -429,7 +432,7 @@ async fn setup_peertest_bridge( env::set_var("PANDAOPS_CLIENT_SECRET", "xxx"); // Run a client, to be tested let trin_config = TrinConfig::new_from([ - "trin", + APP_NAME, "--network", &network.to_string(), "--portal-subnetworks",