Skip to content

Commit

Permalink
Config: add interests timeout (#1710)
Browse files Browse the repository at this point in the history
* Config: add interests timeout

* Fix indentation

* Remove fields from HatCode structs

* Move interests_timeout to generic Tables

* improve doc and log

---------

Co-authored-by: OlivierHecart <[email protected]>
  • Loading branch information
JEnoch and OlivierHecart authored Jan 14, 2025
1 parent 70820dd commit 2c889ba
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 7 deletions.
8 changes: 8 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
mode: "peer_to_peer",
},
/// The interests-based routing configuration.
/// This configuration applies regardless of the mode (router, peer or client).
interests: {
/// The timeout to wait for incoming interests declarations in milliseconds.
/// The expiration of this timeout implies that the discovery protocol might be incomplete,
/// leading to potential loss of messages, queries or liveliness tokens.
timeout: 10000,
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ pub mod routing {
pub mod peer {
pub const mode: &str = "peer_to_peer";
}
pub mod interests {
pub const timeout: u64 = 10000;
}
}

impl Default for ListenConfig {
Expand Down
7 changes: 7 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ validated_struct::validator! {
/// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
mode: Option<String>,
},
/// The interests-based routing configuration.
/// This configuration applies regardless of the mode (router, peer or client).
pub interests: #[derive(Default)]
InterestsConf {
/// The timeout to wait for incoming interests declarations.
timeout: Option<u64>,
},
},

/// The declarations aggregation strategy.
Expand Down
11 changes: 6 additions & 5 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ use crate::net::routing::{
RoutingContext,
};

static INTEREST_TIMEOUT_MS: u64 = 10000;

pub(crate) struct CurrentInterest {
pub(crate) src_face: Arc<FaceState>,
pub(crate) src_interest_id: InterestId,
Expand Down Expand Up @@ -129,25 +127,28 @@ pub(crate) struct CurrentInterestCleanup {
tables: Arc<TablesLock>,
face: Weak<FaceState>,
id: InterestId,
interests_timeout: Duration,
}

impl CurrentInterestCleanup {
pub(crate) fn spawn_interest_clean_up_task(
face: &Arc<FaceState>,
tables_ref: &Arc<TablesLock>,
id: u32,
interests_timeout: Duration,
) {
let mut cleanup = CurrentInterestCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(face),
id,
interests_timeout,
};
if let Some((_, cancellation_token)) = face.pending_current_interests.get(&id) {
let c_cancellation_token = cancellation_token.clone();
face.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(INTEREST_TIMEOUT_MS)) => { cleanup.run().await }
_ = tokio::time::sleep(cleanup.interests_timeout) => { cleanup.run().await }
_ = c_cancellation_token.cancelled() => {}
}
});
Expand All @@ -166,11 +167,11 @@ impl Timed for CurrentInterestCleanup {
{
drop(ctrl_lock);
tracing::warn!(
"Didn't receive DeclareFinal {}:{} from {}: Timeout({:#?})!",
"Didn't receive DeclareFinal {}:{} from {} for interests: Timeout({:#?})!",
interest.0.src_face,
self.id,
face,
Duration::from_millis(INTEREST_TIMEOUT_MS),
self.interests_timeout,
);
finalize_pending_interest(interest, &mut |p, m| p.send_declare(m));
}
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct Tables {
pub(crate) hlc: Option<Arc<HLC>>,
pub(crate) drop_future_timestamp: bool,
pub(crate) queries_default_timeout: Duration,
pub(crate) interests_timeout: Duration,
pub(crate) root_res: Arc<Resource>,
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) mcast_groups: Vec<Arc<FaceState>>,
Expand All @@ -93,6 +94,8 @@ impl Tables {
unwrap_or_default!(config.routing().router().peers_failover_brokering());
let queries_default_timeout =
Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let interests_timeout =
Duration::from_millis(unwrap_or_default!(config.routing().interests().timeout()));
let hat_code = hat::new_hat(whatami, config);
Ok(Tables {
zid,
Expand All @@ -102,6 +105,7 @@ impl Tables {
hlc,
drop_future_timestamp,
queries_default_timeout,
interests_timeout,
root_res: Resource::root(),
faces: HashMap::new(),
mcast_groups: vec![],
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ impl HatInterestTrait for HatCode {
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
CurrentInterestCleanup::spawn_interest_clean_up_task(
dst_face,
tables_ref,
id,
tables.interests_timeout,
);
}
let wire_expr = res
.as_ref()
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ impl HatInterestTrait for HatCode {
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
CurrentInterestCleanup::spawn_interest_clean_up_task(
dst_face,
tables_ref,
id,
tables.interests_timeout,
);
}
let wire_expr = res.as_ref().map(|res| {
Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client)
Expand Down

0 comments on commit 2c889ba

Please sign in to comment.