register a handler for incoming messages on a push consumer #1176
Hi. Is it possible to register a handler for a specific subject on a push consumer? All the examples show how to poll with the Stream::next() function, which I want to avoid.
Replies: 2 comments 3 replies
Hey! This is an idiomatic approach to handle streams of messages in Rust. If you want callback behavior, you can use for_each:

use futures::StreamExt; // brings for_each into scope

consumer
    .messages()
    .await
    .unwrap()
    .for_each(|message| async move {
        println!("handle the message on subject {}", message.unwrap().subject);
    })
    .await; // for_each returns a future; nothing runs until it is awaited
Hi. Thanks for the answer. The reason I don't like iterating over the messages is that I'd have to start/spawn a background task/thread and monitor the health of that thread with my own code, which I'd like to avoid.

let client = nats::connect("nats://127.0.0.1:4222").unwrap();
let jetstream = nats::jetstream::new(client);
let stream = jetstream
    .add_stream(StreamConfig {
        name: "mystream".to_owned(),
        subjects: vec!["mysubject".to_owned()],
        ..Default::default()
    })
    .unwrap();
println!("New stream {:#?}", stream);
let uic = jetstream.subscribe("mysubject").unwrap().with_handler(move |m| {
    println!("New message {}", m);
    m.ack().unwrap();
    Ok(())
});

with_handler starts a background thread and runs the closure whenever a message comes in. jetstream.subscribe() creates a consumer internally to allow for data persistence.
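To make the background-thread behavior described above concrete, here is a rough std-only sketch of the general pattern. It is not how the nats crate implements with_handler: spawn_handler and run_demo are hypothetical names, and a plain mpsc channel stands in for the subscription. It also shows that the JoinHandle is the only thing needed to check on the thread's health.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical helper (not part of any NATS crate): drains `rx` on a
/// background thread and calls `handler` for every message received.
/// The thread stops when all senders are dropped or the handler returns Err.
fn spawn_handler<T, F>(rx: Receiver<T>, mut handler: F) -> JoinHandle<Result<usize, String>>
where
    T: Send + 'static,
    F: FnMut(T) -> Result<(), String> + Send + 'static,
{
    thread::spawn(move || {
        let mut handled = 0;
        for msg in rx {
            handler(msg)?; // the first handler error ends the thread
            handled += 1;
        }
        Ok(handled)
    })
}

/// Feeds `subjects` through the handler thread and returns what it processed,
/// together with the thread's final outcome.
fn run_demo(subjects: &[&str]) -> (Vec<String>, Result<usize, String>) {
    let (tx, rx) = mpsc::channel::<String>();
    let (seen_tx, seen_rx) = mpsc::channel::<String>();

    let handle = spawn_handler(rx, move |subject: String| {
        if subject.is_empty() {
            return Err("empty subject".to_string());
        }
        seen_tx.send(subject).map_err(|e| e.to_string())
    });

    for s in subjects {
        // send only fails once the handler thread has already stopped
        if tx.send(s.to_string()).is_err() {
            break;
        }
    }
    drop(tx); // closing the channel lets the handler thread finish

    // join() acts as the health check: it surfaces panics and handler errors
    let outcome = handle
        .join()
        .unwrap_or_else(|_| Err("handler panicked".to_string()));
    let seen: Vec<String> = seen_rx.try_iter().collect();
    (seen, outcome)
}

fn main() {
    let (seen, outcome) = run_demo(&["mysubject", "mysubject"]);
    println!("handled {:?}, outcome {:?}", seen, outcome);
}
```

In the async client the same shape should apply with a spawned task in place of the thread, but that part is an assumption on my side and not shown here.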
Hey!
This is an idiomatic approach to handle streams of messages in Rust.
Why do you want to avoid it?
If you want callback behavior, you can use for_each (or even for_each_concurrent).
If you want each call not to block execution of code until it's finished, use a Task.
Neither our implementation of Stream nor futures::StreamExt::for_each spawns any Tasks, so that both stay as lightweight as possible.