Skip to content

Commit

Permalink
Decide whether we should queue the update
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Jul 25, 2023
1 parent ad3a7ca commit 04be1f0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
2 changes: 1 addition & 1 deletion sync3/handler/connstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewConnState(
ConnState: cs,
updates: make(chan caches.Update, maxPendingEventUpdates),
}
cs.txnIDWaiter = NewTxnIDWaiter(userID, cs.live.onUpdate)
cs.txnIDWaiter = NewTxnIDWaiter(userID, cs.live.onUpdate, cs.subscribedOrVisible)
// subscribe for updates before loading. We risk seeing dupes but that's fine as load positions
// will stop us double-processing.
cs.userCacheID = cs.userCache.Subsribe(cs)
Expand Down
35 changes: 26 additions & 9 deletions sync3/handler/txn_id_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,38 @@ import (
)

type TxnIDWaiter struct {
userID string
publish func(update caches.Update)
queued map[string][]caches.Update
userID string
publish func(update caches.Update)
subscribedOrVisible func(roomID string) bool
queued map[string][]caches.Update
}

func NewTxnIDWaiter(userID string, publish func(caches.Update)) *TxnIDWaiter {
func NewTxnIDWaiter(userID string, publish func(caches.Update), subscribedOrVisible func(string) bool) *TxnIDWaiter {
return &TxnIDWaiter{
userID: userID,
publish: publish,
queued: make(map[string][]caches.Update),
userID: userID,
publish: publish,
subscribedOrVisible: subscribedOrVisible,
queued: make(map[string][]caches.Update),
}
}

func (t *TxnIDWaiter) Ingest(up caches.Update) {
// TODO: investigate whether this update needs to be queued.
if !t.shouldQueue(up) {
t.publish(up)
}

// TODO: bound the queue size?
t.publish(up)
// TODO: enqueue and timeout
}

func (t *TxnIDWaiter) shouldQueue(up caches.Update) bool {
e, isEventUpdate := up.(*caches.RoomEventUpdate)
if isEventUpdate {
// TODO: ensure we don't keep length-0 or nil slices in the queued map so this works correctly.
_, roomQueued := t.queued[e.EventData.RoomID]
if (e.EventData.Sender == t.userID || roomQueued) && t.subscribedOrVisible(e.EventData.RoomID) {
return true
}
}
return false
}

0 comments on commit 04be1f0

Please sign in to comment.