Skip to content

Commit

Permalink
more hub tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 17, 2020
1 parent 7582407 commit 6105771
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 1 deletion.
1 change: 0 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ func (h *Hub) broadcastPublication(channel string, pub *protocol.Publication, ch

var jsonPublicationReply *prepared.Reply
var protobufPublicationReply *prepared.Reply

// Iterate over channel subscribers and send message.
for uid := range channelSubscriptions {
c, ok := h.conns[uid]
Expand Down
202 changes: 202 additions & 0 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package centrifuge
import (
"context"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/centrifugal/protocol"
"github.com/stretchr/testify/assert"
Expand All @@ -16,13 +20,15 @@ type testTransport struct {
mu sync.Mutex
sink chan []byte
closed bool
closeCh chan struct{}
disconnect *Disconnect
protoType ProtocolType
}

func newTestTransport() *testTransport {
return &testTransport{
protoType: ProtocolTypeJSON,
closeCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -63,8 +69,12 @@ func (t *testTransport) Encoding() EncodingType {
func (t *testTransport) Close(disconnect *Disconnect) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closed {
return nil
}
t.disconnect = disconnect
t.closed = true
close(t.closeCh)
return nil
}

Expand All @@ -84,6 +94,198 @@ func TestHub(t *testing.T) {
assert.Equal(t, 1, len(conns))
}

func TestHubUnsubscribe(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
// No such user.
err = node.hub.unsubscribe("1", "test")
require.NoError(t, err)
// Subscribed user.
err = node.hub.unsubscribe("42", "test")
require.NoError(t, err)
select {
case data := <-transport.sink:
require.Equal(t, "{\"result\":{\"type\":3,\"channel\":\"test\",\"data\":{}}}\n", string(data))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubDisconnect(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
// No such user.
err = node.hub.disconnect("1", false)
require.NoError(t, err)
// Subscribed user.
err = node.hub.disconnect("42", false)
require.NoError(t, err)
select {
case <-transport.closeCh:
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastPublicationJSON(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastPublication(
"test", &protocol.Publication{Data: []byte(`{"data": "broadcasted_data"}`)}, &ChannelOptions{})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcasted_data"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastPublicationProtobuf(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.protoType = ProtocolTypeProtobuf
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastPublication(
"test", &protocol.Publication{Data: []byte(`{"data": "broadcasted_data"}`)}, &ChannelOptions{})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcasted_data"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastJoinJSON(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
transport.protoType = ProtocolTypeProtobuf
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastJoin(
"test", &protocol.Join{Info: ClientInfo{Client: "broadcast_client"}})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcast_client"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastJoinProtobuf(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastJoin(
"test", &protocol.Join{Info: ClientInfo{Client: "broadcast_client"}})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcast_client"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastLeaveJSON(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
transport.protoType = ProtocolTypeProtobuf
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastLeave(
"test", &protocol.Leave{Info: ClientInfo{Client: "broadcast_client"}})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcast_client"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubBroadcastLeaveProtobuf(t *testing.T) {
node := nodeWithMemoryEngine()
defer func() { _ = node.Shutdown(context.Background()) }()
transport := newTestTransport()
transport.sink = make(chan []byte, 100)
ctx := context.Background()
newCtx := SetCredentials(ctx, &Credentials{UserID: "42"})
client, err := newClient(newCtx, node, transport)
connectClient(t, client)
subscribeClient(t, client, "test")
assert.NoError(t, err)
assert.Equal(t, len(node.hub.users), 1)
err = node.hub.broadcastLeave(
"test", &protocol.Leave{Info: ClientInfo{Client: "broadcast_client"}})
require.NoError(t, err)
select {
case data := <-transport.sink:
require.True(t, strings.Contains(string(data), "broadcast_client"))
case <-time.After(2 * time.Second):
t.Fatal("no data in sink")
}
}

func TestHubShutdown(t *testing.T) {
h := newHub()
err := h.shutdown(context.Background())
Expand Down

0 comments on commit 6105771

Please sign in to comment.