diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index b92304ade..07a0b652e 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -617,10 +617,26 @@ impl Client { } /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining - /// messages, then closes the connection + /// messages, then closes the connection. Once completed, any associated streams associated with the + /// client will be closed, and further client commands will fail /// /// # Examples - /// TODO + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("demo.nats.io").await?; + /// let mut subscription = client.subscribe("events.>").await?; + /// + /// client.drain().await?; + /// + /// # // existing subscriptions are closed and further commands will fail + /// assert!(subscription.next().await.is_none()); + /// client.subscribe().await.expect_err("Expected further commands to fail"); + /// + /// # Ok(()) + /// # } + /// ``` pub async fn drain(&self) -> Result<(), DrainError> { // Drain all subscriptions self.sender.send(Command::Drain { sid: None }).await?; diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index dc9f30db2..d418045a3 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -539,15 +539,16 @@ impl ConnectionHandler { } // Before handling any commands, drop any subscriptions which are draining - // Note: safe to assume drain has completed, as we would have flushed all outgoing - // UNSUB messages in the previous call to this fn, and we would have processed and delivered - // any remaining messages to the subscription in the loop above. + // Note: safe to assume subscription drain has completed at this point, as we would have flushed + // all outgoing UNSUB messages in the previous call to this fn, and we would have processed and + // delivered any remaining messages to the subscription in the loop above. self.handler.subscriptions.retain(|_, s| !s.is_draining); if self.handler.is_draining { - // The entire connection is draining. This means we flushed outgoing messages and all subs - // were drained by the above retain and we should exit instead of processing any further - // messages + // The entire connection is draining. This means we flushed outgoing messages in the previous + // call to this fn, we handled any remaining messages from the server in the loop above, and + // all subs were drained, so drain is complete and we should exit instead of processing any + // further messages return Poll::Ready(ExitReason::Closed); } @@ -810,6 +811,8 @@ impl ConnectionHandler { drain_sub(&sid, sub); } } else { + // sid isn't set, so drain the whole client + self.is_draining = true; for (sid, sub) in self.subscriptions.iter_mut() { drain_sub(sid, sub); } @@ -1304,9 +1307,9 @@ impl Subscriber { Ok(()) } - /// Unsubscribes from subscription immediately leaves the stream open for the configured drain period - /// to allow any in-flight messages on the subscription to be delivered. The stream will be closed - /// at the end of the drain period + /// Unsubscribes immediately but leaves the stream open to allow any in-flight messages on the + /// subscription to be delivered. The stream will be closed after any remaining messages are + /// delivered /// /// # Examples /// ``` diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index b7275a24f..3845dfef8 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -996,4 +996,142 @@ mod client { assert_eq!(stats.out_bytes.load(Ordering::Relaxed), 139); assert_eq!(stats.connects.load(Ordering::Relaxed), 2); } + + #[tokio::test] + async fn drain_subscription_basic() { + use std::error::Error; + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let mut sub = client.subscribe("test").await.unwrap(); + + // publish some data + client.publish("test", "data".into()).await.unwrap(); + client.flush().await.unwrap(); + + // confirm we receive that data + assert!(sub.next().await.is_some()); + + // now drain the subscription + let result = sub.drain().await; + match result { + Ok(()) => println!("ok"), + Err(err) => { + println!("error: {}", err); + println!("source: {:?}", err.source()) + } + } + + // assert the stream is closed after draining + assert!(sub.next().await.is_none()); + + // confirm we can still reconnect and send messages on a new subscription + let mut sub2 = client.subscribe("test2").await.unwrap(); + client.publish("test2", "data".into()).await.unwrap(); + client.flush().await.unwrap(); + assert!(sub2.next().await.is_some()); + } + + #[tokio::test] + async fn drain_subscription_unsub_after() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let mut sub = client.subscribe("test").await.unwrap(); + + sub.unsubscribe_after(120) + .await + .expect("Expected to send unsub_after"); + + // publish some data + client.publish("test", "data".into()).await.unwrap(); + client.publish("test", "data".into()).await.unwrap(); + client.flush().await.unwrap(); + + // Send the drain command + sub.drain().await.expect("Expected to drain the sub"); + + // we should receive all published data then close immediately + assert!(sub.next().await.is_some()); + assert!(sub.next().await.is_some()); + assert!(sub.next().await.is_none()); + } + + #[tokio::test] + async fn drain_subscription_active() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + // spawn a task to constantly write to the subscription + let constant_writer = tokio::spawn({ + let client = client.clone(); + async move { + loop { + client.publish("test", "data".into()).await.unwrap(); + client.flush().await.unwrap(); + } + } + }); + + let mut sub = client.subscribe("test").await.unwrap(); + + // confirm we receive some data + assert!(sub.next().await.is_some()); + + // now drain the subscription + sub.drain().await.unwrap(); + + // yield to the runtime to ensure constant_writer gets a chance to publish a message or two to the subject + tokio::time::sleep(Duration::from_millis(1)).await; + + // assert the subscription stream is closed after draining + let sleep_fut = async move { while let Some(_) = sub.next().await {} }; + tokio::time::timeout(Duration::from_secs(10), sleep_fut) + .await + .expect("Expected stream to drain within 10s"); + + // assert constant_writer doesn't fail to write after the only sub is drained (i.e. client operations still work fine) + assert!(!constant_writer.is_finished()); + + // confirm we can still reconnect and receive messages on the same subject on a new subscription + let mut sub2 = client.subscribe("test").await.unwrap(); + assert!(sub2.next().await.is_some()); + } + + #[tokio::test] + async fn drain_client_basic() { + let server = nats_server::run_basic_server(); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let mut sub = client.subscribe("test").await.unwrap(); + + // publish some data + client.publish("test", "data".into()).await.unwrap(); + client.flush().await.unwrap(); + + // confirm we receive that data + assert!(sub.next().await.is_some()); + + // now drain the client + client.drain().await.unwrap(); + + // assert the sub's stream is closed after draining + assert!(sub.next().await.is_none()); + + // we should not be able to perform any more operations on a drained client + client + .subscribe("test2") + .await + .expect_err("Expected client to be drained"); + + client + .publish("test", "data".into()) + .await + .expect_err("Expected client to be drained"); + + // we should be able to connect with a new client + let _client2 = async_nats::connect(server.client_url()) + .await + .expect("Expected to be able to create a new client"); + } }