Skip to content

Commit

Permalink
Stream ingest rate limiting
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Aug 27, 2024
1 parent 7bb38c3 commit 97e36ea
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 6 deletions.
51 changes: 51 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24068,3 +24068,54 @@ func addConsumerWithError(t *testing.T, nc *nats.Conn, cfg *CreateConsumerReques
}
return resp.ConsumerInfo, resp.Error
}

func TestJetStreamRateLimitHighStreamIngest(t *testing.T) {
streamMaxQueueMsgs = 1
streamMaxQueueBytes = 1
defer func() {
streamMaxQueueMsgs = streamDefaultMaxQueueMsgs
streamMaxQueueBytes = streamDefaultMaxQueueBytes
}()

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

// Create a reply inbox that we can await API requests on.
// This is instead of using nc.Request().
inbox := nc.NewRespInbox()
resp := make(chan *nats.Msg, 1000)
_, err = nc.ChanSubscribe(inbox, resp)
require_NoError(t, err)

// Publish a large number of messages using Core NATS withou
// waiting for the responses from the API.
msg := &nats.Msg{
Subject: "test",
Reply: inbox,
}
for i := 0; i < 1000; i++ {
require_NoError(t, nc.PublishMsg(msg))
}

// Now sort through the API responses. We're looking for one
// that tells us that we were rate-limited. If we don't find
// one then we fail the test.
var rateLimited bool
for i, msg := 0, <-resp; i < 1000; i, msg = i+1, <-resp {
if msg.Header.Get("Status") == "429" {
rateLimited = true
break
}
}
require_True(t, rateLimited)
}
35 changes: 29 additions & 6 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,19 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver"`
}

// For managing stream ingest.
// TODO: What values make sense here?
const (
streamDefaultMaxQueueMsgs = 100_000
streamDefaultMaxQueueBytes = 1024 * 1024 * 64
)

// In case unit tests need to overwrite these.
var (
streamMaxQueueMsgs = streamDefaultMaxQueueMsgs
streamMaxQueueBytes = streamDefaultMaxQueueBytes
)

// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
Expand Down Expand Up @@ -587,12 +600,14 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tier: tier,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
msgs: newIPQueue[*inMsg](s, qpfx+"messages", ipQueue_SizeCalculation(func(msg *inMsg) uint64 {
return uint64(len(msg.hdr) + len(msg.msg) + len(msg.rply) + len(msg.subj))
})),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
}

// Start our signaling routine to process consumers.
Expand Down Expand Up @@ -4150,6 +4165,14 @@ func (im *inMsg) returnToPool() {
func (mset *stream) queueInbound(ib *ipQueue[*inMsg], subj, rply string, hdr, msg []byte, si *sourceInfo, mt *msgTrace) {
im := inMsgPool.Get().(*inMsg)
im.subj, im.rply, im.hdr, im.msg, im.si, im.mt = subj, rply, hdr, msg, si, mt
if ib.len() >= streamMaxQueueMsgs || ib.size() >= uint64(streamMaxQueueBytes) {
mset.srv.RateLimitWarnf("Dropping messages due to excessive stream ingest rate on '%s' > '%s'", mset.acc.Name, mset.name())
if rply != _EMPTY_ {
hdr := []byte("NATS/1.0 429 Too Many Requests\r\n\r\n")
mset.outq.send(newJSPubMsg(rply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
return
}
ib.push(im)
}

Expand Down

0 comments on commit 97e36ea

Please sign in to comment.