Skip to content

Commit

Permalink
Add deleted details to Stream info
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Oct 14, 2024
1 parent 3dbbf9e commit 36cb05a
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 33 deletions.
152 changes: 120 additions & 32 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,41 @@ impl Stream<Info> {
}
}

/// Returns cached [Info] for the [Stream].
/// Cache is either from initial creation/retrieval of the [Stream] or last call to
/// [Stream::info].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
///
/// let info = stream.cached_info();
/// # Ok(())
/// # }
/// ```
pub fn cached_info(&self) -> &Info {
&self.info
}
}

impl<I> Stream<I> {
/// Retrieves `info` about [Stream] from the server. Does not update the cache.
/// Can be used on Stream retrieved by [Context::get_stream_no_info]
pub async fn get_info(&self) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{}", self.name);

match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}

/// Retrieves [[stream::Info]] from the server and returns a [[futures::Stream]] that allows
/// iterating over all subjects in the stream fetched via paged API.
///
Expand Down Expand Up @@ -186,7 +221,7 @@ impl Stream<Info> {
) -> Result<InfoWithSubjects, InfoError> {
let subjects_filter = subjects_filter.as_ref().to_string();
// TODO: validate the subject and decide if this should be a `Subject`
let mut info = stream_info_with_details(
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
Expand All @@ -195,53 +230,37 @@ impl Stream<Info> {
)
.await?;

let subjects = info.state.subjects.take().unwrap_or_default();

Ok(InfoWithSubjects {
context: self.context.clone(),
Ok(InfoWithSubjects::new(
self.context.clone(),
info,
pages_done: subjects.is_empty(),
offset: subjects.len(),
subjects: subjects.into_iter(),
subjects_filter,
stream: self.name.clone(),
info_request: None,
})
))
}

/// Returns cached [Info] for the [Stream].
/// Cache is either from initial creation/retrieval of the [Stream] or last call to
/// [Stream::info].
/// Creates a builder that allows to customize `Stream::Info`.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// let mut stream = jetstream.get_stream("events").await?;
///
/// let info = stream.cached_info();
/// let mut info = stream.info_builder()
/// .with_deleted()
/// .subjects("events.>").try_next().await?;
///
/// while let Some((subject, count)) = info.try_next().await? {
/// println!("Subject: {} count: {}", subject, count);
/// }
/// # Ok(())
/// # }
/// ```
pub fn cached_info(&self) -> &Info {
&self.info
}
}

impl<I> Stream<I> {
/// Retrieves `info` about [Stream] from the server. Does not update the cache.
/// Can be used on Stream retrieved by [Context::get_stream_no_info]
pub async fn get_info(&self) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{}", self.name);

match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
pub fn info_builder(&self) -> StreamInfoBuilder {
StreamInfoBuilder::new(self.context.clone(), self.name.clone())
}

/// Gets next message for a [Stream].
Expand Down Expand Up @@ -1047,6 +1066,47 @@ impl<I> Stream<I> {
}
}

pub struct StreamInfoBuilder {
pub(crate) context: Context,
pub(crate) name: String,
pub(crate) deleted: bool,
pub(crate) subject: String,
}

impl StreamInfoBuilder {
fn new(context: Context, name: String) -> Self {
Self {
context,
name,
deleted: false,
subject: "".to_string(),
}
}

pub fn with_deleted(mut self, deleted: bool) -> Self {
self.deleted = deleted;
self
}

pub fn subjects<S: Into<String>>(mut self, subject: S) -> Self {
self.subject = subject.into();
self
}

pub async fn fetch(self) -> Result<InfoWithSubjects, InfoError> {
let info = stream_info_with_details(
self.context.clone(),
self.name.clone(),
0,
self.deleted,
self.subject.clone(),
)
.await?;

Ok(InfoWithSubjects::new(self.context, info, self.subject))
}
}

/// `StreamConfig` determines the properties for a stream.
/// There are sensible defaults for most. If no subjects are
/// given the name will be used as the only subject.
Expand Down Expand Up @@ -1341,6 +1401,23 @@ pub struct InfoWithSubjects {
pages_done: bool,
}

impl InfoWithSubjects {
pub fn new(context: Context, mut info: Info, subject: String) -> Self {
let subjects = info.state.subjects.take().unwrap_or_default();
let name = info.config.name.clone();
InfoWithSubjects {
context,
info,
pages_done: subjects.is_empty(),
offset: subjects.len(),
subjects: subjects.into_iter(),
subjects_filter: subject,
stream: name,
info_request: None,
}
}
}

impl futures::Stream for InfoWithSubjects {
type Item = Result<(String, usize), InfoError>;

Expand Down Expand Up @@ -1452,6 +1529,17 @@ pub struct State {
pub last_timestamp: time::OffsetDateTime,
/// The number of consumers configured to consume this stream
pub consumer_count: usize,
/// The number of subjects in the stream
#[serde(default, rename = "num_subjects")]
pub subjects_count: u64,
/// The number of deleted messages in the stream
#[serde(default, rename = "num_deleted")]
pub deleted_count: Option<u64>,
/// The list of deleted subjects from the Stream.
/// This field will be filled only if [[StreamInfoBuilder::with_deleted]] option is set.
#[serde(default)]
pub deleted: Option<Vec<u64>>,

pub(crate) subjects: Option<HashMap<String, usize>>,
}

Expand Down
23 changes: 22 additions & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,14 @@ mod jetstream {
.stream_sequence,
3
);

let info = stream
.info_builder()
.with_deleted(true)
.fetch()
.await
.unwrap();
assert_eq!(info.info.state.deleted_count, Some(1));
}

#[tokio::test]
Expand Down Expand Up @@ -3746,7 +3754,20 @@ mod jetstream {
let i = info.info.clone();
let count = info.count().await;
println!("messages: {:?}", i.state.messages);
// println!("info: {:?}", i);
println!("count: {count}");
assert!(count.eq(&220_000));

let info = stream
.info_builder()
.subjects("events.>")
.with_deleted(true)
.fetch()
.await
.unwrap();

let i = info.info.clone();
let count = info.count().await;
println!("messages: {:?}", i.state.messages);
println!("count: {count}");
assert!(count.eq(&220_000));
}
Expand Down

0 comments on commit 36cb05a

Please sign in to comment.