Skip to content

Commit

Permalink
proxy: Close client connection if session is expired / closed.
Browse files Browse the repository at this point in the history
This handles cases where during development the machine is suspended and
on resume, the session is expired but the connection is still open. As
for the expired session the internal context is closed, the session can
no longer be used.
  • Loading branch information
fancycode committed Oct 31, 2024
1 parent c27ad30 commit 9d3c526
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 11 deletions.
2 changes: 2 additions & 0 deletions proxy/proxy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,8 @@ func (s *ProxyServer) DeleteSession(id uint64) {
func (s *ProxyServer) deleteSessionLocked(id uint64) {
if session, found := s.sessions[id]; found {
delete(s.sessions, id)
s.sessionsLock.Unlock()
defer s.sessionsLock.Lock()
session.Close()
statsSessionsCurrent.Dec()
}
Expand Down
45 changes: 34 additions & 11 deletions proxy/proxy_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"net/http/httptest"
"os"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -353,8 +354,10 @@ func TestProxyCreateSession(t *testing.T) {
}

type HangingTestMCU struct {
t *testing.T
ctx context.Context
t *testing.T
ctx context.Context
creating chan struct{}
cancelled atomic.Bool
}

func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
Expand All @@ -364,8 +367,9 @@ func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
})

return &HangingTestMCU{
t: t,
ctx: ctx,
t: t,
ctx: ctx,
creating: make(chan struct{}),
}
}

Expand Down Expand Up @@ -393,8 +397,11 @@ func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.Mc
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()

m.creating <- struct{}{}

select {
case <-ctx.Done():
m.cancelled.Store(true)
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
Expand All @@ -405,8 +412,11 @@ func (m *HangingTestMCU) NewSubscriber(ctx context.Context, listener signaling.M
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()

m.creating <- struct{}{}

select {
case <-ctx.Done():
m.cancelled.Store(true)
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
Expand All @@ -419,7 +429,8 @@ func TestProxyCancelOnClose(t *testing.T) {
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)

proxy.mcu = NewHangingTestMCU(t)
mcu := NewHangingTestMCU(t)
proxy.mcu = mcu

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
Expand All @@ -436,22 +447,34 @@ func TestProxyCancelOnClose(t *testing.T) {
_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)

require.NoError(client.SendCommand(&signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
Id: "2345",
Type: "command",
Command: &signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
},
}))

// Simulate expired session while request is still being processed.
go func() {
<-mcu.creating
if session := proxy.GetSession(1); assert.NotNil(session) {
session.Close()
}
}()

if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
if err := checkMessageType(message, "error"); assert.NoError(err) {
assert.Equal("internal_error", message.Error.Code)
assert.Equal(context.Canceled.Error(), message.Error.Message)
if err := checkMessageType(message, "bye"); assert.NoError(err) {
assert.Equal("session_closed", message.Bye.Reason)
}
}

if message, err := client.RunUntilMessage(ctx); assert.Error(err) {
assert.True(websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived), "expected close error, got %+v", err)
} else {
t.Errorf("expected error, got %+v", message)
}

assert.True(mcu.cancelled.Load())
}
15 changes: 15 additions & 0 deletions proxy/proxy_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,24 @@ func (s *ProxySession) MarkUsed() {
}

func (s *ProxySession) Close() {
prev := s.SetClient(nil)
if prev != nil {
reason := "session_closed"
if s.IsExpired() {
reason = "session_expired"
}
prev.SendMessage(&signaling.ProxyServerMessage{
Type: "bye",
Bye: &signaling.ByeProxyServerMessage{
Reason: reason,
},
})
}

s.closeFunc()
s.clearPublishers()
s.clearSubscribers()
s.proxy.DeleteSession(s.Sid())
}

func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
Expand Down

0 comments on commit 9d3c526

Please sign in to comment.