diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index 31afe358e7..e81712867a 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -20,8 +20,8 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, Locator, WhatAmI, ZenohIdProto}, - scouting::HelloProto, + core::{key_expr::OwnedKeyExpr, EndPoint, EntityGlobalIdProto, WhatAmI, ZenohIdProto}, + scouting::hello::HelloEndPoint, }; /// The global unique id of a Zenoh session. @@ -102,12 +102,12 @@ impl FromStr for ZenohId { /// A zenoh Hello message. #[derive(Clone)] #[repr(transparent)] -pub struct Hello(HelloProto); +pub struct Hello(HelloEndPoint); impl Hello { /// Get the locators of this Hello message. - pub fn locators(&self) -> &[Locator] { - &self.0.locators + pub fn endpoints(&self) -> &[EndPoint] { + &self.0.endpoints } /// Get the zenoh id of this Hello message. @@ -121,8 +121,8 @@ impl Hello { } } -impl From for Hello { - fn from(inner: HelloProto) -> Self { +impl From for Hello { + fn from(inner: HelloEndPoint) -> Self { Hello(inner) } } @@ -138,7 +138,7 @@ impl fmt::Display for Hello { f.debug_struct("Hello") .field("zid", &self.zid()) .field("whatami", &self.whatami()) - .field("locators", &self.locators()) + .field("endpoints", &self.endpoints()) .finish() } } diff --git a/commons/zenoh-protocol/src/scouting/hello.rs b/commons/zenoh-protocol/src/scouting/hello.rs index 69109ed611..583f061d2c 100644 --- a/commons/zenoh-protocol/src/scouting/hello.rs +++ b/commons/zenoh-protocol/src/scouting/hello.rs @@ -13,7 +13,7 @@ // use alloc::vec::Vec; -use crate::core::{Locator, WhatAmI, ZenohIdProto}; +use crate::core::{EndPoint, Locator, WhatAmI, ZenohIdProto}; /// # Hello message /// @@ -129,3 +129,11 @@ impl HelloProto { } } } + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HelloEndPoint { + pub version: u8, + pub whatami: WhatAmI, + pub zid: ZenohIdProto, + pub endpoints: Vec, +} diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 1ed468bbf9..d72e0102b4 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -35,7 +35,7 @@ use zenoh_config::{ use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, Metadata, PriorityRange, WhatAmI, ZenohIdProto}, - scouting::{HelloProto, Scout, ScoutingBody, ScoutingMessage}, + scouting::{hello::HelloEndPoint, HelloProto, Scout, ScoutingBody, ScoutingMessage}, }; use zenoh_result::{bail, zerror, ZResult}; @@ -835,7 +835,7 @@ impl Runtime { mcast_addr: &SocketAddr, f: F, ) where - F: Fn(HelloProto) -> Fut + std::marker::Send + std::marker::Sync + Clone, + F: Fn(HelloEndPoint) -> Fut + std::marker::Send + std::marker::Sync + Clone, Fut: Future + std::marker::Send, Self: Sized, { @@ -898,7 +898,46 @@ impl Runtime { tracing::trace!("Received {:?} from {}", msg.body, peer); if let ScoutingBody::Hello(hello) = &msg.body { if matcher.matches(hello.whatami) { - if let Loop::Break = f(hello.clone()).await { + if let Ok(local_addr) = socket.local_addr() { + if let Ok(iface) = + zenoh_util::net::get_interface_names_by_addr( + local_addr.ip(), + ) + { + let endpoint = HelloEndPoint { + version: hello.version, + whatami: hello.whatami, + zid: hello.zid, + endpoints: hello + .locators + .iter() + .map(|locator| { + EndPoint::new( + locator.protocol(), + locator.address(), + locator.metadata(), + "iface=".to_string() + &iface[0], + ) + .unwrap() + }) + .collect(), + }; + if let Loop::Break = f(endpoint.clone()).await { + break; + } + } + } + let endpoint = HelloEndPoint { + version: hello.version, + whatami: hello.whatami, + zid: hello.zid, + endpoints: hello + .locators + .iter() + .map(|locator| locator.to_endpoint()) + .collect(), + }; + if let Loop::Break = f(endpoint.clone()).await { break; } } else { @@ -927,7 +966,7 @@ impl Runtime { /// Returns `true` if a new Transport instance is established with `zid` or had already been established. #[must_use] - async fn connect(&self, zid: &ZenohIdProto, scouted_locators: &[Locator]) -> bool { + async fn connect(&self, zid: &ZenohIdProto, scouted_endpoints: &[EndPoint]) -> bool { if !self.insert_pending_connection(*zid).await { tracing::debug!("Already connecting to {}. Ignore.", zid); return false; @@ -935,7 +974,7 @@ impl Runtime { const ERR: &str = "Unable to connect to newly scouted peer"; - let configured_locators = self + let configured_endpoints = self .state .config .lock() @@ -944,17 +983,18 @@ impl Runtime { .endpoints() .get(self.whatami()) .iter() - .flat_map(|e| e.iter().map(EndPoint::to_locator)) + .flat_map(|e| e.iter()) + .cloned() .collect::>(); - let locators = scouted_locators + let endpoints = scouted_endpoints .iter() - .filter(|l| !configured_locators.contains(l)) - .collect::>(); + .filter(|e| !configured_endpoints.contains(e)) + .collect::>(); - if locators.is_empty() { + if endpoints.is_empty() { tracing::debug!( - "Already connecting to locators of {} (connect configuration). Ignore.", + "Already connecting to endpoints of {} (connect configuration). Ignore.", zid ); return false; @@ -963,22 +1003,21 @@ impl Runtime { let manager = self.manager(); let inspector = LocatorInspector::default(); - for locator in locators { - let is_multicast = match inspector.is_multicast(locator).await { + for endpoint in endpoints { + let is_multicast = match inspector.is_multicast(&endpoint.to_locator()).await { Ok(im) => im, Err(e) => { - tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e); + tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e); continue; } }; - let endpoint = locator.to_owned().into(); - let retry_config = self.get_connect_retry_config(&endpoint); - let priorities = locator + let retry_config = self.get_connect_retry_config(endpoint); + let priorities = endpoint .metadata() .get(Metadata::PRIORITIES) .and_then(|p| PriorityRange::from_str(p).ok()); - let reliability = inspector.is_reliable(locator).ok(); + let reliability = inspector.is_reliable(&endpoint.to_locator()).ok(); if !manager .get_transport_unicast(zid) .await @@ -995,7 +1034,7 @@ impl Runtime { if is_multicast { match tokio::time::timeout( retry_config.timeout(), - manager.open_transport_multicast(endpoint), + manager.open_transport_multicast(endpoint.clone()), ) .await { @@ -1005,13 +1044,13 @@ impl Runtime { transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), - Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), + Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), + Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), } } else { match tokio::time::timeout( retry_config.timeout(), - manager.open_transport_unicast(endpoint), + manager.open_transport_unicast(endpoint.clone()), ) .await { @@ -1021,14 +1060,14 @@ impl Runtime { transport ); } - Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), - Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e), + Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), + Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e), } } } else { tracing::trace!( "Will not attempt to connect to {} via {}: already connected to this peer for this PriorityRange-Reliability pair", - zid, locator + zid, endpoint ); } } @@ -1037,9 +1076,9 @@ impl Runtime { if self.manager().get_transport_unicast(zid).await.is_none() { tracing::warn!( - "Unable to connect to any locator of scouted peer {}: {:?}", + "Unable to connect to any endpoint of scouted peer {}: {:?}", zid, - scouted_locators + scouted_endpoints ); false } else { @@ -1048,7 +1087,7 @@ impl Runtime { } /// Returns `true` if a new Transport instance is established with `zid` or had already been established. - pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { + async fn connect_endpoints(&self, zid: &ZenohIdProto, endpoints: &[EndPoint]) -> bool { let manager = self.manager(); if zid != &manager.zid() { let has_unicast = manager.get_transport_unicast(zid).await.is_some(); @@ -1057,7 +1096,7 @@ impl Runtime { for t in manager.get_transports_multicast().await { if let Ok(l) = t.get_link() { if let Some(g) = l.group.as_ref() { - hm |= locators.iter().any(|l| l == g); + hm |= endpoints.iter().any(|l| l == &g.to_endpoint()); } } } @@ -1065,8 +1104,8 @@ impl Runtime { }; if !has_unicast && !has_multicast { - tracing::debug!("Try to connect to peer {} via any of {:?}", zid, locators); - self.connect(zid, locators).await + tracing::debug!("Try to connect to peer {} via any of {:?}", zid, endpoints); + self.connect(zid, endpoints).await } else { tracing::trace!("Already connected scouted peer: {}", zid); true @@ -1076,6 +1115,14 @@ impl Runtime { } } + pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool { + let endpoints: Vec = locators + .iter() + .map(|locator| locator.to_endpoint().clone()) + .collect(); + self.connect_endpoints(zid, &endpoints).await + } + async fn connect_first( &self, sockets: &[UdpSocket], @@ -1086,12 +1133,12 @@ impl Runtime { let scout = async { Runtime::scout(sockets, what, addr, move |hello| async move { tracing::info!("Found {:?}", hello); - if !hello.locators.is_empty() { - if self.connect(&hello.zid, &hello.locators).await { + if !hello.endpoints.is_empty() { + if self.connect(&hello.zid, &hello.endpoints).await { return Loop::Break; } } else { - tracing::warn!("Received Hello with no locators: {:?}", hello); + tracing::warn!("Received Hello with no endpoints: {:?}", hello); } Loop::Continue }) @@ -1115,10 +1162,10 @@ impl Runtime { addr: &SocketAddr, ) { Runtime::scout(ucast_sockets, what, addr, move |hello| async move { - if !hello.locators.is_empty() { - self.connect_peer(&hello.zid, &hello.locators).await; + if !hello.endpoints.is_empty() { + self.connect_endpoints(&hello.zid, &hello.endpoints).await; } else { - tracing::warn!("Received Hello with no locators: {:?}", hello); + tracing::warn!("Received Hello with no endpoints: {:?}", hello); } Loop::Continue })