Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send ping requests to local instance for federated sessions. #808

Merged
merged 8 commits into from
Aug 29, 2024
10 changes: 9 additions & 1 deletion clientsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var (
PathToOcsSignalingBackend = "ocs/v2.php/apps/spreed/api/v1/signaling/backend"
)

const (
FederatedRoomSessionIdPrefix = "federated|"
)

// ResponseHandlerFunc will return "true" has been fully processed.
type ResponseHandlerFunc func(message *ClientMessage) bool

Expand Down Expand Up @@ -444,12 +448,16 @@ func (s *ClientSession) UpdateRoomSessionId(roomSessionId string) error {
if roomSessionId != "" {
if room := s.GetRoom(); room != nil {
log.Printf("Session %s updated room session id to %s in room %s", s.PublicId(), roomSessionId, room.Id())
} else if client := s.GetFederationClient(); client != nil {
log.Printf("Session %s updated room session id to %s in federated room %s", s.PublicId(), roomSessionId, client.RemoteRoomId())
} else {
log.Printf("Session %s updated room session id to %s in unknown room", s.PublicId(), roomSessionId)
}
} else {
if room := s.GetRoom(); room != nil {
log.Printf("Session %s cleared room session id in room %s", s.PublicId(), room.Id())
} else if client := s.GetFederationClient(); client != nil {
log.Printf("Session %s cleared room session id in federated room %s", s.PublicId(), client.RemoteRoomId())
} else {
log.Printf("Session %s cleared room session id in unknown room", s.PublicId())
}
Expand Down Expand Up @@ -540,7 +548,7 @@ func (s *ClientSession) doUnsubscribeRoomEvents(notify bool) {

s.roomSessionIdLock.Lock()
defer s.roomSessionIdLock.Unlock()
if notify && room != nil && s.roomSessionId != "" {
if notify && room != nil && s.roomSessionId != "" && !strings.HasPrefix(s.roomSessionId, FederatedRoomSessionIdPrefix) {
// Notify
go func(sid string) {
ctx := context.Background()
Expand Down
12 changes: 10 additions & 2 deletions federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ func (c *FederationClient) URL() string {
return c.federation.Load().parsedSignalingUrl.String()
}

func (c *FederationClient) RoomId() string {
return c.roomId.Load().(string)
}

func (c *FederationClient) RemoteRoomId() string {
return c.remoteRoomId.Load().(string)
}

func (c *FederationClient) CanReuse(federation *RoomFederationMessage) bool {
fed := c.federation.Load()
return fed.NextcloudUrl == federation.NextcloudUrl &&
Expand Down Expand Up @@ -643,8 +651,8 @@ func (c *FederationClient) processMessage(msg *ServerMessage) {
remoteSessionId = hello.SessionId
}

remoteRoomId := c.remoteRoomId.Load().(string)
roomId := c.roomId.Load().(string)
remoteRoomId := c.RemoteRoomId()
roomId := c.RoomId()

var doClose bool
switch msg.Type {
Expand Down
62 changes: 60 additions & 2 deletions federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func Test_Federation(t *testing.T) {

assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello))

room := hub1.getRoom(roomId)
require.NotNil(room)

now := time.Now()
token, err := client1.CreateHelloV2Token(testDefaultUserId+"2", now, now.Add(time.Minute))
require.NoError(err)
Expand Down Expand Up @@ -145,6 +148,58 @@ func Test_Federation(t *testing.T) {
// The client2 will see its own session id, not the one from the remote server.
assert.NoError(client2.RunUntilJoined(ctx, hello1.Hello, hello2.Hello))

tmpRoom1 := hub2.getRoom(roomId)
assert.Nil(tmpRoom1)
tmpRoom2 := hub2.getRoom(federatedRoomId)
assert.Nil(tmpRoom2)

// The host hub has no federated sessions and thus doesn't send pings.
count1, wg1 := hub1.publishFederatedSessions()
wg1.Wait()
assert.Equal(0, count1)

count1, wg1 = room.publishActiveSessions()
wg1.Wait()
assert.Equal(2, count1)

request1 := getPingRequests(t)
clearPingRequests(t)
assert.Len(request1, 1)
if ping := request1[0].Ping; assert.NotNil(ping) {
assert.Equal(roomId, ping.RoomId)
assert.Equal("1.0", ping.Version)
assert.Len(ping.Entries, 2)
// The order of entries is not defined
if ping.Entries[0].SessionId == federatedRoomId+"-"+hello2.Hello.SessionId {
assert.Equal(hello2.Hello.UserId, ping.Entries[0].UserId)

assert.Equal(roomId+"-"+hello1.Hello.SessionId, ping.Entries[1].SessionId)
assert.Equal(hello1.Hello.UserId, ping.Entries[1].UserId)
} else {
assert.Equal(roomId+"-"+hello1.Hello.SessionId, ping.Entries[0].SessionId)
assert.Equal(hello1.Hello.UserId, ping.Entries[0].UserId)

assert.Equal(federatedRoomId+"-"+hello2.Hello.SessionId, ping.Entries[1].SessionId)
assert.Equal(hello2.Hello.UserId, ping.Entries[1].UserId)
}
}

// The federated hub has a federated session for which it sends a ping.
count2, wg2 := hub2.publishFederatedSessions()
wg2.Wait()
assert.Equal(1, count2)

request2 := getPingRequests(t)
clearPingRequests(t)
assert.Len(request2, 1)
if ping := request2[0].Ping; assert.NotNil(ping) {
assert.Equal(federatedRoomId, ping.RoomId)
assert.Equal("1.0", ping.Version)
assert.Len(ping.Entries, 1)
assert.Equal(federatedRoomId+"-"+hello2.Hello.SessionId, ping.Entries[0].SessionId)
assert.Equal(hello2.Hello.UserId, ping.Entries[0].UserId)
}

// Leaving and re-joining a room as "direct" session will trigger correct events.
if room, err := client1.JoinRoom(ctx, ""); assert.NoError(err) {
assert.Equal("", room.Room.RoomId)
Expand Down Expand Up @@ -172,6 +227,11 @@ func Test_Federation(t *testing.T) {
UserId: hello2.Hello.UserId,
}))

// The federated session has left the room, so no more pings.
count3, wg3 := hub2.publishFederatedSessions()
wg3.Wait()
assert.Equal(0, count3)

require.NoError(client2.WriteJSON(msg))
if message, err := client2.RunUntilMessage(ctx); assert.NoError(err) {
assert.Equal(msg.Id, message.Id)
Expand Down Expand Up @@ -296,8 +356,6 @@ func Test_Federation(t *testing.T) {
"actorType": "federated_users",
},
}
room := hub1.getRoom(roomId)
require.NotNil(room)
room.PublishUsersInCallChanged(users, users)
var event *EventServerMessage
// For the local user, it's a federated user on server 2 that joined.
Expand Down
103 changes: 102 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type Hub struct {
expectHelloClients map[HandlerClient]time.Time
dialoutSessions map[*ClientSession]bool
remoteSessions map[*RemoteSession]bool
federatedSessions map[*ClientSession]bool

backendTimeout time.Duration
backend *BackendClient
Expand Down Expand Up @@ -356,6 +357,7 @@ func NewHub(config *goconf.ConfigFile, events AsyncEvents, rpcServer *GrpcServer
expectHelloClients: make(map[HandlerClient]time.Time),
dialoutSessions: make(map[*ClientSession]bool),
remoteSessions: make(map[*RemoteSession]bool),
federatedSessions: make(map[*ClientSession]bool),

backendTimeout: backendTimeout,
backend: backend,
Expand Down Expand Up @@ -465,6 +467,7 @@ func (h *Hub) Run() {
defer h.backend.Close()

housekeeping := time.NewTicker(housekeepingInterval)
federationPing := time.NewTicker(updateActiveSessionsInterval)
geoipUpdater := time.NewTicker(24 * time.Hour)

loop:
Expand All @@ -484,6 +487,8 @@ loop:
h.performHousekeeping(now)
case <-geoipUpdater.C:
go h.updateGeoDatabase()
case <-federationPing.C:
go h.publishFederatedSessions()
case <-h.closer.C:
break loop
}
Expand Down Expand Up @@ -1626,6 +1631,23 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
}

session.SetFederationClient(client)

roomSessionId := message.Room.SessionId
if roomSessionId == "" {
// TODO(jojo): Better make the session id required in the request.
log.Printf("User did not send a room session id, assuming session %s", session.PublicId())
roomSessionId = session.PublicId()
}

// Prefix room session id to allow using the same signaling server for two Nextcloud instances during development.
// Otherwise the same room session id will be detected and the other session will be kicked.
if err := session.UpdateRoomSessionId(FederatedRoomSessionIdPrefix + roomSessionId); err != nil {
log.Printf("Error updating room session id for session %s: %s", session.PublicId(), err)
}

h.mu.Lock()
h.federatedSessions[session] = true
h.mu.Unlock()
return
}

Expand Down Expand Up @@ -1693,6 +1715,82 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) {
h.processJoinRoom(session, message, &room)
}

func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) {
h.mu.RLock()
defer h.mu.RUnlock()

var wg sync.WaitGroup
if len(h.federatedSessions) == 0 {
return 0, &wg
}

rooms := make(map[string]map[string][]BackendPingEntry)
urls := make(map[string]*url.URL)
for session := range h.federatedSessions {
u := session.BackendUrl()
if u == "" {
continue
}

federation := session.GetFederationClient()
if federation == nil {
continue
}

var sid string
var uid string
// Use Nextcloud session id and user id
sid = strings.TrimPrefix(session.RoomSessionId(), FederatedRoomSessionIdPrefix)
uid = session.AuthUserId()
if sid == "" {
continue
}

roomId := federation.RoomId()
entries, found := rooms[roomId]
if !found {
entries = make(map[string][]BackendPingEntry)
rooms[roomId] = entries
}

e, found := entries[u]
if !found {
p := session.ParsedBackendUrl()
if p == nil {
// Should not happen, invalid URLs should get rejected earlier.
continue
}
urls[u] = p
}

entries[u] = append(e, BackendPingEntry{
SessionId: sid,
UserId: uid,
})
}

if len(urls) == 0 {
return 0, &wg
}
count := 0
for roomId, entries := range rooms {
for u, e := range entries {
wg.Add(1)
count += len(e)
go func(roomId string, url *url.URL, entries []BackendPingEntry) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), h.backendTimeout)
defer cancel()

if err := h.roomPing.SendPings(ctx, roomId, url, entries); err != nil {
log.Printf("Error pinging room %s for active entries %+v: %s", roomId, entries, err)
}
}(roomId, urls[u], e)
}
}
return count, &wg
}

func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room {
internalRoomId := getRoomIdForBackend(id, backend)

Expand All @@ -1709,7 +1807,7 @@ func (h *Hub) removeRoom(room *Room) {
statsHubRoomsCurrent.WithLabelValues(room.Backend().Id()).Dec()
}
h.ru.Unlock()
h.roomPing.DeleteRoom(room)
h.roomPing.DeleteRoom(room.Id())
}

func (h *Hub) createRoom(id string, properties json.RawMessage, backend *Backend) (*Room, error) {
Expand All @@ -1735,6 +1833,9 @@ func (h *Hub) processJoinRoom(session *ClientSession, message *ClientMessage, ro
}

session.LeaveRoom(true)
h.mu.Lock()
delete(h.federatedSessions, session)
h.mu.Unlock()

roomId := room.Room.RoomId
internalRoomId := getRoomIdForBackend(roomId, session.Backend())
Expand Down
5 changes: 5 additions & 0 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func CreateClusteredHubsForTestWithConfig(t *testing.T, getConfigFunc func(*http
grpcServer1, addr1 := NewGrpcServerForTest(t)
grpcServer2, addr2 := NewGrpcServerForTest(t)

if strings.Contains(t.Name(), "Federation") {
// Signaling servers should not form a cluster in federation tests.
addr1, addr2 = addr2, addr1
}

events1, err := NewAsyncEvents(nats1)
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,19 +956,21 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) {
if len(urls) == 0 {
return 0, &wg
}
var count int
for u, e := range entries {
wg.Add(1)
count += len(e)
go func(url *url.URL, entries []BackendPingEntry) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), r.hub.backendTimeout)
defer cancel()

if err := r.hub.roomPing.SendPings(ctx, r, url, entries); err != nil {
if err := r.hub.roomPing.SendPings(ctx, r.id, url, entries); err != nil {
log.Printf("Error pinging room %s for active entries %+v: %s", r.id, entries, err)
}
}(urls[u], e)
}
return len(entries), &wg
return count, &wg
}

func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) {
Expand Down
Loading
Loading