From 4000bfbab32363555775997288e351e0929242fd Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Tue, 19 Nov 2024 23:52:51 +0100 Subject: [PATCH 1/6] PoC bugfix for link-local networks employed on multiple NICs --- commons/zenoh-protocol/src/core/locator.rs | 10 ++++++++++ zenoh/src/net/runtime/orchestrator.rs | 23 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/commons/zenoh-protocol/src/core/locator.rs b/commons/zenoh-protocol/src/core/locator.rs index a34cc55320..bae12160e9 100644 --- a/commons/zenoh-protocol/src/core/locator.rs +++ b/commons/zenoh-protocol/src/core/locator.rs @@ -37,6 +37,16 @@ impl Locator { Ok(Self(ep)) } + pub fn new_with_config(protocol: A, address: B, metadata: C, config: String) -> ZResult + where + A: AsRef, + B: AsRef, + C: AsRef, + { + let ep = EndPoint::new(protocol, address, metadata, config)?; + Ok(Self(ep)) + } + pub fn protocol(&self) -> Protocol { self.0.protocol() } diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 1ed468bbf9..b22f3f84d1 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -894,10 +894,33 @@ impl Runtime { let mut reader = buf.as_slice()[..n].reader(); let codec = Zenoh080::new(); let res: Result = codec.read(&mut reader); + if let Ok(msg) = res { tracing::trace!("Received {:?} from {}", msg.body, peer); + if let ScoutingBody::Hello(hello) = &msg.body { + if matcher.matches(hello.whatami) { + + if let Ok(local_addr) = socket.local_addr() { + if let Ok(iface) = zenoh_util::net::get_interface_names_by_addr(local_addr.ip()) { + + let hello_iface = HelloProto { + version: hello.version, + whatami: hello.whatami, + zid: hello.zid, + locators: hello.locators.iter().map(|locator| Locator::new_with_config(locator.protocol(), + locator.address(), + locator.metadata(), + "iface=".to_string() + &iface[0]).unwrap()).collect() + }; + + if let Loop::Break = f(hello_iface.clone()).await { + break; + } + } + } + if let Loop::Break = f(hello.clone()).await { break; } From 4a66a0d761800a9b57312fcd8317f632c92fac7c Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Wed, 20 Nov 2024 00:03:07 +0100 Subject: [PATCH 2/6] Remove extra lines --- zenoh/src/net/runtime/orchestrator.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index b22f3f84d1..cfb1c9cd0e 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -894,12 +894,9 @@ impl Runtime { let mut reader = buf.as_slice()[..n].reader(); let codec = Zenoh080::new(); let res: Result = codec.read(&mut reader); - if let Ok(msg) = res { tracing::trace!("Received {:?} from {}", msg.body, peer); - if let ScoutingBody::Hello(hello) = &msg.body { - if matcher.matches(hello.whatami) { if let Ok(local_addr) = socket.local_addr() { From 160a62788fac97949a03cdc135347f247ae595c3 Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Sat, 23 Nov 2024 23:48:31 +0100 Subject: [PATCH 3/6] Rework towards a structure, that can return metadata and endpoints --- commons/zenoh-config/src/wrappers.rs | 16 +-- commons/zenoh-protocol/src/core/locator.rs | 10 -- commons/zenoh-protocol/src/scouting/hello.rs | 10 +- zenoh/src/net/runtime/orchestrator.rs | 109 +++++++++++-------- 4 files changed, 79 insertions(+), 66 deletions(-) diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index 31afe358e7..827ca659e1 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -20,8 +20,8 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, Locator, WhatAmI, ZenohIdProto}, - scouting::HelloProto, + core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, EndPoint, WhatAmI, ZenohIdProto}, + scouting::hello::HelloEndPoint }; /// The global unique id of a Zenoh session. @@ -102,12 +102,12 @@ impl FromStr for ZenohId { /// A zenoh Hello message. #[derive(Clone)] #[repr(transparent)] -pub struct Hello(HelloProto); +pub struct Hello(HelloEndPoint); impl Hello { /// Get the locators of this Hello message. - pub fn locators(&self) -> &[Locator] { - &self.0.locators + pub fn endpoints(&self) -> &[EndPoint] { + &self.0.endpoints } /// Get the zenoh id of this Hello message. @@ -121,8 +121,8 @@ impl Hello { } } -impl From for Hello { - fn from(inner: HelloProto) -> Self { +impl From for Hello { + fn from(inner: HelloEndPoint) -> Self { Hello(inner) } } @@ -138,7 +138,7 @@ impl fmt::Display for Hello { f.debug_struct("Hello") .field("zid", &self.zid()) .field("whatami", &self.whatami()) - .field("locators", &self.locators()) + .field("endpoints", &self.endpoints()) .finish() } } diff --git a/commons/zenoh-protocol/src/core/locator.rs b/commons/zenoh-protocol/src/core/locator.rs index bae12160e9..a34cc55320 100644 --- a/commons/zenoh-protocol/src/core/locator.rs +++ b/commons/zenoh-protocol/src/core/locator.rs @@ -37,16 +37,6 @@ impl Locator { Ok(Self(ep)) } - pub fn new_with_config(protocol: A, address: B, metadata: C, config: String) -> ZResult - where - A: AsRef, - B: AsRef, - C: AsRef, - { - let ep = EndPoint::new(protocol, address, metadata, config)?; - Ok(Self(ep)) - } - pub fn protocol(&self) -> Protocol { self.0.protocol() } diff --git a/commons/zenoh-protocol/src/scouting/hello.rs b/commons/zenoh-protocol/src/scouting/hello.rs index 69109ed611..583f061d2c 100644 --- a/commons/zenoh-protocol/src/scouting/hello.rs +++ b/commons/zenoh-protocol/src/scouting/hello.rs @@ -13,7 +13,7 @@ // use alloc::vec::Vec; -use crate::core::{Locator, WhatAmI, ZenohIdProto}; +use crate::core::{EndPoint, Locator, WhatAmI, ZenohIdProto}; /// # Hello message /// @@ -129,3 +129,11 @@ impl HelloProto { } } } + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HelloEndPoint { + pub version: u8, + pub whatami: WhatAmI, + pub zid: ZenohIdProto, + pub endpoints: Vec, +} diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index cfb1c9cd0e..489009b396 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -35,7 +35,7 @@ use zenoh_config::{ use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, Metadata, PriorityRange, WhatAmI, ZenohIdProto}, - scouting::{HelloProto, Scout, ScoutingBody, ScoutingMessage}, + scouting::{HelloProto, Scout, ScoutingBody, ScoutingMessage, hello::HelloEndPoint}, }; use zenoh_result::{bail, zerror, ZResult}; @@ -835,7 +835,7 @@ impl Runtime { mcast_addr: &SocketAddr, f: F, ) where - F: Fn(HelloProto) -> Fut + std::marker::Send + std::marker::Sync + Clone, + F: Fn(HelloEndPoint) -> Fut + std::marker::Send + std::marker::Sync + Clone, Fut: Future + std::marker::Send, Self: Sized, { @@ -898,27 +898,35 @@ impl Runtime { tracing::trace!("Received {:?} from {}", msg.body, peer); if let ScoutingBody::Hello(hello) = &msg.body { if matcher.matches(hello.whatami) { - if let Ok(local_addr) = socket.local_addr() { if let Ok(iface) = zenoh_util::net::get_interface_names_by_addr(local_addr.ip()) { - - let hello_iface = HelloProto { + let endpoint = HelloEndPoint { version: hello.version, whatami: hello.whatami, zid: hello.zid, - locators: hello.locators.iter().map(|locator| Locator::new_with_config(locator.protocol(), - locator.address(), - locator.metadata(), - "iface=".to_string() + &iface[0]).unwrap()).collect() + endpoints: hello.locators + .iter() + .map(|locator| EndPoint::new(locator.protocol(), + locator.address(), + locator.metadata(), + "iface=".to_string() + &iface[0]).unwrap()) + .collect() }; - - if let Loop::Break = f(hello_iface.clone()).await { + if let Loop::Break = f(endpoint.clone()).await { break; } } } - - if let Loop::Break = f(hello.clone()).await { + let endpoint = HelloEndPoint { + version: hello.version, + whatami: hello.whatami, + zid: hello.zid, + endpoints: hello.locators + .iter() + .map(|locator| locator.to_endpoint()) + .collect() + }; + if let Loop::Break = f(endpoint.clone()).await { break; } } else { @@ -947,7 +955,7 @@ impl Runtime { /// Returns `true` if a new Transport instance is established with `zid` or had already been established. #[must_use] - async fn connect(&self, zid: &ZenohIdProto, scouted_locators: &[Locator]) -> bool { + async fn connect(&self, zid: &ZenohIdProto, scouted_endpoints: &[EndPoint]) -> bool { if !self.insert_pending_connection(*zid).await { tracing::debug!("Already connecting to {}. Ignore.", zid); return false; @@ -955,7 +963,7 @@ impl Runtime { const ERR: &str = "Unable to connect to newly scouted peer"; - let configured_locators = self + let configured_endpoints = self .state .config .lock() @@ -964,17 +972,18 @@ impl Runtime { .endpoints() .get(self.whatami()) .iter() - .flat_map(|e| e.iter().map(EndPoint::to_locator)) + .flat_map(|e| e.iter()) + .cloned() .collect::>(); - let locators = scouted_locators + let endpoints = scouted_endpoints .iter() - .filter(|l| !configured_locators.contains(l)) - .collect::>(); + .filter(|e| !configured_endpoints.contains(e)) + .collect::>(); - if locators.is_empty() { + if endpoints.is_empty() { tracing::debug!( - "Already connecting to locators of {} (connect configuration). Ignore.", + "Already connecting to endpoints of {} (connect configuration). Ignore.", zid ); return false; @@ -983,22 +992,21 @@ impl Runtime { let manager = self.manager(); let inspector = LocatorInspector::default(); - for locator in locators { - let is_multicast = match inspector.is_multicast(locator).await { + for endpoint in endpoints { + let is_multicast = match inspector.is_multicast(&endpoint.to_locator()).await { Ok(im) => im, Err(e) => { - tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e); + tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e); continue; } }; - let endpoint = locator.to_owned().into(); - let retry_config = self.get_connect_retry_config(&endpoint); - let priorities = locator + let retry_config = self.get_connect_retry_config(endpoint); + let priorities = endpoint .metadata() .get(Metadata::PRIORITIES) .and_then(|p| PriorityRange::from_str(p).ok()); - let reliability = inspector.is_reliable(locator).ok(); + let reliability = inspector.is_reliable(&endpoint.to_locator()).ok(); if !manager .get_transport_unicast(zid) .await @@ -1015,7 +1023,7 @@ impl Runtime { if is_multicast { match tokio::time::timeout( retry_config.timeout(), - manager.open_transport_multicast(endpoint), + manager.open_transport_multicast(endpoint.clone()), ) .await { @@ -1025,13 +1033,13 @@ impl Runtime { transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), - Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), + Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), + Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), } } else { match tokio::time::timeout( retry_config.timeout(), - manager.open_transport_unicast(endpoint), + manager.open_transport_unicast(endpoint.clone()), ) .await { @@ -1041,14 +1049,14 @@ impl Runtime { transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), - Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), + Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), + Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), } } } else { tracing::trace!( "Will not attempt to connect to {} via {}: already connected to this peer for this PriorityRange-Reliability pair", - zid, locator + zid, endpoint ); } } @@ -1057,9 +1065,9 @@ impl Runtime { if self.manager().get_transport_unicast(zid).await.is_none() { tracing::warn!( - "Unable to connect to any locator of scouted peer {}: {:?}", + "Unable to connect to any endpoint of scouted peer {}: {:?}", zid, - scouted_locators + scouted_endpoints ); false } else { @@ -1068,7 +1076,7 @@ impl Runtime { } /// Returns `true` if a new Transport instance is established with `zid` or had already been established. - pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { + async fn connect_endpoints(&self, zid: &ZenohIdProto, endpoints: &[EndPoint]) -> bool { let manager = self.manager(); if zid != &manager.zid() { let has_unicast = manager.get_transport_unicast(zid).await.is_some(); @@ -1077,7 +1085,7 @@ impl Runtime { for t in manager.get_transports_multicast().await { if let Ok(l) = t.get_link() { if let Some(g) = l.group.as_ref() { - hm |= locators.iter().any(|l| l == g); + hm |= endpoints.iter().any(|l| l == &g.to_endpoint()); } } } @@ -1085,8 +1093,8 @@ impl Runtime { }; if !has_unicast && !has_multicast { - tracing::debug!("Try to connect to peer {} via any of {:?}", zid, locators); - self.connect(zid, locators).await + tracing::debug!("Try to connect to peer {} via any of {:?}", zid, endpoints); + self.connect(zid, endpoints).await } else { tracing::trace!("Already connected scouted peer: {}", zid); true @@ -1096,6 +1104,13 @@ impl Runtime { } } + pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { + let endpoints: Vec = locators.iter() + .map(|locator| locator.to_endpoint().clone()) + .collect(); + self.connect_endpoints(zid, &endpoints).await + } + async fn connect_first( &self, sockets: &[UdpSocket], @@ -1106,12 +1121,12 @@ impl Runtime { let scout = async { Runtime::scout(sockets, what, addr, move |hello| async move { tracing::info!("Found {:?}", hello); - if !hello.locators.is_empty() { - if self.connect(&hello.zid, &hello.locators).await { + if !hello.endpoints.is_empty() { + if self.connect(&hello.zid, &hello.endpoints).await { return Loop::Break; } } else { - tracing::warn!("Received Hello with no locators: {:?}", hello); + tracing::warn!("Received Hello with no endpoints: {:?}", hello); } Loop::Continue }) @@ -1135,10 +1150,10 @@ impl Runtime { addr: &SocketAddr, ) { Runtime::scout(ucast_sockets, what, addr, move |hello| async move { - if !hello.locators.is_empty() { - self.connect_peer(&hello.zid, &hello.locators).await; + if !hello.endpoints.is_empty() { + self.connect_endpoints(&hello.zid, &hello.endpoints).await; } else { - tracing::warn!("Received Hello with no locators: {:?}", hello); + tracing::warn!("Received Hello with no endpoints: {:?}", hello); } Loop::Continue }) From a6a695c926a043971c68626a4d7d30a0443c1b7a Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Mon, 25 Nov 2024 16:23:48 +0100 Subject: [PATCH 4/6] Fix linter --- commons/zenoh-config/src/wrappers.rs | 4 ++-- zenoh/src/net/runtime/orchestrator.rs | 32 ++++++++++++++++++--------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index 827ca659e1..e81712867a 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -20,8 +20,8 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, EndPoint, WhatAmI, ZenohIdProto}, - scouting::hello::HelloEndPoint + core::{key_expr::OwnedKeyExpr, EndPoint, EntityGlobalIdProto, WhatAmI, ZenohIdProto}, + scouting::hello::HelloEndPoint, }; /// The global unique id of a Zenoh session. diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 489009b396..75b2dcd420 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -35,7 +35,7 @@ use zenoh_config::{ use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, Metadata, PriorityRange, WhatAmI, ZenohIdProto}, - scouting::{HelloProto, Scout, ScoutingBody, ScoutingMessage, hello::HelloEndPoint}, + scouting::{hello::HelloEndPoint, HelloProto, Scout, ScoutingBody, ScoutingMessage}, }; use zenoh_result::{bail, zerror, ZResult}; @@ -899,17 +899,27 @@ impl Runtime { if let ScoutingBody::Hello(hello) = &msg.body { if matcher.matches(hello.whatami) { if let Ok(local_addr) = socket.local_addr() { - if let Ok(iface) = zenoh_util::net::get_interface_names_by_addr(local_addr.ip()) { + if let Ok(iface) = + zenoh_util::net::get_interface_names_by_addr( + local_addr.ip() + ) + { let endpoint = HelloEndPoint { version: hello.version, whatami: hello.whatami, zid: hello.zid, - endpoints: hello.locators + endpoints: hello + .locators .iter() - .map(|locator| EndPoint::new(locator.protocol(), - locator.address(), - locator.metadata(), - "iface=".to_string() + &iface[0]).unwrap()) + .map(|locator| + EndPoint::new( + locator.protocol(), + locator.address(), + locator.metadata(), + "iface=".to_string() + &iface[0], + ) + .unwrap() + ) .collect() }; if let Loop::Break = f(endpoint.clone()).await { @@ -921,10 +931,11 @@ impl Runtime { version: hello.version, whatami: hello.whatami, zid: hello.zid, - endpoints: hello.locators + endpoints: hello + .locators .iter() .map(|locator| locator.to_endpoint()) - .collect() + .collect(), }; if let Loop::Break = f(endpoint.clone()).await { break; @@ -1105,7 +1116,8 @@ impl Runtime { } pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { - let endpoints: Vec = locators.iter() + let endpoints: Vec = locators + .iter() .map(|locator| locator.to_endpoint().clone()) .collect(); self.connect_endpoints(zid, &endpoints).await From 7a8c890165b5cfb35d0cbd4186e5852b5130617f Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:53:34 +0100 Subject: [PATCH 5/6] Fix linter 2nd attempt --- zenoh/src/net/runtime/orchestrator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 75b2dcd420..dc7946703c 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -901,7 +901,7 @@ impl Runtime { if let Ok(local_addr) = socket.local_addr() { if let Ok(iface) = zenoh_util::net::get_interface_names_by_addr( - local_addr.ip() + local_addr.ip(), ) { let endpoint = HelloEndPoint { @@ -911,7 +911,7 @@ impl Runtime { endpoints: hello .locators .iter() - .map(|locator| + .map(|locator| { EndPoint::new( locator.protocol(), locator.address(), @@ -919,7 +919,7 @@ impl Runtime { "iface=".to_string() + &iface[0], ) .unwrap() - ) + }) .collect() }; if let Loop::Break = f(endpoint.clone()).await { From 447f5af5ada5bf231a455e6829b7d3641666414a Mon Sep 17 00:00:00 2001 From: Patrick Siegl <3261314+psiegl@users.noreply.github.com> Date: Tue, 26 Nov 2024 19:32:27 +0100 Subject: [PATCH 6/6] 3rd attempt to fix linter --- zenoh/src/net/runtime/orchestrator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index dc7946703c..d72e0102b4 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -920,7 +920,7 @@ impl Runtime { ) .unwrap() }) - .collect() + .collect(), }; if let Loop::Break = f(endpoint.clone()).await { break;