Skip to content

Commit

Permalink
Prevent duplicate virtual sessions in participant update events.
Browse files Browse the repository at this point in the history
This can happen if these sessions are returned by Nextcloud and then the
local session is added.

Also fixes events in clustered scenarios where virtual sessions are available
on on server but the update events are generated on a different one.
  • Loading branch information
fancycode committed Oct 30, 2024
1 parent 0087692 commit 72e38b7
Show file tree
Hide file tree
Showing 9 changed files with 897 additions and 86 deletions.
30 changes: 30 additions & 0 deletions grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,36 @@ func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room
return response.GetInCall(), nil
}

func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backend *Backend) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) {
statsGrpcClientCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions for %s@%s on %s", roomId, backend.Id(), c.Target())
response, err := c.impl.GetInternalSessions(ctx, &GetInternalSessionsRequest{
RoomId: roomId,
BackendUrl: backend.Url(),
}, grpc.WaitForReady(true))
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
return nil, nil, nil
} else if err != nil {
return nil, nil, err
}

if len(response.InternalSessions) > 0 {
internal = make(map[string]*InternalSessionData, len(response.InternalSessions))
for _, s := range response.InternalSessions {
internal[s.SessionId] = s
}
}
if len(response.VirtualSessions) > 0 {
virtual = make(map[string]*VirtualSessionData, len(response.VirtualSessions))
for _, s := range response.VirtualSessions {
virtual[s.SessionId] = s
}
}

return
}

func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType StreamType) (string, string, net.IP, error) {
statsGrpcClientCalls.WithLabelValues("GetPublisherId").Inc()
// TODO: Remove debug logging
Expand Down
47 changes: 47 additions & 0 deletions grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type GrpcServerHub interface {
GetSessionByResumeId(resumeId string) Session
GetSessionByPublicId(sessionId string) Session
GetSessionIdByRoomSessionId(roomSessionId string) (string, error)
GetRoomForBackend(roomId string, backend *Backend) *Room

GetBackend(u *url.URL) *Backend
}
Expand Down Expand Up @@ -185,6 +186,52 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa
return result, nil
}

func (s *GrpcServer) GetInternalSessions(ctx context.Context, request *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) {
statsGrpcServerCalls.WithLabelValues("GetInternalSessions").Inc()
// TODO: Remove debug logging
log.Printf("Get internal sessions from %s on %s", request.RoomId, request.BackendUrl)

var u *url.URL
if request.BackendUrl != "" {
var err error
u, err = url.Parse(request.BackendUrl)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "invalid url")
}
}

backend := s.hub.GetBackend(u)
if backend == nil {
return nil, status.Error(codes.NotFound, "no such backend")
}

room := s.hub.GetRoomForBackend(request.RoomId, backend)
if room == nil {
return nil, status.Error(codes.NotFound, "no such room")
}

result := &GetInternalSessionsReply{}
room.mu.RLock()
defer room.mu.RUnlock()

for session := range room.internalSessions {
result.InternalSessions = append(result.InternalSessions, &InternalSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
Features: session.GetFeatures(),
})
}

for session := range room.virtualSessions {
result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{
SessionId: session.PublicId(),
InCall: uint32(session.GetInCall()),
})
}

return result, nil
}

func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) {
statsGrpcServerCalls.WithLabelValues("GetPublisherId").Inc()
// TODO: Remove debug logging
Expand Down
Loading

0 comments on commit 72e38b7

Please sign in to comment.