Skip to content

Commit

Permalink
Clean up inactive sessions
Browse files Browse the repository at this point in the history
Not cleaning up the sessions causes quite a big memory leak which in effect causes OOMKilled
  • Loading branch information
leszko committed Sep 9, 2024
1 parent 48a92a1 commit 4eba516
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 14 deletions.
7 changes: 7 additions & 0 deletions pm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Sender interface {
// for creating new tickets
StartSession(ticketParams TicketParams) string

// StopSession deletes session from the internal map
StopSession(sessionID string)

// CreateTicketBatch returns a ticket batch of the specified size
CreateTicketBatch(sessionID string, size int) (*TicketBatch, error)

Expand Down Expand Up @@ -82,6 +85,10 @@ func (s *sender) EV(sessionID string) (*big.Rat, error) {
return ticketEV(session.ticketParams.FaceValue, session.ticketParams.WinProb), nil
}

func (s *sender) StopSession(sessionID string) {
s.sessions.Delete(sessionID)

Check warning on line 89 in pm/sender.go

View check run for this annotation

Codecov / codecov/patch

pm/sender.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}

func (s *sender) validateSender(info *SenderInfo) error {
maxWithdrawRound := new(big.Int).Add(s.timeManager.LastInitializedRound(), big.NewInt(1))
if info.WithdrawRound.Int64() != 0 && info.WithdrawRound.Cmp(maxWithdrawRound) != 1 {
Expand Down
5 changes: 5 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ func (m *MockSender) StartSession(ticketParams TicketParams) string {
return args.String(0)
}

// StopSession deletes session from the internal ma
func (m *MockSender) StopSession(sessionID string) {
m.Called(sessionID)

Check warning on line 516 in pm/stub.go

View check run for this annotation

Codecov / codecov/patch

pm/stub.go#L515-L516

Added lines #L515 - L516 were not covered by tests
}

// EV returns the ticket EV for a session
func (m *MockSender) EV(sessionID string) (*big.Rat, error) {
args := m.Called(sessionID)
Expand Down
13 changes: 10 additions & 3 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (cfg *BroadcastConfig) SetMaxPrice(price *core.AutoConvertedPrice) {
}

type sessionsCreator func() ([]*BroadcastSession, error)
type sessionsDeleter func(sessionId string)
type SessionPool struct {
mid core.ManifestID

Expand All @@ -101,10 +102,11 @@ type SessionPool struct {
finished bool // set at stream end

createSessions sessionsCreator
deleteSessions sessionsDeleter
sus *suspender
}

func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator,
func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender, createSession sessionsCreator, deleteSession sessionsDeleter,
sel BroadcastSessionsSelector) *SessionPool {

return &SessionPool{
Expand All @@ -114,6 +116,7 @@ func NewSessionPool(mid core.ManifestID, poolSize, numOrchs int, sus *suspender,
sessMap: make(map[string]*BroadcastSession),
sel: sel,
createSessions: createSession,
deleteSessions: deleteSession,
sus: sus,
}
}
Expand Down Expand Up @@ -378,6 +381,7 @@ func (sp *SessionPool) removeSession(session *BroadcastSession) {
sp.lock.Lock()
defer sp.lock.Unlock()

sp.deleteSessions(session.PMSessionID)
delete(sp.sessMap, session.Transcoder())
}

Expand Down Expand Up @@ -469,15 +473,18 @@ func NewSessionManager(ctx context.Context, node *core.LivepeerNode, params *cor
createSessionsUntrusted := func() ([]*BroadcastSession, error) {
return selectOrchestrator(ctx, node, params, untrustedNumOrchs, susUntrusted, common.ScoreEqualTo(common.Score_Untrusted))
}
deleteSessions := func(sessionID string) {
node.Sender.StopSession(sessionID)

Check warning on line 477 in server/broadcast.go

View check run for this annotation

Codecov / codecov/patch

server/broadcast.go#L477

Added line #L477 was not covered by tests
}
var stakeRdr stakeReader
if node.Eth != nil {
stakeRdr = &storeStakeReader{store: node.Database}
}
bsm := &BroadcastSessionsManager{
mid: params.ManifestID,
VerificationFreq: params.VerificationFreq,
trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
trustedPool: NewSessionPool(params.ManifestID, int(trustedPoolSize), trustedNumOrchs, susTrusted, createSessionsTrusted, deleteSessions, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
untrustedPool: NewSessionPool(params.ManifestID, int(untrustedPoolSize), untrustedNumOrchs, susUntrusted, createSessionsUntrusted, deleteSessions, NewMinLSSelector(stakeRdr, 1.0, node.SelectionAlgorithm, node.OrchPerfScore)),
}
bsm.trustedPool.refreshSessions(ctx)
bsm.untrustedPool.refreshSessions(ctx)
Expand Down
15 changes: 5 additions & 10 deletions server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func StubBroadcastSession(transcoder string) *BroadcastSession {
}
}

func StubBroadcastSessionsManager() *BroadcastSessionsManager {
sess1 := StubBroadcastSession("transcoder1")
sess2 := StubBroadcastSession("transcoder2")

return bsmWithSessList([]*BroadcastSession{sess1, sess2})
}

func selFactoryEmpty() BroadcastSessionsSelector {
return &LIFOSelector{}
}
Expand Down Expand Up @@ -101,6 +94,8 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre
return cloneSessions(sessList), nil
}

var deleteSessions = func(sessionID string) {}

untrustedSessMap := make(map[string]*BroadcastSession)
for _, sess := range untrustedSessList {
untrustedSessMap[sess.OrchestratorInfo.Transcoder] = sess
Expand All @@ -118,11 +113,10 @@ func bsmWithSessListExt(sessList, untrustedSessList []*BroadcastSession, noRefre
if noRefresh {
createSessions = createSessionsEmpty
createSessionsUntrusted = createSessionsEmpty

}
trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel)
trustedPool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel)
trustedPool.sessMap = sessMap
untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, unsel)
untrustedPool := NewSessionPool("test", len(untrustedSessList), 1, newSuspender(), createSessionsUntrusted, deleteSessions, unsel)
untrustedPool.sessMap = untrustedSessMap

return &BroadcastSessionsManager{
Expand Down Expand Up @@ -393,6 +387,7 @@ func TestSelectSession_MultipleInFlight2(t *testing.T) {
sess := StubBroadcastSession(ts.URL)
sender := &pm.MockSender{}
sender.On("StartSession", mock.Anything).Return("foo").Times(3)
sender.On("StopSession", mock.Anything).Times(3)
sender.On("EV", mock.Anything).Return(big.NewRat(1000000, 1), nil)
sender.On("CreateTicketBatch", mock.Anything, mock.Anything).Return(defaultTicketBatch(), nil)
sender.On("ValidateTicketParams", mock.Anything).Return(nil)
Expand Down
3 changes: 2 additions & 1 deletion server/sessionpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func poolWithSessList(sessList []*BroadcastSession) *sessionPoolLIFO {
// return sessList, nil
return nil, nil
}
pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, sel)
var deleteSessions = func(sessionID string) {}
pool := NewSessionPool("test", len(sessList), 1, newSuspender(), createSessions, deleteSessions, sel)
pool.sessMap = sessMap
return newSessionPoolLIFO(pool)
}
Expand Down

0 comments on commit 4eba516

Please sign in to comment.