Skip to content

Commit

Permalink
Change Consumer Priority Group State info and fix pinned TTL (#6081)
Browse files Browse the repository at this point in the history
This PR introduces few changes & improvements to Pinned Consumers

**Fix pinned TTL timer when its assigned from existing waiting request**
Until now, we were resetting Pinned TTL only if pin was set on incoming
Pull Request, but not when we were pinning an existing waiting request.
It also simplifies some logic.

**Switch to slice of PriorityGroupState**
Previously, we were storing just the pinned IDs, but to be more
future-proof, this commit will switch approach to store a struct.
It also adds `PinnedTS` which can be useful for debugging pinned
consumers.

cc @ripienaar @jnmoyne 

Signed-off-by: Tomasz Pietrek <[email protected]>

---------

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema authored Nov 6, 2024
1 parent 4122ab9 commit e2ece84
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 20 deletions.
59 changes: 39 additions & 20 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ type ConsumerInfo struct {
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
PinnedIds map[string]string `json:"pinned_ids,omitempty"`
TimeStamp time.Time `json:"ts"`
PriorityGroups []PriorityGroupState `json:"priority_groups,omitempty"`
}

type PriorityGroupState struct {
Group string `json:"group"`
PinnedClientID string `json:"pinned_client_id,omitempty"`
PinnedTS time.Time `json:"pinned_ts,omitempty"`
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -480,6 +486,7 @@ type consumer struct {
currentPinId string
/// pinnedTtl is the remaining time before the current PinId expires.
pinnedTtl *time.Timer
pinnedTS time.Time
}

// A single subject filter.
Expand Down Expand Up @@ -2900,10 +2907,14 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
rg = o.ca.Group
}

priorityGroups := []PriorityGroupState{}
// TODO(jrm): when we introduce supporting many priority groups, we need to update assigning `o.currentNuid` for each group.
pinnedIds := make(map[string]string)
if len(o.cfg.PriorityGroups) > 0 {
pinnedIds[o.cfg.PriorityGroups[0]] = o.currentPinId
priorityGroups = append(priorityGroups, PriorityGroupState{
Group: o.cfg.PriorityGroups[0],
PinnedClientID: o.currentPinId,
PinnedTS: o.pinnedTS,
})
}

cfg := o.cfg
Expand All @@ -2925,7 +2936,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
PinnedIds: pinnedIds,
PriorityGroups: priorityGroups,
}
if o.cfg.PauseUntil != nil {
p := *o.cfg.PauseUntil
Expand Down Expand Up @@ -3515,6 +3526,21 @@ func (o *consumer) pendingRequests() map[string]*waitingRequest {
return m
}

func (o *consumer) setPinnedTimer(priorityGroup string) {
if o.pinnedTtl != nil {
o.pinnedTtl.Reset(o.cfg.PinnedTTL)
} else {
o.pinnedTtl = time.AfterFunc(o.cfg.PinnedTTL, func() {
o.mu.Lock()
o.pinnedTS = time.Now().Add(o.cfg.PinnedTTL)
o.currentPinId = _EMPTY_
o.sendUnpinnedAdvisoryLocked(priorityGroup, "timeout")
o.mu.Unlock()
o.signalNewMessages()
})
}
}

// Return next waiting request. This will check for expirations but not noWait or interest.
// That will be handled by processWaiting.
// Lock should be held.
Expand Down Expand Up @@ -3565,6 +3591,8 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
if wr.priorityGroup.Id == _EMPTY_ {
o.currentPinId = nuid.Next()
wr.priorityGroup.Id = o.currentPinId
o.setPinnedTimer(priorityGroup)

} else {
// There is pin id set, but not a matching one. Send a notification to the client and remove the request.
// Probably this is the old pin id.
Expand Down Expand Up @@ -3775,21 +3803,12 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
return
}

if priorityGroup.Id != _EMPTY_ && priorityGroup.Id != o.currentPinId && o.currentPinId != _EMPTY_ {
sendErr(423, "Nats-Pin-Id mismatch")
return
} else {
if o.pinnedTtl != nil && priorityGroup.Id == o.currentPinId && o.currentPinId != _EMPTY_ {

o.pinnedTtl.Reset(o.cfg.PinnedTTL)
} else if o.pinnedTtl == nil {
o.pinnedTtl = time.AfterFunc(o.cfg.PinnedTTL, func() {
o.mu.Lock()
o.currentPinId = _EMPTY_
o.sendUnpinnedAdvisoryLocked(priorityGroup.Group, "timeout")
o.mu.Unlock()
o.signalNewMessages()
})
if o.currentPinId != _EMPTY_ {
if priorityGroup.Id == o.currentPinId {
o.setPinnedTimer(priorityGroup.Group)
} else if priorityGroup.Id != _EMPTY_ {
sendErr(423, "Nats-Pin-Id mismatch")
return
}
}
}
Expand Down
79 changes: 79 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,85 @@ func TestJetStreamConsumerUnpinPickDifferentRequest(t *testing.T) {
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), "")
}

func TestJetStreamPinnedTTL(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

acc := s.GlobalAccount()

mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: LimitsPolicy,
})
require_NoError(t, err)

_, err = mset.addConsumer(&ConsumerConfig{
Durable: "C",
FilterSubject: "foo",
PriorityGroups: []string{"A"},
PriorityPolicy: PriorityPinnedClient,
AckPolicy: AckExplicit,
PinnedTTL: 3 * time.Second,
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, "foo", "data")
}

req := JSApiConsumerGetNextRequest{Batch: 1, Expires: 10 * time.Second, PriorityGroup: PriorityGroup{
Group: "A",
}}

reqBytes, err := json.Marshal(req)
require_NoError(t, err)

firstInbox := "FIRST"
firstReplies, err := nc.SubscribeSync(firstInbox)
require_NoError(t, err)
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", firstInbox, reqBytes)

msg, err := firstReplies.NextMsg(1 * time.Second)
require_NoError(t, err)
pinId := msg.Header.Get("Nats-Pin-Id")
require_NotEqual(t, pinId, "")

secondInbox := "SECOND"
secondReplies, err := nc.SubscribeSync(secondInbox)
require_NoError(t, err)
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", secondInbox, reqBytes)

// Expect error, as first request should be still pinned.
_, err = secondReplies.NextMsg(1 * time.Second)
require_Error(t, err)

// During the 5 second window, the first Pin should time out and this request
// should become the pinned one and get the message.
msg, err = secondReplies.NextMsg(5 * time.Second)
require_NoError(t, err)
newPinId := msg.Header.Get("Nats-Pin-Id")
require_NotEqual(t, newPinId, pinId)
require_NotEqual(t, newPinId, "")

thirdInbox := "THIRD"
thirdReplies, err := nc.SubscribeSync(thirdInbox)
require_NoError(t, err)
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", thirdInbox, reqBytes)

// The same process as above, but tests different codepath - one where Pin
// is set on existing waiting request.
msg, err = thirdReplies.NextMsg(5 * time.Second)
require_NoError(t, err)
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), pinId)
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), newPinId)
require_NotEqual(t, newPinId, "")

}

func TestJetStreamConsumerUnpin(t *testing.T) {
single := RunBasicJetStreamServer(t)
defer single.Shutdown()
Expand Down

0 comments on commit e2ece84

Please sign in to comment.