From 36cb05a451adefa12d9668cce971e0f319f6b091 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 14 Oct 2024 17:01:52 +0200 Subject: [PATCH] Add deleted details to Stream info Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/stream.rs | 152 ++++++++++++++++++++++------ async-nats/tests/jetstream_tests.rs | 23 ++++- 2 files changed, 142 insertions(+), 33 deletions(-) diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index d915fc954..b7230aa02 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -158,6 +158,41 @@ impl Stream { } } + /// Returns cached [Info] for the [Stream]. + /// Cache is either from initial creation/retrieval of the [Stream] or last call to + /// [Stream::info]. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let stream = jetstream.get_stream("events").await?; + /// + /// let info = stream.cached_info(); + /// # Ok(()) + /// # } + /// ``` + pub fn cached_info(&self) -> &Info { + &self.info + } +} + +impl Stream { + /// Retrieves `info` about [Stream] from the server. Does not update the cache. + /// Can be used on Stream retrieved by [Context::get_stream_no_info] + pub async fn get_info(&self) -> Result { + let subject = format!("STREAM.INFO.{}", self.name); + + match self.context.request(subject, &json!({})).await? { + Response::Ok::(info) => Ok(info), + Response::Err { error } => Err(error.into()), + } + } + /// Retrieves [[stream::Info]] from the server and returns a [[futures::Stream]] that allows /// iterating over all subjects in the stream fetched via paged API. /// @@ -186,7 +221,7 @@ impl Stream { ) -> Result { let subjects_filter = subjects_filter.as_ref().to_string(); // TODO: validate the subject and decide if this should be a `Subject` - let mut info = stream_info_with_details( + let info = stream_info_with_details( self.context.clone(), self.name.clone(), 0, @@ -195,53 +230,37 @@ impl Stream { ) .await?; - let subjects = info.state.subjects.take().unwrap_or_default(); - - Ok(InfoWithSubjects { - context: self.context.clone(), + Ok(InfoWithSubjects::new( + self.context.clone(), info, - pages_done: subjects.is_empty(), - offset: subjects.len(), - subjects: subjects.into_iter(), subjects_filter, - stream: self.name.clone(), - info_request: None, - }) + )) } - /// Returns cached [Info] for the [Stream]. - /// Cache is either from initial creation/retrieval of the [Stream] or last call to - /// [Stream::info]. + /// Creates a builder that allows to customize `Stream::Info`. /// /// # Examples - /// /// ```no_run /// # #[tokio::main] /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::TryStreamExt; /// let client = async_nats::connect("localhost:4222").await?; /// let jetstream = async_nats::jetstream::new(client); /// - /// let stream = jetstream.get_stream("events").await?; + /// let mut stream = jetstream.get_stream("events").await?; /// - /// let info = stream.cached_info(); + /// let mut info = stream.info_builder() + /// .with_deleted() + /// .subjects("events.>").try_next().await?; + /// + /// while let Some((subject, count)) = info.try_next().await? { + /// println!("Subject: {} count: {}", subject, count); + /// } /// # Ok(()) /// # } /// ``` - pub fn cached_info(&self) -> &Info { - &self.info - } -} - -impl Stream { - /// Retrieves `info` about [Stream] from the server. Does not update the cache. - /// Can be used on Stream retrieved by [Context::get_stream_no_info] - pub async fn get_info(&self) -> Result { - let subject = format!("STREAM.INFO.{}", self.name); - - match self.context.request(subject, &json!({})).await? { - Response::Ok::(info) => Ok(info), - Response::Err { error } => Err(error.into()), - } + pub fn info_builder(&self) -> StreamInfoBuilder { + StreamInfoBuilder::new(self.context.clone(), self.name.clone()) } /// Gets next message for a [Stream]. @@ -1047,6 +1066,47 @@ impl Stream { } } +pub struct StreamInfoBuilder { + pub(crate) context: Context, + pub(crate) name: String, + pub(crate) deleted: bool, + pub(crate) subject: String, +} + +impl StreamInfoBuilder { + fn new(context: Context, name: String) -> Self { + Self { + context, + name, + deleted: false, + subject: "".to_string(), + } + } + + pub fn with_deleted(mut self, deleted: bool) -> Self { + self.deleted = deleted; + self + } + + pub fn subjects>(mut self, subject: S) -> Self { + self.subject = subject.into(); + self + } + + pub async fn fetch(self) -> Result { + let info = stream_info_with_details( + self.context.clone(), + self.name.clone(), + 0, + self.deleted, + self.subject.clone(), + ) + .await?; + + Ok(InfoWithSubjects::new(self.context, info, self.subject)) + } +} + /// `StreamConfig` determines the properties for a stream. /// There are sensible defaults for most. If no subjects are /// given the name will be used as the only subject. @@ -1341,6 +1401,23 @@ pub struct InfoWithSubjects { pages_done: bool, } +impl InfoWithSubjects { + pub fn new(context: Context, mut info: Info, subject: String) -> Self { + let subjects = info.state.subjects.take().unwrap_or_default(); + let name = info.config.name.clone(); + InfoWithSubjects { + context, + info, + pages_done: subjects.is_empty(), + offset: subjects.len(), + subjects: subjects.into_iter(), + subjects_filter: subject, + stream: name, + info_request: None, + } + } +} + impl futures::Stream for InfoWithSubjects { type Item = Result<(String, usize), InfoError>; @@ -1452,6 +1529,17 @@ pub struct State { pub last_timestamp: time::OffsetDateTime, /// The number of consumers configured to consume this stream pub consumer_count: usize, + /// The number of subjects in the stream + #[serde(default, rename = "num_subjects")] + pub subjects_count: u64, + /// The number of deleted messages in the stream + #[serde(default, rename = "num_deleted")] + pub deleted_count: Option, + /// The list of deleted subjects from the Stream. + /// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set. + #[serde(default)] + pub deleted: Option>, + pub(crate) subjects: Option>, } diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 7f90e2aff..f2e107bc6 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -938,6 +938,14 @@ mod jetstream { .stream_sequence, 3 ); + + let info = stream + .info_builder() + .with_deleted(true) + .fetch() + .await + .unwrap(); + assert_eq!(info.info.state.deleted_count, Some(1)); } #[tokio::test] @@ -3746,7 +3754,20 @@ mod jetstream { let i = info.info.clone(); let count = info.count().await; println!("messages: {:?}", i.state.messages); - // println!("info: {:?}", i); + println!("count: {count}"); + assert!(count.eq(&220_000)); + + let info = stream + .info_builder() + .subjects("events.>") + .with_deleted(true) + .fetch() + .await + .unwrap(); + + let i = info.info.clone(); + let count = info.count().await; + println!("messages: {:?}", i.state.messages); println!("count: {count}"); assert!(count.eq(&220_000)); }