Ephemeral Consumer Cleanup Rate? #1534
-
Hi, we have a use case for ephemeral consumers where we create one per API call we receive, at a relatively high RPS. Each call creates an ephemeral consumer with an inactivity threshold of 1 minute for the purpose of fetching up to the latest 100 messages on a subject. However, the number of consumers keeps growing (> 1 million consumers) despite that setting. At what rate are ephemeral consumers deleted? Is this a valid use case for ephemeral consumers?

Example usage:

func (r *repository) GetUserData(
	ctx context.Context,
	userCode string,
) ([]string, error) {
	consumerName := fmt.Sprintf(
		"fetch-user-data-%s-%s",
		userCode,
		uuid.NewString(),
	)
	cs, err := r.js.CreateConsumer(
		ctx,
		"some stream",
		natsJetstream.ConsumerConfig{
			Name:              consumerName,
			Durable:           "", // This is not a durable consumer, DO NOT SET.
			Description:       "some description.",
			DeliverPolicy:     natsJetstream.DeliverAllPolicy,
			AckPolicy:         natsJetstream.AckNonePolicy,
			FilterSubject:     fmt.Sprintf("USER_DATA.%s", userCode),
			ReplayPolicy:      natsJetstream.ReplayInstantPolicy,
			MaxRequestBatch:   100,
			InactiveThreshold: NATSInactiveThreshold, // time.Minute
		},
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to initialize consumer")
	}
	res, err := cs.FetchNoWait(100)
	if err != nil {
		return nil, errors.Wrap(err, "fetch failed")
	}
	for msg := range res.Messages() {
		// ... message processing
		fmt.Println(msg)
	}
	return nil, nil
}
-
What server version? What is the topology of the NATS system?
-
In addition to what @ripienaar has shared: when using consumers for this today, make sure the consumer that fetches the history is R1 and memory-backed, which can help, or use an ordered consumer, which will be R1 and flow-controlled. Also, even before batch support lands (it is already on main and in the nightlies for the server), you could use NextFor and walk the user's history today by issuing a new request with a different starting sequence upon receipt of each prior response. Then, when batch lands, you could simply add Batch to get more than one message at a time.
-
I have a follow-up question that's adjacently related. We have a requirement to implement an optimistic realtime streaming architecture that loops back onto itself:
Is this architecture possible with JetStream or NATS? How would you recommend building for this requirement? We do not need ack/nak support like a message consumer, as this is optimistic: as long as we get a realtime update that the message has been processed by our downstream consumer, that is sufficient. Will spawning a NATS subscriber (instead of a consumer) on every API call be OK for scaling?
Such a high rate of consumer creation will not work well. It would be better to look into the APIs we have for fetching single messages from the stream; we are about to land a mode that allows batches to be fetched without a consumer. See nats-io/nats-architecture-and-design#262