canonical way to process batches of jetstream pull consumer messages as they come in #800
-
if i want a pull consumer to process messages as they come in, forever, is there anyway to avoid refreshing a #[rstest]
#[tokio::test]
async fn test_consumer_task(
#[future] nats: async_nats::Client,
) {
let nats = nats.await;
let jetstream = async_nats::jetstream::new(nats);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await.unwrap();
stream.purge().await.unwrap();
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await.unwrap();
let mut task = tokio::spawn(async move {
loop {
println!("looping");
if let Ok(messages) = consumer.fetch().max_messages(200).messages().await {
let messages = messages.collect::<Vec<_>>().await;
if !messages.is_empty() {
println!("messages: {}", messages.len());
}
}
}
}).fuse();
tokio::time::sleep(Duration::from_millis(500)).await;
jetstream.publish("events".to_string(), "data".into()).await.unwrap();
println!("sent");
futures::select! {
_ = task => (),
_ = tokio::time::sleep(Duration::from_millis(5000)).fuse() => ()
}
} the published message is never processed, since the first loop of is there a canonical way to do this that i'm missing? thanks |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Maybe something like let mut stream = consumer.messages().await?;
while let Some(message) = stream.next().await {
let message = message?;
// process message
} |
Beta Was this translation helpful? Give feedback.
Maybe something like