Skip to content

Commit

Permalink
[IMPROVED] Log rate limited warnings (#5793)
Browse files Browse the repository at this point in the history
RateLimitWarnf does not actually rate limit when the error message changes
every time (i.e. if it includes something like a count or a number that
changes with every log line)

Added rateLimitFormatWarnf that does the rate limiting for all log
messages according to the format (rather than the statement).

This also updates the default warning threshold for pending JS requests
from 32 to 128.
  • Loading branch information
derekcollison authored Aug 18, 2024
2 parents 60589da + a4761fa commit 89b042d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 3 deletions.
8 changes: 8 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6089,6 +6089,14 @@ func (c *client) Warnf(format string, v ...any) {
c.srv.Warnf(format, v...)
}

// rateLimitFormatWarnf emits a client warning keyed on the format string
// rather than on the fully formatted statement, so messages whose arguments
// change on every call (counts, sizes, ...) are still suppressed.
//
// The format is recorded in the server's rateLimitLogging map together with
// the current time. When an entry for the same format is already present the
// call is a no-op; otherwise the message is formatted and logged via c.Warnf.
// NOTE(review): entries are presumably expired elsewhere — confirm.
func (c *client) rateLimitFormatWarnf(format string, v ...any) {
	_, alreadyLogged := c.srv.rateLimitLogging.LoadOrStore(format, time.Now())
	if !alreadyLogged {
		// Pre-format and pass through "%s" so the rendered message is not
		// re-interpreted as a format string further down.
		c.Warnf("%s", fmt.Sprintf(format, v...))
	}
}

func (c *client) RateLimitWarnf(format string, v ...any) {
// Do the check before adding the client info to the format...
statement := fmt.Sprintf(format, v...)
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,10 +862,10 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 32
const warnThresh = 128
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending > warnThresh {
s.RateLimitWarnf("JetStream request queue has high pending count: %d", pending)
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
}
}

Expand Down
8 changes: 8 additions & 0 deletions server/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ func (s *Server) Warnf(format string, v ...any) {
}, format, v...)
}

// rateLimitFormatWarnf emits a server warning keyed on the format string
// rather than on the fully formatted statement, so messages whose arguments
// change on every call (counts, sizes, ...) are still suppressed.
//
// The format is recorded in rateLimitLogging together with the current time.
// When an entry for the same format is already present the call is a no-op;
// otherwise the message is formatted and logged via s.Warnf.
// NOTE(review): entries are presumably expired elsewhere — confirm.
func (s *Server) rateLimitFormatWarnf(format string, v ...any) {
	_, alreadyLogged := s.rateLimitLogging.LoadOrStore(format, time.Now())
	if !alreadyLogged {
		// Pre-format and pass through "%s" so the rendered message is not
		// re-interpreted as a format string by Warnf.
		s.Warnf("%s", fmt.Sprintf(format, v...))
	}
}

func (s *Server) RateLimitWarnf(format string, v ...any) {
statement := fmt.Sprintf(format, v...)
if _, loaded := s.rateLimitLogging.LoadOrStore(statement, time.Now()); loaded {
Expand Down
20 changes: 20 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,23 +2011,29 @@ func TestServerRateLimitLogging(t *testing.T) {

s.RateLimitWarnf("Warning number 1")
s.RateLimitWarnf("Warning number 2")
s.rateLimitFormatWarnf("warning value %d", 1)
s.RateLimitWarnf("Warning number 1")
s.RateLimitWarnf("Warning number 2")
s.rateLimitFormatWarnf("warning value %d", 2)

checkLog := func(c1, c2 *client) {
t.Helper()

nb1 := "Warning number 1"
nb2 := "Warning number 2"
nbv := "warning value"
gotOne := 0
gotTwo := 0
gotFormat := 0
for done := false; !done; {
select {
case w := <-l.warn:
if strings.Contains(w, nb1) {
gotOne++
} else if strings.Contains(w, nb2) {
gotTwo++
} else if strings.Contains(w, nbv) {
gotFormat++
}
case <-time.After(150 * time.Millisecond):
done = true
Expand All @@ -2039,27 +2045,39 @@ func TestServerRateLimitLogging(t *testing.T) {
if gotTwo != 1 {
t.Fatalf("Should have had only 1 warning for nb2, got %v", gotTwo)
}
if gotFormat != 1 {
t.Fatalf("Should have had only 1 warning for format, got %v", gotFormat)
}

// Wait for more than the expiration interval
time.Sleep(200 * time.Millisecond)
if c1 == nil {
s.RateLimitWarnf(nb1)
s.rateLimitFormatWarnf("warning value %d", 1)
} else {
c1.RateLimitWarnf(nb1)
c2.RateLimitWarnf(nb1)
c1.rateLimitFormatWarnf("warning value %d", 1)
}
gotOne = 0
gotFormat = 0
for {
select {
case w := <-l.warn:
if strings.Contains(w, nb1) {
gotOne++
} else if strings.Contains(w, nbv) {
gotFormat++
}
case <-time.After(200 * time.Millisecond):
if gotOne == 0 {
t.Fatalf("Warning was still suppressed")
} else if gotOne > 1 {
t.Fatalf("Should have had only 1 warning for nb1, got %v", gotOne)
} else if gotFormat == 0 {
t.Fatalf("Warning was still suppressed")
} else if gotFormat > 1 {
t.Fatalf("Should have had only 1 warning for format, got %v", gotFormat)
} else {
// OK! we are done
return
Expand Down Expand Up @@ -2101,8 +2119,10 @@ func TestServerRateLimitLogging(t *testing.T) {

c1.RateLimitWarnf("Warning number 1")
c1.RateLimitWarnf("Warning number 2")
c1.rateLimitFormatWarnf("warning value %d", 1)
c2.RateLimitWarnf("Warning number 1")
c2.RateLimitWarnf("Warning number 2")
c2.rateLimitFormatWarnf("warning value %d", 2)

checkLog(c1, c2)
}
Expand Down

0 comments on commit 89b042d

Please sign in to comment.