I am using the new jetstream package and would like to start my consumers from the newest event on the stream. However, I am observing erratic behavior. If the consumer shuts down for 10+ seconds and I then start it up again, it works fine. However, if the consumer shuts down and restarts immediately (let's say within 2-3 seconds), it reads messages from where it left off. It appears that the consumer is not immediately destroyed/cleaned up on the server, so it gets picked up again. This is the code snippet:

```go
cfg := jetstream.StreamConfig{
	Name:     "orders",
	Subjects: []string{"orders.*"},
}
stream, err := js.CreateStream(context.Background(), cfg)
if err != nil {
	panic(err)
}
consumer, err := stream.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
	Name:           "group-b",
	AckPolicy:      jetstream.AckExplicitPolicy,
	FilterSubjects: []string{"orders.*"},
	DeliverPolicy:  jetstream.DeliverNewPolicy,
})
if err != nil {
	log.Println(err)
}
if info, err := consumer.Info(context.Background()); err == nil {
	bytes, _ := json.MarshalIndent(info, "", " ")
	log.Println(string(bytes)) // upon restarts I can see the consumer still exists sometimes
}
ctx, err := consumer.Consume(func(msg jetstream.Msg) {
	log.Printf("received message: %s", string(msg.Data()))
	if err := msg.DoubleAck(context.Background()); err != nil {
		log.Printf("ack error: %s", err)
	}
})
if err != nil {
	panic(err)
}
<-stopChan
ctx.Stop()
```
Replies: 1 comment 1 reply
Hey @el-hivo. The client does not do consumer cleanup automatically; that is handled by the server. The reason why you're sometimes seeing the consumer after restarting the app is a parameter on `ConsumerConfig` called `InactiveThreshold`. This is the value the server uses to clean up existing non-active (not actively polling for messages) consumers. So if you stop consuming for `InactiveThreshold`, the consumer is automatically removed from the server. The default value for this parameter is 5 seconds.

Depending on your needs, you can either:

[…] `InactiveThreshold` to a large value or by setting the `Durable` field on `ConsumerConfig`).