From e9f3b376315d680e8b172b5de65a051e702b9699 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 8 Oct 2024 16:55:25 +0200 Subject: [PATCH] Make FIFO `(try)recv{,_deadline,_timeout}` return `ZResult>` (#1521) --- zenoh/src/api/handlers/fifo.rs | 30 +++- zenoh/tests/liveliness.rs | 282 ++++++++++++++++----------------- zenoh/tests/matching.rs | 76 +++++---- 3 files changed, 212 insertions(+), 176 deletions(-) diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index 47cdf7b5b6..3029baca17 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -72,8 +72,14 @@ impl IntoHandler for FifoChannel { impl FifoChannelHandler { /// Attempt to fetch an incoming value from the channel associated with this receiver, returning /// an error if the channel is empty or if all senders have been dropped. - pub fn try_recv(&self) -> ZResult { - self.0.try_recv().map_err(Into::into) + /// + /// If the channel is empty, this will return [`None`]. + pub fn try_recv(&self) -> ZResult> { + match self.0.try_recv() { + Ok(value) => Ok(Some(value)), + Err(flume::TryRecvError::Empty) => Ok(None), + Err(err) => Err(err.into()), + } } /// Wait for an incoming value from the channel associated with this receiver, returning an @@ -84,14 +90,26 @@ impl FifoChannelHandler { /// Wait for an incoming value from the channel associated with this receiver, returning an /// error if all senders have been dropped or the deadline has passed. - pub fn recv_deadline(&self, deadline: Instant) -> ZResult { - self.0.recv_deadline(deadline).map_err(Into::into) + /// + /// If the deadline has expired, this will return [`None`]. + pub fn recv_deadline(&self, deadline: Instant) -> ZResult> { + match self.0.recv_deadline(deadline) { + Ok(value) => Ok(Some(value)), + Err(flume::RecvTimeoutError::Timeout) => Ok(None), + Err(err) => Err(err.into()), + } } /// Wait for an incoming value from the channel associated with this receiver, returning an /// error if all senders have been dropped or the timeout has expired. - pub fn recv_timeout(&self, duration: Duration) -> ZResult { - self.0.recv_timeout(duration).map_err(Into::into) + /// + /// If the timeout has expired, this will return [`None`]. + pub fn recv_timeout(&self, duration: Duration) -> ZResult> { + match self.0.recv_timeout(duration) { + Ok(value) => Ok(Some(value)), + Err(flume::RecvTimeoutError::Timeout) => Ok(None), + Err(err) => Err(err.into()), + } } /// Create a blocking iterator over the values received on the channel that finishes iteration diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index f8dca5f23a..805f3ab4b2 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -486,7 +486,7 @@ async fn test_liveliness_subscriber_double_client_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -494,7 +494,7 @@ async fn test_liveliness_subscriber_double_client_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -502,12 +502,12 @@ async fn test_liveliness_subscriber_double_client_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -584,7 +584,7 @@ async fn test_liveliness_subscriber_double_client_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -592,7 +592,7 @@ async fn test_liveliness_subscriber_double_client_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -600,12 +600,12 @@ async fn test_liveliness_subscriber_double_client_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -687,12 +687,12 @@ async fn test_liveliness_subscriber_double_client_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -700,12 +700,12 @@ async fn test_liveliness_subscriber_double_client_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -783,7 +783,7 @@ async fn test_liveliness_subscriber_double_client_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -795,7 +795,7 @@ async fn test_liveliness_subscriber_double_client_history_before() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -803,12 +803,12 @@ async fn test_liveliness_subscriber_double_client_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -886,7 +886,7 @@ async fn test_liveliness_subscriber_double_client_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -898,7 +898,7 @@ async fn test_liveliness_subscriber_double_client_history_middle() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -906,12 +906,12 @@ async fn test_liveliness_subscriber_double_client_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -995,12 +995,12 @@ async fn test_liveliness_subscriber_double_client_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1008,12 +1008,12 @@ async fn test_liveliness_subscriber_double_client_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1088,12 +1088,12 @@ async fn test_liveliness_subscriber_double_peer_before() { let sub1 = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1101,12 +1101,12 @@ async fn test_liveliness_subscriber_double_peer_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1180,12 +1180,12 @@ async fn test_liveliness_subscriber_double_peer_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1193,12 +1193,12 @@ async fn test_liveliness_subscriber_double_peer_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1274,12 +1274,12 @@ async fn test_liveliness_subscriber_double_peer_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1287,12 +1287,12 @@ async fn test_liveliness_subscriber_double_peer_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1370,7 +1370,7 @@ async fn test_liveliness_subscriber_double_peer_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(peer_sub .liveliness() @@ -1382,7 +1382,7 @@ async fn test_liveliness_subscriber_double_peer_history_before() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1390,12 +1390,12 @@ async fn test_liveliness_subscriber_double_peer_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1473,7 +1473,7 @@ async fn test_liveliness_subscriber_double_peer_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(peer_sub .liveliness() @@ -1485,7 +1485,7 @@ async fn test_liveliness_subscriber_double_peer_history_middle() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1493,12 +1493,12 @@ async fn test_liveliness_subscriber_double_peer_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1582,12 +1582,12 @@ async fn test_liveliness_subscriber_double_peer_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1595,12 +1595,12 @@ async fn test_liveliness_subscriber_double_peer_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1683,7 +1683,7 @@ async fn test_liveliness_subscriber_double_router_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(router_sub .liveliness() @@ -1691,7 +1691,7 @@ async fn test_liveliness_subscriber_double_router_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1699,12 +1699,12 @@ async fn test_liveliness_subscriber_double_router_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1786,7 +1786,7 @@ async fn test_liveliness_subscriber_double_router_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(router_sub .liveliness() @@ -1794,7 +1794,7 @@ async fn test_liveliness_subscriber_double_router_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1802,12 +1802,12 @@ async fn test_liveliness_subscriber_double_router_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1894,12 +1894,12 @@ async fn test_liveliness_subscriber_double_router_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -1907,12 +1907,12 @@ async fn test_liveliness_subscriber_double_router_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -1995,7 +1995,7 @@ async fn test_liveliness_subscriber_double_router_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(router_sub .liveliness() @@ -2007,7 +2007,7 @@ async fn test_liveliness_subscriber_double_router_history_before() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2015,12 +2015,12 @@ async fn test_liveliness_subscriber_double_router_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2103,7 +2103,7 @@ async fn test_liveliness_subscriber_double_router_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(router_sub .liveliness() @@ -2115,7 +2115,7 @@ async fn test_liveliness_subscriber_double_router_history_middle() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2123,12 +2123,12 @@ async fn test_liveliness_subscriber_double_router_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2217,12 +2217,12 @@ async fn test_liveliness_subscriber_double_router_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2230,12 +2230,12 @@ async fn test_liveliness_subscriber_double_router_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2331,7 +2331,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -2339,7 +2339,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2347,12 +2347,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2448,7 +2448,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -2456,7 +2456,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2464,12 +2464,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2570,12 +2570,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2583,12 +2583,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2686,7 +2686,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -2698,7 +2698,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_before() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2706,12 +2706,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_before() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2809,7 +2809,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sub2 = ztimeout!(client_sub .liveliness() @@ -2821,7 +2821,7 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_middle() { let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2829,12 +2829,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_middle() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -2938,12 +2938,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -2951,12 +2951,12 @@ async fn test_liveliness_subscriber_double_clientviapeer_history_after() { let sample = ztimeout!(sub1.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); let sample = ztimeout!(sub2.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + assert!(sub2.try_recv().unwrap().is_none()); sub1.undeclare().await.unwrap(); sub2.undeclare().await.unwrap(); @@ -3035,7 +3035,7 @@ async fn test_liveliness_subget_client_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3051,7 +3051,7 @@ async fn test_liveliness_subget_client_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3112,7 +3112,7 @@ async fn test_liveliness_subget_client_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -3133,7 +3133,7 @@ async fn test_liveliness_subget_client_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3149,7 +3149,7 @@ async fn test_liveliness_subget_client_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3230,7 +3230,7 @@ async fn test_liveliness_subget_client_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3246,7 +3246,7 @@ async fn test_liveliness_subget_client_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3308,7 +3308,7 @@ async fn test_liveliness_subget_client_history_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -3329,7 +3329,7 @@ async fn test_liveliness_subget_client_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3345,7 +3345,7 @@ async fn test_liveliness_subget_client_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(client_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3426,7 +3426,7 @@ async fn test_liveliness_subget_peer_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3442,7 +3442,7 @@ async fn test_liveliness_subget_peer_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3503,7 +3503,7 @@ async fn test_liveliness_subget_peer_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -3524,7 +3524,7 @@ async fn test_liveliness_subget_peer_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3540,7 +3540,7 @@ async fn test_liveliness_subget_peer_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3621,7 +3621,7 @@ async fn test_liveliness_subget_peer_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3637,7 +3637,7 @@ async fn test_liveliness_subget_peer_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3699,7 +3699,7 @@ async fn test_liveliness_subget_peer_history_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -3720,7 +3720,7 @@ async fn test_liveliness_subget_peer_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3736,7 +3736,7 @@ async fn test_liveliness_subget_peer_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(peer_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3822,7 +3822,7 @@ async fn test_liveliness_subget_router_before() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3838,7 +3838,7 @@ async fn test_liveliness_subget_router_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3904,7 +3904,7 @@ async fn test_liveliness_subget_router_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -3925,7 +3925,7 @@ async fn test_liveliness_subget_router_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -3941,7 +3941,7 @@ async fn test_liveliness_subget_router_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -4027,7 +4027,7 @@ async fn test_liveliness_subget_router_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -4043,7 +4043,7 @@ async fn test_liveliness_subget_router_history_before() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -4110,7 +4110,7 @@ async fn test_liveliness_subget_router_history_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let client_tok = { let mut c = zenoh::Config::default(); @@ -4131,7 +4131,7 @@ async fn test_liveliness_subget_router_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -4147,7 +4147,7 @@ async fn test_liveliness_subget_router_history_middle() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let get = ztimeout!(router_subget.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; @@ -4229,7 +4229,7 @@ async fn test_liveliness_regression_1() { let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -4237,7 +4237,7 @@ async fn test_liveliness_regression_1() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); peer_tok.close().await.unwrap(); peer_sub.close().await.unwrap(); @@ -4297,7 +4297,7 @@ async fn test_liveliness_regression_2() { let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let peer_tok2 = { let mut c = zenoh::Config::default(); @@ -4318,12 +4318,12 @@ async fn test_liveliness_regression_2() { let token2 = ztimeout!(peer_tok2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token2.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -4331,7 +4331,7 @@ async fn test_liveliness_regression_2() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); peer_tok1.close().await.unwrap(); peer_tok2.close().await.unwrap(); @@ -4398,7 +4398,7 @@ async fn test_liveliness_regression_2_history() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); let peer_tok2 = { let mut c = zenoh::Config::default(); @@ -4419,12 +4419,12 @@ async fn test_liveliness_regression_2_history() { let token2 = ztimeout!(peer_tok2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token2.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -4432,7 +4432,7 @@ async fn test_liveliness_regression_2_history() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); peer_tok1.close().await.unwrap(); peer_tok2.close().await.unwrap(); @@ -4524,12 +4524,12 @@ async fn test_liveliness_regression_3() { let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); token2.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -4537,7 +4537,7 @@ async fn test_liveliness_regression_3() { let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.try_recv().is_err()); + assert!(sub.try_recv().unwrap().is_none()); peer_tok1.close().await.unwrap(); client_tok2.close().await.unwrap(); @@ -4662,7 +4662,7 @@ async fn test_liveliness_issue_1470() { assert!(sample.kind() == SampleKind::Put); puts0.insert(sample.key_expr().to_string()); - assert!(sub0.try_recv().is_err()); + assert!(sub0.try_recv().unwrap().is_none()); assert_eq!( puts0, @@ -4708,7 +4708,7 @@ async fn test_liveliness_issue_1470() { assert!(sample.kind() == SampleKind::Put); puts1.insert(sample.key_expr().to_string()); - assert!(sub1.try_recv().is_err()); + assert!(sub1.try_recv().unwrap().is_none()); assert_eq!( puts1, diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 16f1376507..5fc3256cf6 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -58,9 +58,7 @@ async fn zenoh_matching_status_any() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -68,7 +66,11 @@ async fn zenoh_matching_status_any() -> ZResult<()> { let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_any_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(true))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(matching_status.matching_subscribers()); @@ -76,7 +78,11 @@ async fn zenoh_matching_status_any() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(false))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -84,7 +90,11 @@ async fn zenoh_matching_status_any() -> ZResult<()> { let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_any_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(true))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(matching_status.matching_subscribers()); @@ -92,7 +102,11 @@ async fn zenoh_matching_status_any() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(false))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -114,9 +128,7 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -124,9 +136,7 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_remote_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -134,9 +144,7 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -144,7 +152,11 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_remote_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(true))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(matching_status.matching_subscribers()); @@ -152,7 +164,11 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(false))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -175,9 +191,7 @@ async fn zenoh_matching_status_local() -> ZResult<()> { let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -185,7 +199,11 @@ async fn zenoh_matching_status_local() -> ZResult<()> { let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_local_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(true))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(matching_status.matching_subscribers()); @@ -193,7 +211,11 @@ async fn zenoh_matching_status_local() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + assert!(received_status + .ok() + .flatten() + .map(|s| s.matching_subscribers()) + .eq(&Some(false))); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -201,9 +223,7 @@ async fn zenoh_matching_status_local() -> ZResult<()> { let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_local_test")).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers()); @@ -211,9 +231,7 @@ async fn zenoh_matching_status_local() -> ZResult<()> { ztimeout!(sub.undeclare()).unwrap(); let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); - assert!( - received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout) - ); + assert!(received_status.unwrap().is_none()); let matching_status = ztimeout!(publisher1.matching_status()).unwrap(); assert!(!matching_status.matching_subscribers());