From 04be1f0c68bf8d354c9bf56c3037d33504a2df02 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 25 Jul 2023 17:39:18 +0100 Subject: [PATCH] Decide whether we should queue the update --- sync3/handler/connstate.go | 2 +- sync3/handler/txn_id_waiter.go | 35 +++++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go index dcad6ba7..f9a3a212 100644 --- a/sync3/handler/connstate.go +++ b/sync3/handler/connstate.go @@ -81,7 +81,7 @@ func NewConnState( ConnState: cs, updates: make(chan caches.Update, maxPendingEventUpdates), } - cs.txnIDWaiter = NewTxnIDWaiter(userID, cs.live.onUpdate) + cs.txnIDWaiter = NewTxnIDWaiter(userID, cs.live.onUpdate, cs.subscribedOrVisible) // subscribe for updates before loading. We risk seeing dupes but that's fine as load positions // will stop us double-processing. cs.userCacheID = cs.userCache.Subsribe(cs) diff --git a/sync3/handler/txn_id_waiter.go b/sync3/handler/txn_id_waiter.go index c6ac84a5..e101572a 100644 --- a/sync3/handler/txn_id_waiter.go +++ b/sync3/handler/txn_id_waiter.go @@ -5,21 +5,38 @@ import ( ) type TxnIDWaiter struct { - userID string - publish func(update caches.Update) - queued map[string][]caches.Update + userID string + publish func(update caches.Update) + subscribedOrVisible func(roomID string) bool + queued map[string][]caches.Update } -func NewTxnIDWaiter(userID string, publish func(caches.Update)) *TxnIDWaiter { +func NewTxnIDWaiter(userID string, publish func(caches.Update), subscribedOrVisible func(string) bool) *TxnIDWaiter { return &TxnIDWaiter{ - userID: userID, - publish: publish, - queued: make(map[string][]caches.Update), + userID: userID, + publish: publish, + subscribedOrVisible: subscribedOrVisible, + queued: make(map[string][]caches.Update), } } func (t *TxnIDWaiter) Ingest(up caches.Update) { - // TODO: investigate whether this update needs to be queued. + if !t.shouldQueue(up) { + t.publish(up) + } + // TODO: bound the queue size? - t.publish(up) + // TODO: enqueue and timeout +} + +func (t *TxnIDWaiter) shouldQueue(up caches.Update) bool { + e, isEventUpdate := up.(*caches.RoomEventUpdate) + if isEventUpdate { + // TODO: ensure we don't keep length-0 or nil slices in the queued map so this works correctly. + _, roomQueued := t.queued[e.EventData.RoomID] + if (e.EventData.Sender == t.userID || roomQueued) && t.subscribedOrVisible(e.EventData.RoomID) { + return true + } + } + return false }