diff --git a/mcu_janus_test.go b/mcu_janus_test.go index 318e2eb2..95cfb0cd 100644 --- a/mcu_janus_test.go +++ b/mcu_janus_test.go @@ -27,6 +27,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/dlintw/goconf" "github.com/notedit/janus-go" @@ -34,34 +35,54 @@ import ( "github.com/stretchr/testify/require" ) +type TestJanusHandle struct { + id uint64 +} + type TestJanusRoom struct { + id uint64 } -type TestJanusHandler func(body map[string]interface{}) (interface{}, *janus.ErrorMsg) +type TestJanusHandler func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg) type TestJanusGateway struct { t *testing.T sid atomic.Uint64 tid atomic.Uint64 + hid atomic.Uint64 rid atomic.Uint64 mu sync.Mutex sessions map[uint64]*JanusSession transactions map[uint64]*transaction + handles map[uint64]*TestJanusHandle rooms map[uint64]*TestJanusRoom handlers map[string]TestJanusHandler } func NewTestJanusGateway(t *testing.T) *TestJanusGateway { - return &TestJanusGateway{ + gateway := &TestJanusGateway{ t: t, sessions: make(map[uint64]*JanusSession), transactions: make(map[uint64]*transaction), + handles: make(map[uint64]*TestJanusHandle), rooms: make(map[uint64]*TestJanusRoom), handlers: make(map[string]TestJanusHandler), } + + t.Cleanup(func() { + assert := assert.New(t) + gateway.mu.Lock() + defer gateway.mu.Unlock() + assert.Len(gateway.sessions, 0) + assert.Len(gateway.transactions, 0) + assert.Len(gateway.handles, 0) + assert.Len(gateway.rooms, 0) + }) + + return gateway } func (g *TestJanusGateway) registerHandlers(handlers map[string]TestJanusHandler) { @@ -72,6 +93,12 @@ func (g *TestJanusGateway) registerHandlers(handlers map[string]TestJanusHandler } } +func (g *TestJanusGateway) getRoomById(id uint64) *TestJanusRoom { + g.mu.Lock() + defer g.mu.Unlock() + return g.rooms[id] +} + func (g *TestJanusGateway) Info(ctx context.Context) (*InfoMsg, error) { return &InfoMsg{ Name: "TestJanus", @@ -107,6 +134,162 @@ func (g *TestJanusGateway) Close() error { return nil } +func (g *TestJanusGateway) processMessage(session *JanusSession, handle *TestJanusHandle, body map[string]interface{}) interface{} { + request := body["request"].(string) + switch request { + case "create": + room := &TestJanusRoom{ + id: g.rid.Add(1), + } + g.rooms[room.id] = room + + return &janus.SuccessMsg{ + PluginData: janus.PluginData{ + Plugin: pluginVideoRoom, + Data: map[string]interface{}{ + "room": room.id, + }, + }, + } + case "join": + rid := body["room"].(float64) + room := g.rooms[uint64(rid)] + if room == nil { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM, + Reason: "Room not found", + }, + } + } + + assert.Equal(g.t, "publisher", body["ptype"]) + return &janus.EventMsg{ + Session: session.Id, + Handle: handle.id, + Plugindata: janus.PluginData{ + Plugin: pluginVideoRoom, + Data: map[string]interface{}{ + "room": room.id, + }, + }, + } + case "destroy": + rid := body["room"].(float64) + room := g.rooms[uint64(rid)] + if room == nil { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM, + Reason: "Room not found", + }, + } + } + + delete(g.rooms, uint64(rid)) + + return &janus.SuccessMsg{ + PluginData: janus.PluginData{ + Plugin: pluginVideoRoom, + Data: map[string]interface{}{}, + }, + } + default: + rid := body["room"].(float64) + room := g.rooms[uint64(rid)] + if room == nil { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM, + Reason: "Room not found", + }, + } + } + + handler, found := g.handlers[request] + if found { + var err *janus.ErrorMsg + result, err := handler(room, body) + if err != nil { + result = err + } + return result + } + } + + return nil +} + +func (g *TestJanusGateway) processRequest(msg map[string]interface{}) interface{} { + method, found := msg["janus"] + if !found { + return nil + } + + sid := msg["session_id"].(float64) + g.mu.Lock() + defer g.mu.Unlock() + session := g.sessions[uint64(sid)] + if session == nil { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_ERROR_SESSION_NOT_FOUND, + Reason: "Session not found", + }, + } + } + + switch method { + case "attach": + handle := &TestJanusHandle{ + id: g.hid.Add(1), + } + + g.handles[handle.id] = handle + + return &janus.SuccessMsg{ + Data: janus.SuccessData{ + ID: handle.id, + }, + } + case "detach": + hid := msg["handle_id"].(float64) + handle, found := g.handles[uint64(hid)] + if found { + delete(g.handles, handle.id) + } + if handle == nil { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_ERROR_HANDLE_NOT_FOUND, + Reason: "Handle not found", + }, + } + } + + return &janus.AckMsg{} + case "destroy": + delete(g.sessions, session.Id) + return &janus.AckMsg{} + case "message": + hid := msg["handle_id"].(float64) + handle, found := g.handles[uint64(hid)] + if !found { + return &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_ERROR_HANDLE_NOT_FOUND, + Reason: "Handle not found", + }, + } + } + + body := msg["body"].(map[string]interface{}) + return g.processMessage(session, handle, body) + } + + return nil +} + func (g *TestJanusGateway) send(msg map[string]interface{}, t *transaction) (uint64, error) { tid := g.tid.Add(1) @@ -118,82 +301,20 @@ func (g *TestJanusGateway) send(msg map[string]interface{}, t *transaction) (uin go t.run() g.mu.Lock() + defer g.mu.Unlock() g.transactions[tid] = t - g.mu.Unlock() - - var handleId uint64 = 1234 - var result any - if method, found := msg["janus"]; found { - switch method { - case "attach": - result = &janus.SuccessMsg{ - Data: janus.SuccessData{ - ID: handleId, + go func() { + result := g.processRequest(msg) + if !assert.NotNil(g.t, result, "Unsupported request %+v", msg) { + result = &janus.ErrorMsg{ + Err: janus.ErrorData{ + Code: JANUS_ERROR_UNKNOWN, + Reason: "Not implemented", }, } - case "detach": - result = &janus.AckMsg{} - case "destroy": - result = &janus.AckMsg{} - case "message": - if assert.EqualValues(g.t, handleId, msg["handle_id"]) { - body := msg["body"].(map[string]interface{}) - request := body["request"].(string) - switch request { - case "create": - rid := g.rid.Add(1) - room := &TestJanusRoom{} - g.mu.Lock() - g.rooms[rid] = room - g.mu.Unlock() - - result = &janus.SuccessMsg{ - PluginData: janus.PluginData{ - Plugin: pluginVideoRoom, - Data: map[string]interface{}{ - "room": rid, - }, - }, - } - case "destroy": - rid := body["room"].(float64) - g.mu.Lock() - delete(g.rooms, uint64(rid)) - g.mu.Unlock() - - result = &janus.SuccessMsg{ - PluginData: janus.PluginData{ - Plugin: pluginVideoRoom, - Data: map[string]interface{}{}, - }, - } - default: - g.mu.Lock() - handler, found := g.handlers[request] - g.mu.Unlock() - if found { - var err *janus.ErrorMsg - result, err = handler(body) - if err != nil { - result = err - } - } - } - } } - } - if !assert.NotNil(g.t, result, "Unsupported request %+v", msg) { - result = &janus.ErrorMsg{ - Err: janus.ErrorData{ - Code: JANUS_ERROR_UNKNOWN, - Reason: "Not implemented", - }, - } - } - - go func() { t.add(result) }() @@ -213,6 +334,8 @@ func (g *TestJanusGateway) removeSession(session *JanusSession) { } func newMcuJanusForTesting(t *testing.T) (*mcuJanus, *TestJanusGateway) { + gateway := NewTestJanusGateway(t) + config := goconf.NewConfigFile() mcu, err := NewMcuJanus(context.Background(), "", config) require.NoError(t, err) @@ -220,8 +343,6 @@ func newMcuJanusForTesting(t *testing.T) (*mcuJanus, *TestJanusGateway) { mcu.Stop() }) - gateway := NewTestJanusGateway(t) - mcuJanus := mcu.(*mcuJanus) mcuJanus.createJanusGateway = func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) { return gateway, nil @@ -292,16 +413,113 @@ func (c *TestMcuController) GetStreams(ctx context.Context) ([]PublisherStream, return streams, nil } +type TestMcuInitiator struct { + country string +} + +func (i *TestMcuInitiator) Country() string { + return i.country +} + +func Test_JanusPublisherSubscriber(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + require := require.New(t) + + mcu, gateway := newMcuJanusForTesting(t) + gateway.registerHandlers(map[string]TestJanusHandler{}) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "publisher-id" + listener1 := &TestMcuListener{ + id: pubId, + } + + settings1 := NewPublisherSettings{} + initiator1 := &TestMcuInitiator{ + country: "DE", + } + + pub, err := mcu.NewPublisher(ctx, listener1, pubId, "sid", StreamTypeVideo, settings1, initiator1) + require.NoError(err) + defer pub.Close(context.Background()) + + listener2 := &TestMcuListener{ + id: pubId, + } + + initiator2 := &TestMcuInitiator{ + country: "DE", + } + sub, err := mcu.NewSubscriber(ctx, listener2, pubId, StreamTypeVideo, initiator2) + require.NoError(err) + defer sub.Close(context.Background()) +} + +func Test_JanusSubscriberPublisher(t *testing.T) { + CatchLogForTest(t) + t.Parallel() + require := require.New(t) + + mcu, gateway := newMcuJanusForTesting(t) + gateway.registerHandlers(map[string]TestJanusHandler{}) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + pubId := "publisher-id" + listener1 := &TestMcuListener{ + id: pubId, + } + + settings1 := NewPublisherSettings{} + initiator1 := &TestMcuInitiator{ + country: "DE", + } + + ready := make(chan struct{}) + done := make(chan struct{}) + + go func() { + defer close(done) + time.Sleep(100 * time.Millisecond) + pub, err := mcu.NewPublisher(ctx, listener1, pubId, "sid", StreamTypeVideo, settings1, initiator1) + require.NoError(err) + defer func() { + <-ready + pub.Close(context.Background()) + }() + }() + + listener2 := &TestMcuListener{ + id: pubId, + } + + initiator2 := &TestMcuInitiator{ + country: "DE", + } + sub, err := mcu.NewSubscriber(ctx, listener2, pubId, StreamTypeVideo, initiator2) + require.NoError(err) + defer sub.Close(context.Background()) + close(ready) + <-done +} + func Test_JanusRemotePublisher(t *testing.T) { CatchLogForTest(t) t.Parallel() assert := assert.New(t) require := require.New(t) + var added atomic.Int32 + var removed atomic.Int32 + mcu, gateway := newMcuJanusForTesting(t) gateway.registerHandlers(map[string]TestJanusHandler{ - "add_remote_publisher": func(body map[string]interface{}) (interface{}, *janus.ErrorMsg) { - assert.EqualValues(1, body["room"]) + "add_remote_publisher": func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg) { + assert.EqualValues(1, room.id) if streams := body["streams"].([]interface{}); assert.Len(streams, 1) { stream := streams[0].(map[string]interface{}) assert.Equal("0", stream["mid"]) @@ -309,6 +527,7 @@ func Test_JanusRemotePublisher(t *testing.T) { assert.Equal("audio", stream["type"]) assert.Equal("opus", stream["codec"]) } + added.Add(1) return &janus.SuccessMsg{ PluginData: janus.PluginData{ Plugin: pluginVideoRoom, @@ -320,8 +539,9 @@ func Test_JanusRemotePublisher(t *testing.T) { }, }, nil }, - "remove_remote_publisher": func(body map[string]interface{}) (interface{}, *janus.ErrorMsg) { - assert.EqualValues(1, body["room"]) + "remove_remote_publisher": func(room *TestJanusRoom, body map[string]interface{}) (interface{}, *janus.ErrorMsg) { + assert.EqualValues(1, room.id) + removed.Add(1) return &janus.SuccessMsg{ PluginData: janus.PluginData{ Plugin: pluginVideoRoom, @@ -346,5 +566,25 @@ func Test_JanusRemotePublisher(t *testing.T) { require.NoError(err) defer pub.Close(context.Background()) - assert.NotNil(pub) + assert.EqualValues(1, added.Load()) + assert.EqualValues(0, removed.Load()) + + listener2 := &TestMcuListener{ + id: "subscriber-id", + } + + sub, err := mcu.NewRemoteSubscriber(ctx, listener2, pub) + require.NoError(err) + defer sub.Close(context.Background()) + + pub.Close(context.Background()) + + assert.EqualValues(1, added.Load()) + // The publisher is ref-counted, and still referenced by the subscriber. + assert.EqualValues(0, removed.Load()) + + sub.Close(context.Background()) + + assert.EqualValues(1, added.Load()) + assert.EqualValues(1, removed.Load()) }