From 97e36ea8648ab3b6261d7b7edc285e50ed565dfb Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 12 Aug 2024 12:54:13 +0100 Subject: [PATCH] Stream ingest rate limiting Signed-off-by: Neil Twigg --- server/jetstream_test.go | 51 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 35 ++++++++++++++++++++++----- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8bb40f654ca..caa66771e01 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24068,3 +24068,54 @@ func addConsumerWithError(t *testing.T, nc *nats.Conn, cfg *CreateConsumerReques } return resp.ConsumerInfo, resp.Error } + +func TestJetStreamRateLimitHighStreamIngest(t *testing.T) { + streamMaxQueueMsgs = 1 + streamMaxQueueBytes = 1 + defer func() { + streamMaxQueueMsgs = streamDefaultMaxQueueMsgs + streamMaxQueueBytes = streamDefaultMaxQueueBytes + }() + + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + // Create a reply inbox that we can await API requests on. + // This is instead of using nc.Request(). + inbox := nc.NewRespInbox() + resp := make(chan *nats.Msg, 1000) + _, err = nc.ChanSubscribe(inbox, resp) + require_NoError(t, err) + + // Publish a large number of messages using Core NATS withou + // waiting for the responses from the API. + msg := &nats.Msg{ + Subject: "test", + Reply: inbox, + } + for i := 0; i < 1000; i++ { + require_NoError(t, nc.PublishMsg(msg)) + } + + // Now sort through the API responses. We're looking for one + // that tells us that we were rate-limited. If we don't find + // one then we fail the test. + var rateLimited bool + for i, msg := 0, <-resp; i < 1000; i, msg = i+1, <-resp { + if msg.Header.Get("Status") == "429" { + rateLimited = true + break + } + } + require_True(t, rateLimited) +} diff --git a/server/stream.go b/server/stream.go index 608f895cd62..209d7f26c3e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -218,6 +218,19 @@ type ExternalStream struct { DeliverPrefix string `json:"deliver"` } +// For managing stream ingest. +// TODO: What values make sense here? +const ( + streamDefaultMaxQueueMsgs = 100_000 + streamDefaultMaxQueueBytes = 1024 * 1024 * 64 +) + +// In case unit tests need to overwrite these. +var ( + streamMaxQueueMsgs = streamDefaultMaxQueueMsgs + streamMaxQueueBytes = streamDefaultMaxQueueBytes +) + // Stream is a jetstream stream of messages. When we receive a message internally destined // for a Stream we will direct link from the client to this structure. type stream struct { @@ -587,12 +600,14 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt tier: tier, stype: cfg.Storage, consumers: make(map[string]*consumer), - msgs: newIPQueue[*inMsg](s, qpfx+"messages"), - gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), - qch: make(chan struct{}), - mqch: make(chan struct{}), - uch: make(chan struct{}, 4), - sch: make(chan struct{}, 1), + msgs: newIPQueue[*inMsg](s, qpfx+"messages", ipQueue_SizeCalculation(func(msg *inMsg) uint64 { + return uint64(len(msg.hdr) + len(msg.msg) + len(msg.rply) + len(msg.subj)) + })), + gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), + qch: make(chan struct{}), + mqch: make(chan struct{}), + uch: make(chan struct{}, 4), + sch: make(chan struct{}, 1), } // Start our signaling routine to process consumers. @@ -4150,6 +4165,14 @@ func (im *inMsg) returnToPool() { func (mset *stream) queueInbound(ib *ipQueue[*inMsg], subj, rply string, hdr, msg []byte, si *sourceInfo, mt *msgTrace) { im := inMsgPool.Get().(*inMsg) im.subj, im.rply, im.hdr, im.msg, im.si, im.mt = subj, rply, hdr, msg, si, mt + if ib.len() >= streamMaxQueueMsgs || ib.size() >= uint64(streamMaxQueueBytes) { + mset.srv.RateLimitWarnf("Dropping messages due to excessive stream ingest rate on '%s' > '%s'", mset.acc.Name, mset.name()) + if rply != _EMPTY_ { + hdr := []byte("NATS/1.0 429 Too Many Requests\r\n\r\n") + mset.outq.send(newJSPubMsg(rply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + } + return + } ib.push(im) }