Add more metrics around connection buffers #219

Merged (4 commits, Jul 24, 2023)
15 changes: 15 additions & 0 deletions internal/context.go
@@ -2,6 +2,8 @@ package internal

import (
"context"
"fmt"

"github.com/getsentry/sentry-go"

"github.com/rs/zerolog"
@@ -17,6 +19,7 @@ var (
type data struct {
userID string
deviceID string
bufferSummary string
since int64
next int64
numRooms int
@@ -53,6 +56,15 @@ func SetRequestContextUserID(ctx context.Context, userID, deviceID string) {
}
}

func SetConnBufferInfo(ctx context.Context, bufferLen, nextLen, bufferCap int) {
d := ctx.Value(ctxData)
if d == nil {
return
}
da := d.(*data)
da.bufferSummary = fmt.Sprintf("%d/%d/%d", bufferLen, nextLen, bufferCap)
}

func SetRequestContextResponseInfo(
ctx context.Context, since, next int64, numRooms int, txnID string, numToDeviceEvents, numGlobalAccountData int,
numChangedDevices, numLeftDevices int,
@@ -108,5 +120,8 @@ func DecorateLogger(ctx context.Context, l *zerolog.Event) *zerolog.Event {
if da.numLeftDevices > 0 {
l = l.Int("dl-l", da.numLeftDevices)
}
if da.bufferSummary != "" {
l = l.Str("b", da.bufferSummary)
}
return l
}
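
For illustration only (not part of this diff): SetConnBufferInfo stores the summary as bufferLen/nextLen/bufferCap, and DecorateLogger emits it under the short key "b". A minimal standalone sketch of the resulting log shape, using zerolog directly with made-up numbers:

```go
package main

import (
	"fmt"
	"os"

	"github.com/rs/zerolog"
)

func main() {
	// Hypothetical values: 3 updates were buffered when liveUpdate started,
	// 0 remained when it returned, and the buffer capacity is 1000.
	bufferSummary := fmt.Sprintf("%d/%d/%d", 3, 0, 1000)

	log := zerolog.New(os.Stdout)
	// Mirrors what DecorateLogger does with the new bufferSummary field.
	log.Info().Str("b", bufferSummary).Msg("liveUpdate: returning")
	// Prints: {"level":"info","b":"3/0/1000","message":"liveUpdate: returning"}
}
```

The compact key keeps request log lines short while still showing how close the connection buffer came to filling up.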
68 changes: 63 additions & 5 deletions sync3/connmap.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/prometheus/client_golang/prometheus"
)

// ConnMap stores a collection of Conns.
@@ -15,10 +16,15 @@ type ConnMap struct {
userIDToConn map[string][]*Conn
connIDToConn map[string]*Conn

numConns prometheus.Gauge
// counters for reasons why connections have expired
expiryTimedOutCounter prometheus.Counter
expiryBufferFullCounter prometheus.Counter

mu *sync.Mutex
}

func NewConnMap() *ConnMap {
func NewConnMap(enablePrometheus bool) *ConnMap {
cm := &ConnMap{
userIDToConn: make(map[string][]*Conn),
connIDToConn: make(map[string]*Conn),
@@ -27,17 +33,61 @@
}
cm.cache.SetTTL(30 * time.Minute) // TODO: customisable
cm.cache.SetExpirationCallback(cm.closeConnExpires)

if enablePrometheus {
cm.expiryTimedOutCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "expiry_conn_timed_out",
Help: "Counter of expired API connections due to reaching TTL limit",
})
prometheus.MustRegister(cm.expiryTimedOutCounter)
cm.expiryBufferFullCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "expiry_conn_buffer_full",
Help: "Counter of expired API connections due to reaching buffer update limit",
})
prometheus.MustRegister(cm.expiryBufferFullCounter)
cm.numConns = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "num_active_conns",
Help: "Number of active sliding sync connections.",
})
prometheus.MustRegister(cm.numConns)
}
return cm
}

func (m *ConnMap) Teardown() {
m.cache.Close()

if m.numConns != nil {
prometheus.Unregister(m.numConns)
}
if m.expiryBufferFullCounter != nil {
prometheus.Unregister(m.expiryBufferFullCounter)
}
if m.expiryTimedOutCounter != nil {
prometheus.Unregister(m.expiryTimedOutCounter)
}
}

func (m *ConnMap) Len() int {
// UpdateMetrics recalculates the number of active connections. Call this whenever the number of connections may have changed.
func (m *ConnMap) UpdateMetrics() {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.connIDToConn)
m.updateMetrics(len(m.connIDToConn))
}

// updateMetrics is like UpdateMetrics but doesn't touch connIDToConn and hence doesn't need to lock. We use this internally
// when we need to update the metric and already have the lock held, as calling UpdateMetrics would deadlock.
func (m *ConnMap) updateMetrics(numConns int) {
if m.numConns == nil {
return
}
m.numConns.Set(float64(numConns))
}

// Conns return all connections for this user|device
@@ -64,8 +114,11 @@ func (m *ConnMap) Conn(cid ConnID) *Conn {
return conn
}
// e.g buffer exceeded, close it and remove it from the cache
logger.Trace().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
logger.Info().Str("conn", cid.String()).Msg("closing connection due to dead connection (buffer full)")
m.closeConn(conn)
if m.expiryBufferFullCounter != nil {
m.expiryBufferFullCounter.Inc()
}
return nil
}

@@ -92,6 +145,7 @@ func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Co
m.cache.Set(cid.String(), conn)
m.connIDToConn[cid.String()] = conn
m.userIDToConn[cid.UserID] = append(m.userIDToConn[cid.UserID], conn)
m.updateMetrics(len(m.connIDToConn))
return conn, true
}

@@ -121,7 +175,10 @@ func (m *ConnMap) closeConnExpires(connID string, value interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
conn := value.(*Conn)
logger.Trace().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
logger.Info().Str("conn", connID).Msg("closing connection due to expired TTL in cache")
if m.expiryTimedOutCounter != nil {
m.expiryTimedOutCounter.Inc()
}
m.closeConn(conn)
}

@@ -147,4 +204,5 @@ func (m *ConnMap) closeConn(conn *Conn) {
m.userIDToConn[conn.UserID] = conns
// remove user cache listeners etc
h.Destroy()
m.updateMetrics(len(m.connIDToConn))
}
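
As a hedged sketch (not included in this PR), the new gauge and counters could be read back with the prometheus testutil helpers; this assumes a test living in package sync3 so the unexported fields are reachable, and that nothing else has already registered metrics with the same names:

```go
package sync3

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Sketch only: a freshly constructed ConnMap should report zero active
// connections and zero expiries. CreateConn/closeConn move the gauge;
// the TTL and buffer-full paths bump the two counters.
func TestConnMapMetricsSketch(t *testing.T) {
	cm := NewConnMap(true)
	defer cm.Teardown() // unregisters the gauge and counters again

	if got := testutil.ToFloat64(cm.numConns); got != 0 {
		t.Fatalf("num_active_conns: want 0, got %v", got)
	}
	if got := testutil.ToFloat64(cm.expiryTimedOutCounter); got != 0 {
		t.Fatalf("expiry_conn_timed_out: want 0, got %v", got)
	}
	if got := testutil.ToFloat64(cm.expiryBufferFullCounter); got != 0 {
		t.Fatalf("expiry_conn_buffer_full: want 0, got %v", got)
	}
}
```

testutil.ToFloat64 accepts any single-value collector, which is all three of these are.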
4 changes: 4 additions & 0 deletions sync3/handler/connstate_live.go
@@ -57,6 +57,7 @@ func (s *connStateLive) liveUpdate(
if req.TimeoutMSecs() < 100 {
req.SetTimeoutMSecs(100)
}
startBufferSize := len(s.updates)
// block until we get a new event, with appropriate timeout
startTime := time.Now()
hasLiveStreamed := false
@@ -104,6 +105,9 @@
}

log.Trace().Bool("live_streamed", hasLiveStreamed).Msg("liveUpdate: returning")

internal.SetConnBufferInfo(ctx, startBufferSize, len(s.updates), cap(s.updates))

// TODO: op consolidation
}
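
To make the three values concrete (illustrative only; the real type and capacity of s.updates are not shown in this diff, though the len/cap calls suggest a bounded buffer such as a channel), here is a tiny standalone example of the start-length/current-length/capacity triple that ends up in the "b" log field:

```go
package main

import "fmt"

func main() {
	// Stand-in for the connection's pending-update buffer; the element
	// type and capacity here are assumptions, not taken from the PR.
	updates := make(chan string, 4)
	updates <- "event A"
	updates <- "event B"

	start := len(updates) // 2: backlog when liveUpdate would begin
	<-updates             // the live loop consumes one update

	// SetConnBufferInfo would format this triple as "2/1/4".
	fmt.Printf("%d/%d/%d\n", start, len(updates), cap(updates))
}
```

If the middle number keeps tracking the capacity, the connection is heading towards the buffer-full expiry counted by expiry_conn_buffer_full.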

24 changes: 3 additions & 21 deletions sync3/handler/handler.go
@@ -59,7 +59,6 @@ type SyncLiveHandler struct {
GlobalCache *caches.GlobalCache
maxPendingEventUpdates int

numConns prometheus.Gauge
setupHistVec *prometheus.HistogramVec
histVec *prometheus.HistogramVec
slowReqs prometheus.Counter
@@ -74,7 +73,7 @@ func NewSync3Handler(
V2: v2Client,
Storage: store,
V2Store: storev2,
ConnMap: sync3.NewConnMap(),
ConnMap: sync3.NewConnMap(enablePrometheus),
userCaches: &sync.Map{},
Dispatcher: sync3.NewDispatcher(),
GlobalCache: caches.NewGlobalCache(store),
@@ -128,9 +127,6 @@ func (h *SyncLiveHandler) Teardown() {
h.V2Sub.Teardown()
h.EnsurePoller.Teardown()
h.ConnMap.Teardown()
if h.numConns != nil {
prometheus.Unregister(h.numConns)
}
if h.setupHistVec != nil {
prometheus.Unregister(h.setupHistVec)
}
@@ -142,20 +138,7 @@
}
}

func (h *SyncLiveHandler) updateMetrics() {
if h.numConns == nil {
return
}
h.numConns.Set(float64(h.ConnMap.Len()))
}

func (h *SyncLiveHandler) addPrometheusMetrics() {
h.numConns = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "num_active_conns",
Help: "Number of active sliding sync connections.",
})
h.setupHistVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sliding_sync",
Subsystem: "api",
@@ -176,7 +159,6 @@ func (h *SyncLiveHandler) addPrometheusMetrics() {
Name: "slow_requests",
Help: "Counter of slow (>=50s) requests, initial or otherwise.",
})
prometheus.MustRegister(h.numConns)
prometheus.MustRegister(h.setupHistVec)
prometheus.MustRegister(h.histVec)
prometheus.MustRegister(h.slowReqs)
@@ -398,7 +380,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
}

pid := sync2.PollerID{UserID: token.UserID, DeviceID: token.DeviceID}
log.Trace().Any("pid", pid).Msg("checking poller exists and is running")
h.EnsurePoller.EnsurePolling(req.Context(), pid, token.AccessTokenHash)
log.Trace().Msg("poller exists and is running")
// this may take a while so if the client has given up (e.g timed out) by this point, just stop.
@@ -421,7 +403,7 @@
}

// once we have the conn, make sure our metrics are correct
defer h.updateMetrics()
defer h.ConnMap.UpdateMetrics()

// Now the v2 side of things are running, we can make a v3 live sync conn
// NB: this isn't inherently racey (we did the check for an existing conn before EnsurePolling)