From 4eba516ac78512f619ac0a3263960bb2ccb4ddb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 9 Sep 2024 10:21:53 +0200 Subject: [PATCH] Clean up inactive sessions Not cleaning up the sessions causes quite a big memory leak which in effect causes OOMKilled --- pm/sender.go | 7 +++++++ pm/stub.go | 5 +++++ server/broadcast.go | 13 ++++++++++--- server/broadcast_test.go | 15 +++++---------- server/sessionpool_test.go | 3 ++- 5 files changed, 29 insertions(+), 14 deletions(-) diff --git a/pm/sender.go b/pm/sender.go index 6f6acf0fd4..4ad443de4c 100644 --- a/pm/sender.go +++ b/pm/sender.go @@ -22,6 +22,9 @@ type Sender interface { // for creating new tickets StartSession(ticketParams TicketParams) string + // StopSession deletes session from the internal map + StopSession(sessionID string) + // CreateTicketBatch returns a ticket batch of the specified size CreateTicketBatch(sessionID string, size int) (*TicketBatch, error) @@ -82,6 +85,10 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) { return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil } +func (s *sender) StopSession(sessionID string) { + s.sessions.Delete(sessionID) +} + func (s *sender) validateSender(info *SenderInfo) error { maxWithdrawRound := new(big.Int).Add(s.timeManager.LastInitializedRound(), big.NewInt(1)) if info.WithdrawRound.Int64() != 0 && info.WithdrawRound.Cmp(maxWithdrawRound) != 1 { diff --git a/pm/stub.go b/pm/stub.go index a3e0f7f224..01f2010e30 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -511,6 +511,11 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string { return args.String(0) } +// StopSession deletes session from the internal ma +func (m *MockSender) StopSession(sessionID string) { + m.Called(sessionID) +} + // EV returns the ticket EV for a session func (m *MockSender) EV(sessionID string) (*big.Rat, error) { args := m.Called(sessionID) diff --git a/server/broadcast.go b/server/broadcast.go index bcd0a54658..0056eb8951 100755 --- a/server/broadcast.go +++ b/server/broadcast.go @@ -85,6 +85,7 @@ func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) { } type sessionsCreator func() ([]*BroadcastSession, error) +type sessionsDeleter func(sessionId string) type SessionPool struct { mid core.ManifestID @@ -101,10 +102,11 @@ type SessionPool struct { finished bool // set at stream end createSessions sessionsCreator + deleteSessions sessionsDeleter sus *suspender } -func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, +func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, deleteSession sessionsDeleter, sel BroadcastSessionsSelector) *SessionPool { return &SessionPool{ @@ -114,6 +116,7 @@ func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, sessMap: make(map[string]*BroadcastSession), sel: sel, createSessions: createSession, + deleteSessions: deleteSession, sus: sus, } } @@ -378,6 +381,7 @@ func (sp *SessionPool) removeSession(session *BroadcastSession) { sp.lock.Lock() defer sp.lock.Unlock() + sp.deleteSessions(session.PMSessionID) delete(sp.sessMap, session.Transcoder()) } @@ -469,6 +473,9 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor createSessionsUntrusted := func() ([]*BroadcastSession, error) { return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted)) } + deleteSessions := func(sessionID string) { + node.Sender.StopSession(sessionID) + } var stakeRdr stakeReader if node.Eth != nil { stakeRdr = &storeStakeReader{store: node.Database} @@ -476,8 +483,8 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor bsm := &BroadcastSessionsManager{ mid: params.ManifestID, VerificationFreq: params.VerificationFreq, - trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), - untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), + trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, deleteSessions, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), + untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, deleteSessions, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)), } bsm.trustedPool.refreshSessions(ctx) bsm.untrustedPool.refreshSessions(ctx) diff --git a/server/broadcast_test.go b/server/broadcast_test.go index 669ff06313..f742bc8511 100644 --- a/server/broadcast_test.go +++ b/server/broadcast_test.go @@ -64,13 +64,6 @@ func StubBroadcastSession(transcoder string) *BroadcastSession { } } -func StubBroadcastSessionsManager() *BroadcastSessionsManager { - sess1 := StubBroadcastSession("transcoder1") - sess2 := StubBroadcastSession("transcoder2") - - return bsmWithSessList([]*BroadcastSession{sess1, sess2}) -} - func selFactoryEmpty() BroadcastSessionsSelector { return &LIFOSelector{} } @@ -101,6 +94,8 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre return cloneSessions(sessList), nil } + var deleteSessions = func(sessionID string) {} + untrustedSessMap := make(map[string]*BroadcastSession) for _, sess := range untrustedSessList { untrustedSessMap[sess.OrchestratorInfo.Transcoder] = sess @@ -118,11 +113,10 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre if noRefresh { createSessions = createSessionsEmpty createSessionsUntrusted = createSessionsEmpty - } - trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel) + trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel) trustedPool.sessMap = sessMap - untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, unsel) + untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, deleteSessions, unsel) untrustedPool.sessMap = untrustedSessMap return &BroadcastSessionsManager{ @@ -393,6 +387,7 @@ func TestSelectSession_MultipleInFlight2(t *testing.T) { sess := StubBroadcastSession(ts.URL) sender := &pm.MockSender{} sender.On("StartSession", mock.Anything).Return("foo").Times(3) + sender.On("StopSession", mock.Anything).Times(3) sender.On("EV", mock.Anything).Return(big.NewRat(1000000, 1), nil) sender.On("CreateTicketBatch", mock.Anything, mock.Anything).Return(defaultTicketBatch(), nil) sender.On("ValidateTicketParams", mock.Anything).Return(nil) diff --git a/server/sessionpool_test.go b/server/sessionpool_test.go index 49a9f4f44b..832f7016d3 100644 --- a/server/sessionpool_test.go +++ b/server/sessionpool_test.go @@ -51,7 +51,8 @@ func poolWithSessList(sessList []*BroadcastSession) *sessionPoolLIFO { // return sessList, nil return nil, nil } - pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel) + var deleteSessions = func(sessionID string) {} + pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel) pool.sessMap = sessMap return newSessionPoolLIFO(pool) }