(2.11) JetStream API routed queue changes #6342

Open · wants to merge 3 commits into base: main
37 changes: 37 additions & 0 deletions server/ipqueue.go
@@ -207,6 +207,43 @@ func (q *ipQueue[T]) popOne() (T, bool) {
	return e, true
}

// Returns the last element from the queue, if any. See comment above
// regarding calling after being notified that there is something and
// the use of drain(). In short, the caller should always check the
// boolean return value to ensure that the value is genuine and not a
// default empty value.
func (q *ipQueue[T]) popOneLast() (T, bool) {
Member comment:

nit: popLast() may be enough.

Adding a test for this new API would be a good thing, for instance making sure that we can mix popOne() and popLast() calls.

	q.Lock()
	l := len(q.elts) - q.pos
	if l == 0 {
		q.Unlock()
		var empty T
		return empty, false
	}
	e := q.elts[len(q.elts)-1]
	q.elts = q.elts[:len(q.elts)-1]
	if l--; l > 0 {
		if q.calc != nil {
			q.sz -= q.calc(e)
		}
		// We need to re-signal
		select {
		case q.ch <- struct{}{}:
		default:
		}
	} else {
		// We have just emptied the queue, so we can reuse unless it is too big.
		if cap(q.elts) <= q.mrs {
			q.elts = q.elts[:0]
		} else {
			q.elts = nil
		}
		q.pos, q.sz = 0, 0
	}
	q.Unlock()
	return e, true
}
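A minimal usage sketch (illustrative only, not part of the diff), assuming the ipQueue API above: popOne() takes from the front of the queue (the oldest entry) while the new popOneLast() takes from the back (the newest entry), and both return a boolean that callers must check to tell a genuine value from the type's zero value.

	s := &Server{}
	q := newIPQueue[int](s, "example")
	q.push(1)
	q.push(2)
	q.push(3)
	v1, _ := q.popOne()     // v1 == 1: the oldest entry
	v2, _ := q.popOneLast() // v2 == 3: the newest entry
	_, ok := q.popOneLast() // pops 2, ok == true; a further call returns (0, false)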

// After a pop(), the slice can be recycled for the next push() when
// a first element is added to the queue.
// This will also decrement the "in progress" count with the length
128 changes: 126 additions & 2 deletions server/ipqueue_test.go
@@ -250,6 +250,99 @@ func TestIPQueuePopOne(t *testing.T) {
	q.recycle(&values)
}

func TestIPQueuePopOneLast(t *testing.T) {
	s := &Server{}
	q := newIPQueue[int](s, "test")
	q.push(1)
	<-q.ch
	e, ok := q.popOneLast()
	if !ok {
		t.Fatal("Got nil")
	}
	if i := e; i != 1 {
		t.Fatalf("Expected 1, got %v", i)
	}
	if l := q.len(); l != 0 {
		t.Fatalf("Expected len to be 0, got %v", l)
	}
	// That does not affect the number of notProcessed
	if n := q.inProgress(); n != 0 {
		t.Fatalf("Expected count to be 0, got %v", n)
	}
	select {
	case <-q.ch:
		t.Fatalf("Should not have been notified of addition")
	default:
		// OK
	}
	q.push(2)
	q.push(3)
	e, ok = q.popOneLast()
	if !ok {
		t.Fatal("Got nil")
	}
	if i := e; i != 3 {
		t.Fatalf("Expected 3, got %v", i)
	}
	if l := q.len(); l != 1 {
		t.Fatalf("Expected len to be 1, got %v", l)
	}
	select {
	case <-q.ch:
		// OK
	default:
		t.Fatalf("Should have been notified that there is more")
	}
	e, ok = q.popOneLast()
	if !ok {
		t.Fatal("Got nil")
	}
	if i := e; i != 2 {
		t.Fatalf("Expected 2, got %v", i)
	}
	if l := q.len(); l != 0 {
		t.Fatalf("Expected len to be 0, got %v", l)
	}
	select {
	case <-q.ch:
		t.Fatalf("Should not have been notified that there is more")
	default:
		// OK
	}
	// Calling it again now that we know there is nothing, we
	// should get nil.
	if e, ok = q.popOneLast(); ok {
		t.Fatalf("Expected nil, got %v", e)
	}

	q = newIPQueue[int](s, "test2")
	q.push(1)
	q.push(2)
	// Capture current capacity
	q.Lock()
	c := cap(q.elts)
	q.Unlock()
	e, ok = q.popOneLast()
	if !ok || e != 2 {
		t.Fatalf("Invalid value: %v", e)
	}
	if l := q.len(); l != 1 {
		t.Fatalf("Expected len to be 1, got %v", l)
	}
	values := q.pop()
	if len(values) != 1 || values[0] != 1 {
		t.Fatalf("Unexpected values: %v", values)
	}
	if cap(values) != c {
		t.Fatalf("Unexpected capacity: %v vs %v", cap(values), c)
	}
	if l := q.len(); l != 0 {
		t.Fatalf("Expected len to be 0, got %v", l)
	}
	// Just make sure that this is ok...
	q.recycle(&values)
}

func TestIPQueueMultiProducers(t *testing.T) {
	s := &Server{}
	q := newIPQueue[int](s, "test")
@@ -382,7 +475,36 @@ func TestIPQueueDrain(t *testing.T) {
	}
}

-func TestIPQueueSizeCalculation(t *testing.T) {
+func TestIPQueueSizeCalculationPopOne(t *testing.T) {
	type testType = [16]byte
	var testValue testType

	calc := ipqSizeCalculation[testType](func(e testType) uint64 {
		return uint64(len(e))
	})
	s := &Server{}
	q := newIPQueue[testType](s, "test", calc)

	for i := 0; i < 10; i++ {
		testValue[0] = byte(i)
		q.push(testValue)
		require_Equal(t, q.len(), i+1)
		require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
	}

	for i := 10; i > 5; i-- {
		v, _ := q.popOne()
		require_Equal(t, 10-v[0], byte(i))
		require_Equal(t, q.len(), i-1)
		require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
	}

	q.pop()
	require_Equal(t, q.len(), 0)
	require_Equal(t, q.size(), 0)
}

func TestIPQueueSizeCalculationPopOneLast(t *testing.T) {
	type testType = [16]byte
	var testValue testType

@@ -393,13 +515,15 @@ func TestIPQueueSizeCalculation(t *testing.T) {
	q := newIPQueue[testType](s, "test", calc)

	for i := 0; i < 10; i++ {
+		testValue[0] = byte(i)
		q.push(testValue)
		require_Equal(t, q.len(), i+1)
		require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
	}

	for i := 10; i > 5; i-- {
-		q.popOne()
+		v, _ := q.popOneLast()
+		require_Equal(t, v[0]+1, byte(i))
		require_Equal(t, q.len(), i-1)
		require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
	}
33 changes: 26 additions & 7 deletions server/jetstream_api.go
@@ -883,18 +883,37 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
	// If we are here we have received this request over a non-client connection.
	// We need to make sure not to block. We will send the request to a long-lived
	// pool of go routines.

-	// Increment inflight. Do this before queueing.
-	atomic.AddInt64(&js.apiInflight, 1)
-
	// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
	// header from the msg body. No other references are needed.
	// Check pending and warn if getting backed up.
-	pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
+	limit := atomic.LoadInt64(&js.queueLimit)
+retry:
+	atomic.AddInt64(&js.apiInflight, 1)
+	pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
	if pending >= int(limit) {
-		s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
+		if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
+			// If we were able to take one of the oldest items off the queue, then
+			// retry the insert.
+			s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
+			atomic.AddInt64(&js.apiInflight, -1)
+			s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
+				TypedEvent: TypedEvent{
+					Type: JSAPILimitReachedAdvisoryType,
+					ID:   nuid.Next(),
+					Time: time.Now().UTC(),
+				},
+				Server:  s.Name(),
+				Domain:  js.config.Domain,
+				Dropped: 1,
+			})
+			goto retry
+		}
+
+		// It's likely not possible to get to this point, but if for some reason we have got here,
+		// then something is wrong for us to be both over the limit but unable to pull entries, so
+		// throw everything away and hope we recover from it.
		drained := int64(s.jsAPIRoutedReqs.drain())
+		s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", drained)
		atomic.AddInt64(&js.apiInflight, -drained)

		s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
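The logic above prefers evicting the oldest queued request to make room for the newest, falling back to a full drain() only in the unexpected case of being over the limit yet unable to pop anything. A minimal sketch of the drop-oldest idea on a plain slice (illustrative only; it does not reproduce ipQueue's locking, channel signalling, or inflight accounting):

	// offerDropOldest appends e to q, evicting the oldest entry first if the
	// queue is already at its limit. Returns the new slice and whether an
	// entry was dropped. Illustrative sketch only.
	func offerDropOldest[T any](q []T, e T, limit int) ([]T, bool) {
		dropped := false
		if limit > 0 && len(q) >= limit {
			q = q[1:] // evict the oldest entry to make room for the newest
			dropped = true
		}
		return append(q, e), dropped
	}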
@@ -926,7 +945,7 @@ func (s *Server) processJSAPIRoutedRequests() {
	// Only pop one item at a time here, otherwise if the system is recovering
	// from queue buildup, then one worker will pull off all the tasks and the
	// others will be starved of work.
-	for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
+	for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() {
Member comment:

I was going to say that this seems wrong: picture a push("some create operation") followed by push("some delete of that created object"). With popOneLast(), you could get the "delete" first, then the "create". But I realize that a change was made some time ago to make processJSAPIRoutedRequests() execute from several go routines, which already threw the ordering out of the window. So I assume that if that was done, those actions are independent of each other and order does not matter; you would want to verify that this is the case. Again, if there is possibly an ordering issue, then popOneLast() would be wrong, but running processJSAPIRoutedRequests() from more than one go routine would be wrong too.

Member Author comment:

Indeed, the ordering is already indeterminate. As far as I can tell it always has been, as we have never been able to predict which worker goroutine will process a task. If tasks were pulled off by different goroutines at roughly the same time, we don't know which one would "win" races to the locks etc.

A client making related requests should be waiting for responses anyway before proceeding onto the next request.
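A tiny sketch of that point (illustrative; handle is a hypothetical handler): even with strictly ordered pushes, two workers popping concurrently can finish in either order, so per-request ordering was never guaranteed.

	q.push(1) // e.g. a "create" request
	q.push(2) // e.g. a "delete" of the created object
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if v, ok := q.popOne(); ok {
				handle(v) // may complete before or after the other worker
			}
		}()
	}
	wg.Wait()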

		client.pa = r.pa
		start := time.Now()
		r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
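For context, a rough sketch of the worker pattern this loop runs in (numWorkers and process are hypothetical; the server's actual wiring differs): each worker blocks on the queue's notify channel, then pops entries one at a time, so a recovering backlog is shared across workers instead of one worker draining everything via pop().

	for i := 0; i < numWorkers; i++ {
		go func() {
			for range q.ch { // wait for a signal that the queue is non-empty
				for v, ok := q.popOneLast(); ok; v, ok = q.popOneLast() {
					process(v) // hypothetical handler
				}
			}
		}()
	}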