JetStream understanding PushSubscribeAsync / DeliverSubject #528
Replies: 6 comments
-
DeliverySubject is required when you make a push consumer ahead of time via The FilterSubject is the key to getting only the subject you want. In this case, since you did not set WithFilterSubject, it's essentially a wildcard for every subject in the stream, which is what you reported. And you were correct to be confused as why the subject (first) parameter of the subscribe didn't appear to do anything. I need to add some code to the subscription to suss out this disconnect and report an error. All the subject did in this case was locate the correct stream, but I probably could find that from the Consumer configuration. I think the subscribe api needs a little tweak, some overloads without the subject. In the meantime, I put together some code I'll put in the next comment. |
Beta Was this translation helpful? Give feedback.
-
var js = c.CreateJetStreamContext();
var jsm = c.CreateJetStreamManagementContext();
var sc = StreamConfiguration.Builder()
.WithName("stream")
.WithStorageType(StorageType.File)
.WithSubjects("topic", "topic3")
.Build();
if (JsUtils.GetStreamInfoOrNullWhenNotExist(jsm, "stream") == null)
jsm.AddStream(sc);
else
jsm.UpdateStream(sc);
var c1 = ConsumerConfiguration.Builder()
.WithDeliverSubject("topic-deliver") // Push
.WithFilterSubject("topic")
.WithDurable("c1");
var c2 = ConsumerConfiguration.Builder()
.WithDeliverSubject("topic3-deliver") // Push
.WithFilterSubject("topic3")
.WithDurable("c2");
jsm.AddOrUpdateConsumer("stream", c1.Build());
jsm.AddOrUpdateConsumer("stream", c2.Build());
byte[] msg = Encoding.UTF8.GetBytes(DateTime.Now.ToString());
js.Publish(new Msg("topic", null, new MsgHeader(), msg));
js.Publish(new Msg("topic3", null, new MsgHeader(), msg));
new Thread(() =>
{
var sub1 = js.PushSubscribeAsync("topic", (sender, args) =>
{
Console.WriteLine("SUB 1: " + args.Message);
args.Message.Ack();
}, false, c1.BuildPushSubscribeOptions());
}).Start();
new Thread(() =>
{
var sub2 = js.PushSubscribeAsync("topic3", (sender, args) =>
{
Console.WriteLine("SUB 2: " + args.Message);
args.Message.Ack();
}, false, c2.BuildPushSubscribeOptions());
}).Start();
// Gives some time for the subs to work
Thread.Sleep(2000); |
Beta Was this translation helpful? Give feedback.
-
It seems to me the subject and queue params could be removed from PushSubscribeAsync and read all info from the ConsumerConfiguration. I also tested queue which needs to be the same as deliverygroup, thus its kinda redundant. I also noticed AddOrUpdateConsumer can only update the exact same config, even changing maxAckPending fails |
Beta Was this translation helpful? Give feedback.
-
@coronabytes Agreed. The BindTo is really suppose to work with existing subscriptions and I need to make sure it works and probably add some documentation and examples. Yes, queue and delivery group are redundant, but the api with the queue existed and without a major version change I did not want to remove the queue parameter. Yes, the consumer is not supposed to change. |
Beta Was this translation helpful? Give feedback.
-
Then why is it called AddOrUpdateConsumer ? maybe AddIfNotExists() ? on a side note: "consistent" api and "real-world" examples would be nice |
Beta Was this translation helpful? Give feedback.
-
I said the consumer is not "supposed" to change. There are maybe a field or 2 that can change, but I can't remember off the top of my head which they are. Again, not supposed to change. Also in the context of making a subscription, the consumer is not allowed to change versus a durable consumer already on the server- at that point make another consumer and use starting sequence if you have to. I get it, it's not perfect. We are extremely strict about versioning, regardless of being below 1.0.0. I'll bring it up with the product manager. There are quite a few samples already covering basic use cases. Also there are some java examples in the java-nats-examples repo and they would be functionally the same. The biggest difference is really the syntax and classes for threading and async and maybe the fact that NextMessage times out with an exception when out of messages in .NET versus returning a null in java. There are two reasons for the extraneous queue. 1. The feature is basically brand new. 2. The previous incarnation of server queuing worked differently, if at all, and the java api supported it. The .NET api was modelled off of the java one before the server change. This doesn't make it right but it works. Yes, there could, should be changes. We take all constructive feedback seriously. The PM and I will discuss this, we just didn't get to it today. It's also open source and anyone is welcome to fork the code and make a PR back to the originally repo, to make issues and to propose solutions. Yes we need more examples and documentation. People have been beating down our doors waiting for a release and I had to get something out the door. There were pre-releases out for weeks, and I got literally zero feedback on it until it was release without the "pre" moniker. |
Beta Was this translation helpful? Give feedback.
-
So with 0.14 I finally got jetstream up and running.
As you can see in the code I have 1 stream, 2 topics and 2 consumers.
However I need some info on what deliverysubject does exactly.
Would you use the consumer group name?
Another thing is the first parameter of PushSubscribeAsync.
It needs to be a valid stream subject, however it doesn't seem to filter out messages at all. (stream lookup only?)
as i get topic messages in the topic3 sub (bad example)
same with the queue parameter where apparently the consumer deliverygroup should be used.
but then why are these parameters there?
Beta Was this translation helpful? Give feedback.
All reactions