Skip to content

Commit

Permalink
Add first Janus tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Nov 7, 2024
1 parent da1bb09 commit 3f4e92a
Showing 1 changed file with 350 additions and 0 deletions.
350 changes: 350 additions & 0 deletions mcu_janus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2024 struktur AG
*
* @author Joachim Bauch <[email protected]>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling

import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"testing"

"github.com/dlintw/goconf"
"github.com/notedit/janus-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type TestJanusRoom struct {
}

type TestJanusHandler func(body map[string]interface{}) (interface{}, *janus.ErrorMsg)

type TestJanusGateway struct {
t *testing.T

sid atomic.Uint64
tid atomic.Uint64
rid atomic.Uint64
mu sync.Mutex

sessions map[uint64]*JanusSession
transactions map[uint64]*transaction
rooms map[uint64]*TestJanusRoom
handlers map[string]TestJanusHandler
}

func NewTestJanusGateway(t *testing.T) *TestJanusGateway {
return &TestJanusGateway{
t: t,

sessions: make(map[uint64]*JanusSession),
transactions: make(map[uint64]*transaction),
rooms: make(map[uint64]*TestJanusRoom),
handlers: make(map[string]TestJanusHandler),
}
}

func (g *TestJanusGateway) registerHandlers(handlers map[string]TestJanusHandler) {
g.mu.Lock()
defer g.mu.Unlock()
for name, handler := range handlers {
g.handlers[name] = handler
}
}

func (g *TestJanusGateway) Info(ctx context.Context) (*InfoMsg, error) {
return &InfoMsg{
Name: "TestJanus",
Version: 1400,
VersionString: "1.4.0",
Author: "struktur AG",
DataChannels: true,
FullTrickle: true,
Plugins: map[string]janus.PluginInfo{
pluginVideoRoom: {
Name: "Test VideoRoom plugin",
VersionString: "0.0.0",
Author: "struktur AG",
},
},
}, nil
}

func (g *TestJanusGateway) Create(ctx context.Context) (*JanusSession, error) {
sid := g.sid.Add(1)
session := &JanusSession{
Id: sid,
Handles: make(map[uint64]*JanusHandle),
gateway: g,
}
g.mu.Lock()
defer g.mu.Unlock()
g.sessions[sid] = session
return session, nil
}

func (g *TestJanusGateway) Close() error {
return nil
}

func (g *TestJanusGateway) send(msg map[string]interface{}, t *transaction) (uint64, error) {
tid := g.tid.Add(1)

data, err := json.Marshal(msg)
require.NoError(g.t, err)
err = json.Unmarshal(data, &msg)
require.NoError(g.t, err)

go t.run()

g.mu.Lock()
g.transactions[tid] = t
g.mu.Unlock()

var handleId uint64 = 1234

var result any
if method, found := msg["janus"]; found {
switch method {
case "attach":
result = &janus.SuccessMsg{
Data: janus.SuccessData{
ID: handleId,
},
}
case "detach":
result = &janus.AckMsg{}
case "destroy":
result = &janus.AckMsg{}
case "message":
if assert.EqualValues(g.t, handleId, msg["handle_id"]) {
body := msg["body"].(map[string]interface{})
request := body["request"].(string)
switch request {
case "create":
rid := g.rid.Add(1)
room := &TestJanusRoom{}
g.mu.Lock()
g.rooms[rid] = room
g.mu.Unlock()

result = &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{
"room": rid,
},
},
}
case "destroy":
rid := body["room"].(float64)
g.mu.Lock()
delete(g.rooms, uint64(rid))
g.mu.Unlock()

result = &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{},
},
}
default:
g.mu.Lock()
handler, found := g.handlers[request]
g.mu.Unlock()
if found {
var err *janus.ErrorMsg
result, err = handler(body)
if err != nil {
result = err
}
}
}
}
}
}

if !assert.NotNil(g.t, result, "Unsupported request %+v", msg) {
result = &janus.ErrorMsg{
Err: janus.ErrorData{
Code: JANUS_ERROR_UNKNOWN,
Reason: "Not implemented",
},
}
}

go func() {
t.add(result)
}()

return tid, nil
}

func (g *TestJanusGateway) removeTransaction(id uint64) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.transactions, id)
}

func (g *TestJanusGateway) removeSession(session *JanusSession) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.sessions, session.Id)
}

func newMcuJanusForTesting(t *testing.T) (*mcuJanus, *TestJanusGateway) {
config := goconf.NewConfigFile()
mcu, err := NewMcuJanus(context.Background(), "", config)
require.NoError(t, err)
t.Cleanup(func() {
mcu.Stop()
})

gateway := NewTestJanusGateway(t)

mcuJanus := mcu.(*mcuJanus)
mcuJanus.createJanusGateway = func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) {
return gateway, nil
}
require.NoError(t, mcu.Start(context.Background()))
return mcuJanus, gateway
}

type TestMcuListener struct {
id string
}

func (t *TestMcuListener) PublicId() string {
return t.id
}

func (t *TestMcuListener) OnUpdateOffer(client McuClient, offer map[string]interface{}) {

}

func (t *TestMcuListener) OnIceCandidate(client McuClient, candidate interface{}) {

}

func (t *TestMcuListener) OnIceCompleted(client McuClient) {

}

func (t *TestMcuListener) SubscriberSidUpdated(subscriber McuSubscriber) {

}

func (t *TestMcuListener) PublisherClosed(publisher McuPublisher) {

}

func (t *TestMcuListener) SubscriberClosed(subscriber McuSubscriber) {

}

type TestMcuController struct {
id string
}

func (c *TestMcuController) PublisherId() string {
return c.id
}

func (c *TestMcuController) StartPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error {
// TODO: Check parameters?
return nil
}

func (c *TestMcuController) StopPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error {
// TODO: Check parameters?
return nil
}

func (c *TestMcuController) GetStreams(ctx context.Context) ([]PublisherStream, error) {
streams := []PublisherStream{
{
Mid: "0",
Mindex: 0,
Type: "audio",
Codec: "opus",
},
}
return streams, nil
}

func Test_JanusRemotePublisher(t *testing.T) {
CatchLogForTest(t)
t.Parallel()
assert := assert.New(t)
require := require.New(t)

mcu, gateway := newMcuJanusForTesting(t)
gateway.registerHandlers(map[string]TestJanusHandler{
"add_remote_publisher": func(body map[string]interface{}) (interface{}, *janus.ErrorMsg) {
assert.EqualValues(1, body["room"])
if streams := body["streams"].([]interface{}); assert.Len(streams, 1) {
stream := streams[0].(map[string]interface{})
assert.Equal("0", stream["mid"])
assert.EqualValues(0, stream["mindex"])
assert.Equal("audio", stream["type"])
assert.Equal("opus", stream["codec"])
}
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{
"id": 12345,
"port": 10000,
"rtcp_port": 10001,
},
},
}, nil
},
"remove_remote_publisher": func(body map[string]interface{}) (interface{}, *janus.ErrorMsg) {
assert.EqualValues(1, body["room"])
return &janus.SuccessMsg{
PluginData: janus.PluginData{
Plugin: pluginVideoRoom,
Data: map[string]interface{}{},
},
}, nil
},
})

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

listener1 := &TestMcuListener{
id: "publisher-id",
}

controller := &TestMcuController{
id: listener1.id,
}

pub, err := mcu.NewRemotePublisher(ctx, listener1, controller, StreamTypeVideo)
require.NoError(err)
defer pub.Close(context.Background())

assert.NotNil(pub)
}

0 comments on commit 3f4e92a

Please sign in to comment.