Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC scouting bugfix for IPv4LL/IPv6LL employed on multiple NICs #1598

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::str::FromStr;

use serde::{Deserialize, Serialize};
use zenoh_protocol::{
core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, Locator, WhatAmI, ZenohIdProto},
scouting::HelloProto,
core::{key_expr::OwnedKeyExpr, EndPoint, EntityGlobalIdProto, WhatAmI, ZenohIdProto},
scouting::hello::HelloEndPoint,
};

/// The global unique id of a Zenoh session.
Expand Down Expand Up @@ -102,12 +102,12 @@ impl FromStr for ZenohId {
/// A zenoh Hello message.
#[derive(Clone)]
#[repr(transparent)]
pub struct Hello(HelloProto);
pub struct Hello(HelloEndPoint);

impl Hello {
/// Get the endpoints of this Hello message.
pub fn locators(&self) -> &[Locator] {
&self.0.locators
pub fn endpoints(&self) -> &[EndPoint] {
&self.0.endpoints
}

/// Get the zenoh id of this Hello message.
Expand All @@ -121,8 +121,8 @@ impl Hello {
}
}

impl From<HelloProto> for Hello {
fn from(inner: HelloProto) -> Self {
impl From<HelloEndPoint> for Hello {
fn from(inner: HelloEndPoint) -> Self {
Hello(inner)
}
}
Expand All @@ -138,7 +138,7 @@ impl fmt::Display for Hello {
f.debug_struct("Hello")
.field("zid", &self.zid())
.field("whatami", &self.whatami())
.field("locators", &self.locators())
.field("endpoints", &self.endpoints())
.finish()
}
}
Expand Down
10 changes: 9 additions & 1 deletion commons/zenoh-protocol/src/scouting/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use alloc::vec::Vec;

use crate::core::{Locator, WhatAmI, ZenohIdProto};
use crate::core::{EndPoint, Locator, WhatAmI, ZenohIdProto};

/// # Hello message
///
Expand Down Expand Up @@ -129,3 +129,11 @@ impl HelloProto {
}
}
}

/// A Hello message whose addresses are carried as [`EndPoint`]s instead of locators.
///
/// Apart from `endpoints`, the fields share their names with the `version`,
/// `whatami` and `zid` fields read from a received Hello message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HelloEndPoint {
/// Version carried by the Hello message.
pub version: u8,
/// The [`WhatAmI`] value carried by the Hello message.
pub whatami: WhatAmI,
/// The zenoh id carried by the Hello message.
pub zid: ZenohIdProto,
/// The endpoints associated with the Hello message.
pub endpoints: Vec<EndPoint>,
}
121 changes: 84 additions & 37 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use zenoh_config::{
use zenoh_link::{Locator, LocatorInspector};
use zenoh_protocol::{
core::{whatami::WhatAmIMatcher, EndPoint, Metadata, PriorityRange, WhatAmI, ZenohIdProto},
scouting::{HelloProto, Scout, ScoutingBody, ScoutingMessage},
scouting::{hello::HelloEndPoint, HelloProto, Scout, ScoutingBody, ScoutingMessage},
};
use zenoh_result::{bail, zerror, ZResult};

Expand Down Expand Up @@ -835,7 +835,7 @@ impl Runtime {
mcast_addr: &SocketAddr,
f: F,
) where
F: Fn(HelloProto) -> Fut + std::marker::Send + std::marker::Sync + Clone,
F: Fn(HelloEndPoint) -> Fut + std::marker::Send + std::marker::Sync + Clone,
Fut: Future<Output = Loop> + std::marker::Send,
Self: Sized,
{
Expand Down Expand Up @@ -898,7 +898,46 @@ impl Runtime {
tracing::trace!("Received {:?} from {}", msg.body, peer);
if let ScoutingBody::Hello(hello) = &msg.body {
if matcher.matches(hello.whatami) {
if let Loop::Break = f(hello.clone()).await {
if let Ok(local_addr) = socket.local_addr() {
psiegl marked this conversation as resolved.
Show resolved Hide resolved
if let Ok(iface) =
zenoh_util::net::get_interface_names_by_addr(
local_addr.ip(),
)
{
let endpoint = HelloEndPoint {
version: hello.version,
whatami: hello.whatami,
zid: hello.zid,
endpoints: hello
.locators
.iter()
.map(|locator| {
EndPoint::new(
locator.protocol(),
locator.address(),
locator.metadata(),
"iface=".to_string() + &iface[0],
)
.unwrap()
})
.collect(),
};
if let Loop::Break = f(endpoint.clone()).await {
break;
}
}
}
let endpoint = HelloEndPoint {
version: hello.version,
whatami: hello.whatami,
zid: hello.zid,
endpoints: hello
.locators
.iter()
.map(|locator| locator.to_endpoint())
.collect(),
};
if let Loop::Break = f(endpoint.clone()).await {
break;
}
} else {
Expand Down Expand Up @@ -927,15 +966,15 @@ impl Runtime {

/// Returns `true` if a new Transport instance is established with `zid` or had already been established.
#[must_use]
async fn connect(&self, zid: &ZenohIdProto, scouted_locators: &[Locator]) -> bool {
async fn connect(&self, zid: &ZenohIdProto, scouted_endpoints: &[EndPoint]) -> bool {
if !self.insert_pending_connection(*zid).await {
tracing::debug!("Already connecting to {}. Ignore.", zid);
return false;
}

const ERR: &str = "Unable to connect to newly scouted peer";

let configured_locators = self
let configured_endpoints = self
.state
.config
.lock()
Expand All @@ -944,17 +983,18 @@ impl Runtime {
.endpoints()
.get(self.whatami())
.iter()
.flat_map(|e| e.iter().map(EndPoint::to_locator))
.flat_map(|e| e.iter())
.cloned()
.collect::<HashSet<_>>();

let locators = scouted_locators
let endpoints = scouted_endpoints
.iter()
.filter(|l| !configured_locators.contains(l))
.collect::<Vec<&Locator>>();
.filter(|e| !configured_endpoints.contains(e))
.collect::<Vec<&EndPoint>>();

if locators.is_empty() {
if endpoints.is_empty() {
tracing::debug!(
"Already connecting to locators of {} (connect configuration). Ignore.",
"Already connecting to endpoints of {} (connect configuration). Ignore.",
zid
);
return false;
Expand All @@ -963,22 +1003,21 @@ impl Runtime {
let manager = self.manager();

let inspector = LocatorInspector::default();
for locator in locators {
let is_multicast = match inspector.is_multicast(locator).await {
for endpoint in endpoints {
let is_multicast = match inspector.is_multicast(&endpoint.to_locator()).await {
Ok(im) => im,
Err(e) => {
tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e);
tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e);
continue;
}
};

let endpoint = locator.to_owned().into();
let retry_config = self.get_connect_retry_config(&endpoint);
let priorities = locator
let retry_config = self.get_connect_retry_config(endpoint);
let priorities = endpoint
.metadata()
.get(Metadata::PRIORITIES)
.and_then(|p| PriorityRange::from_str(p).ok());
let reliability = inspector.is_reliable(locator).ok();
let reliability = inspector.is_reliable(&endpoint.to_locator()).ok();
if !manager
.get_transport_unicast(zid)
.await
Expand All @@ -995,7 +1034,7 @@ impl Runtime {
if is_multicast {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_multicast(endpoint),
manager.open_transport_multicast(endpoint.clone()),
)
.await
{
Expand All @@ -1005,13 +1044,13 @@ impl Runtime {
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e),
}
} else {
match tokio::time::timeout(
retry_config.timeout(),
manager.open_transport_unicast(endpoint),
manager.open_transport_unicast(endpoint.clone()),
)
.await
{
Expand All @@ -1021,14 +1060,14 @@ impl Runtime {
transport
);
}
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, locator, e),
Ok(Err(e)) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e),
Err(e) => tracing::trace!("{} {} on {}: {}", ERR, zid, endpoint, e),
}
}
} else {
tracing::trace!(
"Will not attempt to connect to {} via {}: already connected to this peer for this PriorityRange-Reliability pair",
zid, locator
zid, endpoint
);
}
}
Expand All @@ -1037,9 +1076,9 @@ impl Runtime {

if self.manager().get_transport_unicast(zid).await.is_none() {
tracing::warn!(
"Unable to connect to any locator of scouted peer {}: {:?}",
"Unable to connect to any endpoint of scouted peer {}: {:?}",
zid,
scouted_locators
scouted_endpoints
);
false
} else {
Expand All @@ -1048,7 +1087,7 @@ impl Runtime {
}

/// Returns `true` if a new Transport instance is established with `zid` or had already been established.
pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool {
async fn connect_endpoints(&self, zid: &ZenohIdProto, endpoints: &[EndPoint]) -> bool {
let manager = self.manager();
if zid != &manager.zid() {
let has_unicast = manager.get_transport_unicast(zid).await.is_some();
Expand All @@ -1057,16 +1096,16 @@ impl Runtime {
for t in manager.get_transports_multicast().await {
if let Ok(l) = t.get_link() {
if let Some(g) = l.group.as_ref() {
hm |= locators.iter().any(|l| l == g);
hm |= endpoints.iter().any(|l| l == &g.to_endpoint());
}
}
}
hm
};

if !has_unicast && !has_multicast {
tracing::debug!("Try to connect to peer {} via any of {:?}", zid, locators);
self.connect(zid, locators).await
tracing::debug!("Try to connect to peer {} via any of {:?}", zid, endpoints);
self.connect(zid, endpoints).await
} else {
tracing::trace!("Already connected scouted peer: {}", zid);
true
Expand All @@ -1076,6 +1115,14 @@ impl Runtime {
}
}

/// Connects to the peer `zid` through any of the given `locators`.
///
/// Every locator is converted to an [`EndPoint`] and the call is forwarded to
/// `connect_endpoints`; its result is returned unchanged.
pub async fn connect_peer(&self, zid: &ZenohIdProto, locators: &[Locator]) -> bool {
    // `to_endpoint` already yields an owned `EndPoint`, so no additional clone is required.
    let endpoints: Vec<EndPoint> = locators
        .iter()
        .map(|locator| locator.to_endpoint())
        .collect();
    self.connect_endpoints(zid, &endpoints).await
}

async fn connect_first(
&self,
sockets: &[UdpSocket],
Expand All @@ -1086,12 +1133,12 @@ impl Runtime {
let scout = async {
Runtime::scout(sockets, what, addr, move |hello| async move {
tracing::info!("Found {:?}", hello);
if !hello.locators.is_empty() {
if self.connect(&hello.zid, &hello.locators).await {
if !hello.endpoints.is_empty() {
if self.connect(&hello.zid, &hello.endpoints).await {
return Loop::Break;
}
} else {
tracing::warn!("Received Hello with no locators: {:?}", hello);
tracing::warn!("Received Hello with no endpoints: {:?}", hello);
}
Loop::Continue
})
Expand All @@ -1115,10 +1162,10 @@ impl Runtime {
addr: &SocketAddr,
) {
Runtime::scout(ucast_sockets, what, addr, move |hello| async move {
if !hello.locators.is_empty() {
self.connect_peer(&hello.zid, &hello.locators).await;
if !hello.endpoints.is_empty() {
self.connect_endpoints(&hello.zid, &hello.endpoints).await;
} else {
tracing::warn!("Received Hello with no locators: {:?}", hello);
tracing::warn!("Received Hello with no endpoints: {:?}", hello);
}
Loop::Continue
})
Expand Down