Skip to content

Commit

Permalink
Stream ingest rate limiting
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Sep 6, 2024
1 parent b715238 commit eef5abf
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 7 deletions.
76 changes: 76 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24250,3 +24250,79 @@ func TestJetStreamSourceRemovalAndReAdd(t *testing.T) {
require_Equal(t, m.Subject, fmt.Sprintf("foo.%d", i))
}
}

func TestJetStreamRateLimitHighStreamIngest(t *testing.T) {
cfgFmt := []byte(fmt.Sprintf(`
jetstream: {
enabled: true
store_dir: %s
max_buffered_size: 1kb
max_buffered_msgs: 1
}
`, t.TempDir()))

conf := createConfFile(t, cfgFmt)
s, opts := RunServerWithConfig(conf)
defer s.Shutdown()

require_Equal(t, opts.StreamMaxBufferedSize, 1024)
require_Equal(t, opts.StreamMaxBufferedMsgs, 1)

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

// Create a reply inbox that we can await API requests on.
// This is instead of using nc.Request().
inbox := nc.NewRespInbox()
resp := make(chan *nats.Msg, 1000)
_, err = nc.ChanSubscribe(inbox, resp)
require_NoError(t, err)

// Publish a large number of messages using Core NATS withou
// waiting for the responses from the API.
msg := &nats.Msg{
Subject: "test",
Reply: inbox,
}
for i := 0; i < 1000; i++ {
require_NoError(t, nc.PublishMsg(msg))
}

// Now sort through the API responses. We're looking for one
// that tells us that we were rate-limited. If we don't find
// one then we fail the test.
var rateLimited bool
for i, msg := 0, <-resp; i < 1000; i, msg = i+1, <-resp {
if msg.Header.Get("Status") == "429" {
rateLimited = true
break
}
}
require_True(t, rateLimited)
}

func TestJetStreamRateLimitHighStreamIngestDefaults(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

stream, err := s.globalAccount().lookupStream("TEST")
require_NoError(t, err)

require_Equal(t, stream.msgs.mlen, streamDefaultMaxQueueMsgs)
require_Equal(t, stream.msgs.msz, streamDefaultMaxQueueBytes)
}
14 changes: 14 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ type Options struct {
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
StreamMaxBufferedMsgs int
StreamMaxBufferedSize int64
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
Expand Down Expand Up @@ -2373,6 +2375,18 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.JetStreamMaxCatchup = s
case "max_buffered_size":
s, err := getStorageSize(mv)
if err != nil {
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.StreamMaxBufferedSize = s
case "max_buffered_msgs":
mlen, ok := mv.(int64)
if !ok {
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.StreamMaxBufferedMsgs = int(mlen)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
42 changes: 35 additions & 7 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver"`
}

// For managing stream ingest.
const (
streamDefaultMaxQueueMsgs = 10_000
streamDefaultMaxQueueBytes = 1024 * 1024 * 128
)

// Stream is a jetstream stream of messages. When we receive a message internally destined
// for a Stream we will direct link from the client to this structure.
type stream struct {
Expand Down Expand Up @@ -576,6 +582,16 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
c := s.createInternalJetStreamClient()
ic := s.createInternalJetStreamClient()

// Work out the stream ingest limits.
mlen := s.opts.StreamMaxBufferedMsgs
msz := uint64(s.opts.StreamMaxBufferedSize)
if mlen == 0 {
mlen = streamDefaultMaxQueueMsgs
}
if msz == 0 {
msz = streamDefaultMaxQueueBytes
}

qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name)
mset := &stream{
acc: a,
Expand All @@ -588,12 +604,18 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tier: tier,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
msgs: newIPQueue[*inMsg](s, qpfx+"messages",
ipqSizeCalculation(func(msg *inMsg) uint64 {
return uint64(len(msg.hdr) + len(msg.msg) + len(msg.rply) + len(msg.subj))
}),
ipqLimitByLen[*inMsg](mlen),
ipqLimitBySize[*inMsg](msz),
),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
}

// Start our signaling routine to process consumers.
Expand Down Expand Up @@ -4156,7 +4178,13 @@ func (im *inMsg) returnToPool() {
func (mset *stream) queueInbound(ib *ipQueue[*inMsg], subj, rply string, hdr, msg []byte, si *sourceInfo, mt *msgTrace) {
im := inMsgPool.Get().(*inMsg)
im.subj, im.rply, im.hdr, im.msg, im.si, im.mt = subj, rply, hdr, msg, si, mt
ib.push(im)
if _, err := ib.push(im); err != nil {
mset.srv.RateLimitWarnf("Dropping messages due to excessive stream ingest rate on '%s' > '%s': %s", mset.acc.Name, mset.name(), err)
if rply != _EMPTY_ {
hdr := []byte("NATS/1.0 429 Too Many Requests\r\n\r\n")
mset.outq.send(newJSPubMsg(rply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
}
}

var dgPool = sync.Pool{
Expand Down

0 comments on commit eef5abf

Please sign in to comment.