From 72e38b7814348c0c717963d76a97543169471e6f Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 30 Oct 2024 11:59:28 +0100 Subject: [PATCH] Prevent duplicate virtual sessions in participant update events. This can happen if these sessions are returned by Nextcloud and then the local session is added. Also fixes events in clustered scenarios where virtual sessions are available on on server but the update events are generated on a different one. --- grpc_client.go | 30 +++ grpc_server.go | 47 +++++ grpc_sessions.pb.go | 386 ++++++++++++++++++++++++++++++++------- grpc_sessions.proto | 22 +++ grpc_sessions_grpc.pb.go | 46 ++++- hub.go | 13 +- hub_test.go | 285 +++++++++++++++++++++++++++++ mcu_proxy_test.go | 4 + room.go | 150 +++++++++++++-- 9 files changed, 897 insertions(+), 86 deletions(-) diff --git a/grpc_client.go b/grpc_client.go index 0774ed70..24d2b0cd 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -249,6 +249,36 @@ func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room return response.GetInCall(), nil } +func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backend *Backend) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) { + statsGrpcClientCalls.WithLabelValues("GetInternalSessions").Inc() + // TODO: Remove debug logging + log.Printf("Get internal sessions for %s@%s on %s", roomId, backend.Id(), c.Target()) + response, err := c.impl.GetInternalSessions(ctx, &GetInternalSessionsRequest{ + RoomId: roomId, + BackendUrl: backend.Url(), + }, grpc.WaitForReady(true)) + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + return nil, nil, nil + } else if err != nil { + return nil, nil, err + } + + if len(response.InternalSessions) > 0 { + internal = make(map[string]*InternalSessionData, len(response.InternalSessions)) + for _, s := range response.InternalSessions { + internal[s.SessionId] = s + } + } + if len(response.VirtualSessions) > 0 { + virtual = make(map[string]*VirtualSessionData, len(response.VirtualSessions)) + for _, s := range response.VirtualSessions { + virtual[s.SessionId] = s + } + } + + return +} + func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType StreamType) (string, string, net.IP, error) { statsGrpcClientCalls.WithLabelValues("GetPublisherId").Inc() // TODO: Remove debug logging diff --git a/grpc_server.go b/grpc_server.go index 0ccb1020..44674954 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -59,6 +59,7 @@ type GrpcServerHub interface { GetSessionByResumeId(resumeId string) Session GetSessionByPublicId(sessionId string) Session GetSessionIdByRoomSessionId(roomSessionId string) (string, error) + GetRoomForBackend(roomId string, backend *Backend) *Room GetBackend(u *url.URL) *Backend } @@ -185,6 +186,52 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa return result, nil } +func (s *GrpcServer) GetInternalSessions(ctx context.Context, request *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) { + statsGrpcServerCalls.WithLabelValues("GetInternalSessions").Inc() + // TODO: Remove debug logging + log.Printf("Get internal sessions from %s on %s", request.RoomId, request.BackendUrl) + + var u *url.URL + if request.BackendUrl != "" { + var err error + u, err = url.Parse(request.BackendUrl) + if err != nil { + return nil, status.Error(codes.InvalidArgument, "invalid url") + } + } + + backend := s.hub.GetBackend(u) + if backend == nil { + return nil, status.Error(codes.NotFound, "no such backend") + } + + room := s.hub.GetRoomForBackend(request.RoomId, backend) + if room == nil { + return nil, status.Error(codes.NotFound, "no such room") + } + + result := &GetInternalSessionsReply{} + room.mu.RLock() + defer room.mu.RUnlock() + + for session := range room.internalSessions { + result.InternalSessions = append(result.InternalSessions, &InternalSessionData{ + SessionId: session.PublicId(), + InCall: uint32(session.GetInCall()), + Features: session.GetFeatures(), + }) + } + + for session := range room.virtualSessions { + result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{ + SessionId: session.PublicId(), + InCall: uint32(session.GetInCall()), + }) + } + + return result, nil +} + func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) { statsGrpcServerCalls.WithLabelValues("GetPublisherId").Inc() // TODO: Remove debug logging diff --git a/grpc_sessions.pb.go b/grpc_sessions.pb.go index 22baa11a..e9fc9b82 100644 --- a/grpc_sessions.pb.go +++ b/grpc_sessions.pb.go @@ -333,6 +333,226 @@ func (x *IsSessionInCallReply) GetInCall() bool { return false } +type GetInternalSessionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RoomId string `protobuf:"bytes,1,opt,name=roomId,proto3" json:"roomId,omitempty"` + BackendUrl string `protobuf:"bytes,2,opt,name=backendUrl,proto3" json:"backendUrl,omitempty"` +} + +func (x *GetInternalSessionsRequest) Reset() { + *x = GetInternalSessionsRequest{} + mi := &file_grpc_sessions_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetInternalSessionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetInternalSessionsRequest) ProtoMessage() {} + +func (x *GetInternalSessionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetInternalSessionsRequest.ProtoReflect.Descriptor instead. +func (*GetInternalSessionsRequest) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{6} +} + +func (x *GetInternalSessionsRequest) GetRoomId() string { + if x != nil { + return x.RoomId + } + return "" +} + +func (x *GetInternalSessionsRequest) GetBackendUrl() string { + if x != nil { + return x.BackendUrl + } + return "" +} + +type InternalSessionData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"` + Features []string `protobuf:"bytes,3,rep,name=features,proto3" json:"features,omitempty"` +} + +func (x *InternalSessionData) Reset() { + *x = InternalSessionData{} + mi := &file_grpc_sessions_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *InternalSessionData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InternalSessionData) ProtoMessage() {} + +func (x *InternalSessionData) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InternalSessionData.ProtoReflect.Descriptor instead. +func (*InternalSessionData) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{7} +} + +func (x *InternalSessionData) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + +func (x *InternalSessionData) GetInCall() uint32 { + if x != nil { + return x.InCall + } + return 0 +} + +func (x *InternalSessionData) GetFeatures() []string { + if x != nil { + return x.Features + } + return nil +} + +type VirtualSessionData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"` +} + +func (x *VirtualSessionData) Reset() { + *x = VirtualSessionData{} + mi := &file_grpc_sessions_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VirtualSessionData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VirtualSessionData) ProtoMessage() {} + +func (x *VirtualSessionData) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VirtualSessionData.ProtoReflect.Descriptor instead. +func (*VirtualSessionData) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{8} +} + +func (x *VirtualSessionData) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + +func (x *VirtualSessionData) GetInCall() uint32 { + if x != nil { + return x.InCall + } + return 0 +} + +type GetInternalSessionsReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InternalSessions []*InternalSessionData `protobuf:"bytes,1,rep,name=internalSessions,proto3" json:"internalSessions,omitempty"` + VirtualSessions []*VirtualSessionData `protobuf:"bytes,2,rep,name=virtualSessions,proto3" json:"virtualSessions,omitempty"` +} + +func (x *GetInternalSessionsReply) Reset() { + *x = GetInternalSessionsReply{} + mi := &file_grpc_sessions_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetInternalSessionsReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetInternalSessionsReply) ProtoMessage() {} + +func (x *GetInternalSessionsReply) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetInternalSessionsReply.ProtoReflect.Descriptor instead. +func (*GetInternalSessionsReply) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{9} +} + +func (x *GetInternalSessionsReply) GetInternalSessions() []*InternalSessionData { + if x != nil { + return x.InternalSessions + } + return nil +} + +func (x *GetInternalSessionsReply) GetVirtualSessions() []*VirtualSessionData { + if x != nil { + return x.VirtualSessions + } + return nil +} + type ClientSessionMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -343,7 +563,7 @@ type ClientSessionMessage struct { func (x *ClientSessionMessage) Reset() { *x = ClientSessionMessage{} - mi := &file_grpc_sessions_proto_msgTypes[6] + mi := &file_grpc_sessions_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +575,7 @@ func (x *ClientSessionMessage) String() string { func (*ClientSessionMessage) ProtoMessage() {} func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message { - mi := &file_grpc_sessions_proto_msgTypes[6] + mi := &file_grpc_sessions_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +588,7 @@ func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ClientSessionMessage.ProtoReflect.Descriptor instead. func (*ClientSessionMessage) Descriptor() ([]byte, []int) { - return file_grpc_sessions_proto_rawDescGZIP(), []int{6} + return file_grpc_sessions_proto_rawDescGZIP(), []int{10} } func (x *ClientSessionMessage) GetMessage() []byte { @@ -388,7 +608,7 @@ type ServerSessionMessage struct { func (x *ServerSessionMessage) Reset() { *x = ServerSessionMessage{} - mi := &file_grpc_sessions_proto_msgTypes[7] + mi := &file_grpc_sessions_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -400,7 +620,7 @@ func (x *ServerSessionMessage) String() string { func (*ServerSessionMessage) ProtoMessage() {} func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message { - mi := &file_grpc_sessions_proto_msgTypes[7] + mi := &file_grpc_sessions_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -413,7 +633,7 @@ func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ServerSessionMessage.ProtoReflect.Descriptor instead. func (*ServerSessionMessage) Descriptor() ([]byte, []int) { - return file_grpc_sessions_proto_rawDescGZIP(), []int{7} + return file_grpc_sessions_proto_rawDescGZIP(), []int{11} } func (x *ServerSessionMessage) GetMessage() []byte { @@ -454,41 +674,75 @@ var file_grpc_sessions_proto_rawDesc = []byte{ 0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c, 0x22, 0x2e, 0x0a, 0x14, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x30, 0x0a, 0x14, - 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30, - 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x32, 0xed, 0x02, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x54, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, - 0x49, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, - 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, - 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, - 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x54, 0x0a, 0x1a, + 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x6f, + 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x6f, 0x6f, 0x6d, + 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, + 0x72, 0x6c, 0x22, 0x67, 0x0a, 0x13, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, + 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12, + 0x1a, 0x0a, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x4a, 0x0a, 0x12, 0x56, + 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, + 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, + 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0xaf, 0x01, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, 0x0a, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, + 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x10, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x47, 0x0a, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x69, 0x67, 0x6e, + 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x56, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61, + 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x30, 0x0a, 0x14, 0x43, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30, 0x0a, 0x14, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xd2, 0x03, + 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x54, 0x0a, + 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x12, + 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, + 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, + 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, + 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, - 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, - 0x57, 0x0a, 0x0f, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, - 0x6c, 0x6c, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, + 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12, + 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, - 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, - 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78, - 0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, - 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, - 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x2e, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, + 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72, + 0x6f, 0x78, 0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69, + 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, + 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x73, 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -503,31 +757,39 @@ func file_grpc_sessions_proto_rawDescGZIP() []byte { return file_grpc_sessions_proto_rawDescData } -var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_grpc_sessions_proto_goTypes = []any{ - (*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest - (*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply - (*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest - (*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply - (*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest - (*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply - (*ClientSessionMessage)(nil), // 6: signaling.ClientSessionMessage - (*ServerSessionMessage)(nil), // 7: signaling.ServerSessionMessage + (*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest + (*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply + (*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest + (*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply + (*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest + (*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply + (*GetInternalSessionsRequest)(nil), // 6: signaling.GetInternalSessionsRequest + (*InternalSessionData)(nil), // 7: signaling.InternalSessionData + (*VirtualSessionData)(nil), // 8: signaling.VirtualSessionData + (*GetInternalSessionsReply)(nil), // 9: signaling.GetInternalSessionsReply + (*ClientSessionMessage)(nil), // 10: signaling.ClientSessionMessage + (*ServerSessionMessage)(nil), // 11: signaling.ServerSessionMessage } var file_grpc_sessions_proto_depIdxs = []int32{ - 0, // 0: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest - 2, // 1: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest - 4, // 2: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest - 6, // 3: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage - 1, // 4: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply - 3, // 5: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply - 5, // 6: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply - 7, // 7: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 7, // 0: signaling.GetInternalSessionsReply.internalSessions:type_name -> signaling.InternalSessionData + 8, // 1: signaling.GetInternalSessionsReply.virtualSessions:type_name -> signaling.VirtualSessionData + 0, // 2: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest + 2, // 3: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest + 4, // 4: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest + 6, // 5: signaling.RpcSessions.GetInternalSessions:input_type -> signaling.GetInternalSessionsRequest + 10, // 6: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage + 1, // 7: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply + 3, // 8: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply + 5, // 9: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply + 9, // 10: signaling.RpcSessions.GetInternalSessions:output_type -> signaling.GetInternalSessionsReply + 11, // 11: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage + 7, // [7:12] is the sub-list for method output_type + 2, // [2:7] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_grpc_sessions_proto_init() } @@ -541,7 +803,7 @@ func file_grpc_sessions_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_grpc_sessions_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/grpc_sessions.proto b/grpc_sessions.proto index 4dbfab42..3d29dfef 100644 --- a/grpc_sessions.proto +++ b/grpc_sessions.proto @@ -29,6 +29,7 @@ service RpcSessions { rpc LookupResumeId(LookupResumeIdRequest) returns (LookupResumeIdReply) {} rpc LookupSessionId(LookupSessionIdRequest) returns (LookupSessionIdReply) {} rpc IsSessionInCall(IsSessionInCallRequest) returns (IsSessionInCallReply) {} + rpc GetInternalSessions(GetInternalSessionsRequest) returns (GetInternalSessionsReply) {} rpc ProxySession(stream ClientSessionMessage) returns (stream ServerSessionMessage) {} } @@ -60,6 +61,27 @@ message IsSessionInCallReply { bool inCall = 1; } +message GetInternalSessionsRequest { + string roomId = 1; + string backendUrl = 2; +} + +message InternalSessionData { + string sessionId = 1; + uint32 inCall = 2; + repeated string features = 3; +} + +message VirtualSessionData { + string sessionId = 1; + uint32 inCall = 2; +} + +message GetInternalSessionsReply { + repeated InternalSessionData internalSessions = 1; + repeated VirtualSessionData virtualSessions = 2; +} + message ClientSessionMessage { bytes message = 1; } diff --git a/grpc_sessions_grpc.pb.go b/grpc_sessions_grpc.pb.go index 7898269a..8b9c6a1c 100644 --- a/grpc_sessions_grpc.pb.go +++ b/grpc_sessions_grpc.pb.go @@ -37,10 +37,11 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId" - RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId" - RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall" - RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession" + RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId" + RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId" + RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall" + RpcSessions_GetInternalSessions_FullMethodName = "/signaling.RpcSessions/GetInternalSessions" + RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession" ) // RpcSessionsClient is the client API for RpcSessions service. @@ -50,6 +51,7 @@ type RpcSessionsClient interface { LookupResumeId(ctx context.Context, in *LookupResumeIdRequest, opts ...grpc.CallOption) (*LookupResumeIdReply, error) LookupSessionId(ctx context.Context, in *LookupSessionIdRequest, opts ...grpc.CallOption) (*LookupSessionIdReply, error) IsSessionInCall(ctx context.Context, in *IsSessionInCallRequest, opts ...grpc.CallOption) (*IsSessionInCallReply, error) + GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error) ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error) } @@ -91,6 +93,16 @@ func (c *rpcSessionsClient) IsSessionInCall(ctx context.Context, in *IsSessionIn return out, nil } +func (c *rpcSessionsClient) GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetInternalSessionsReply) + err := c.cc.Invoke(ctx, RpcSessions_GetInternalSessions_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *rpcSessionsClient) ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &RpcSessions_ServiceDesc.Streams[0], RpcSessions_ProxySession_FullMethodName, cOpts...) @@ -111,6 +123,7 @@ type RpcSessionsServer interface { LookupResumeId(context.Context, *LookupResumeIdRequest) (*LookupResumeIdReply, error) LookupSessionId(context.Context, *LookupSessionIdRequest) (*LookupSessionIdReply, error) IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error) + GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error mustEmbedUnimplementedRpcSessionsServer() } @@ -131,6 +144,9 @@ func (UnimplementedRpcSessionsServer) LookupSessionId(context.Context, *LookupSe func (UnimplementedRpcSessionsServer) IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error) { return nil, status.Errorf(codes.Unimplemented, "method IsSessionInCall not implemented") } +func (UnimplementedRpcSessionsServer) GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetInternalSessions not implemented") +} func (UnimplementedRpcSessionsServer) ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error { return status.Errorf(codes.Unimplemented, "method ProxySession not implemented") } @@ -209,6 +225,24 @@ func _RpcSessions_IsSessionInCall_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _RpcSessions_GetInternalSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetInternalSessionsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RpcSessionsServer).GetInternalSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: RpcSessions_GetInternalSessions_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RpcSessionsServer).GetInternalSessions(ctx, req.(*GetInternalSessionsRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _RpcSessions_ProxySession_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(RpcSessionsServer).ProxySession(&grpc.GenericServerStream[ClientSessionMessage, ServerSessionMessage]{ServerStream: stream}) } @@ -235,6 +269,10 @@ var RpcSessions_ServiceDesc = grpc.ServiceDesc{ MethodName: "IsSessionInCall", Handler: _RpcSessions_IsSessionInCall_Handler, }, + { + MethodName: "GetInternalSessions", + Handler: _RpcSessions_GetInternalSessions_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/hub.go b/hub.go index 4ebc4164..1720fe61 100644 --- a/hub.go +++ b/hub.go @@ -648,6 +648,9 @@ func (h *Hub) GetDialoutSession(roomId string, backend *Backend) *ClientSession } func (h *Hub) GetBackend(u *url.URL) *Backend { + if u == nil { + return h.backend.GetCompatBackend() + } return h.backend.GetBackend(u) } @@ -1629,7 +1632,7 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) { return } - if room := h.getRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) { + if room := h.GetRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) { // Session already is in that room, no action needed. roomSessionId := message.Room.SessionId if roomSessionId == "" { @@ -1770,7 +1773,7 @@ func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) { return count, &wg } -func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room { +func (h *Hub) GetRoomForBackend(id string, backend *Backend) *Room { internalRoomId := getRoomIdForBackend(id, backend) h.ru.RLock() @@ -2256,7 +2259,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { switch msg.Type { case "addsession": msg := msg.AddSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore add session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return @@ -2333,7 +2336,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { room.AddSession(sess, nil) case "updatesession": msg := msg.UpdateSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return @@ -2371,7 +2374,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { } case "removesession": msg := msg.RemoveSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return diff --git a/hub_test.go b/hub_test.go index 69454e5b..6f835cf2 100644 --- a/hub_test.go +++ b/hub_test.go @@ -4488,6 +4488,7 @@ func TestVirtualClientSessions(t *testing.T) { } virtualSession := virtualSessions[0] + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) } @@ -4625,6 +4626,290 @@ func TestVirtualClientSessions(t *testing.T) { } } +func TestDuplicateVirtualSessions(t *testing.T) { + CatchLogForTest(t) + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + t.Parallel() + require := require.New(t) + assert := assert.New(t) + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + + require.NoError(client1.SendHello(testDefaultUserId)) + + hello1, err := client1.RunUntilHello(ctx) + require.NoError(err) + + roomId := "test-room" + _, err = client1.JoinRoom(ctx, roomId) + require.NoError(err) + + assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello)) + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + + require.NoError(client2.SendHelloInternal()) + + hello2, err := client2.RunUntilHello(ctx) + require.NoError(err) + session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession) + require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId) + + _, err = client2.JoinRoom(ctx, roomId) + require.NoError(err) + + assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello)) + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 1) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(3, msg.Users[0]["inCall"], "%+v", msg) + } + } + } + + _, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello) + assert.NoError(err) + + if len(unexpected) == 0 { + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + unexpected = append(unexpected, msg) + } + } + + require.Len(unexpected, 1) + if msg, err := checkMessageParticipantsInCall(unexpected[0]); assert.NoError(err) { + if assert.Len(msg.Users, 1) { + assert.Equal(true, msg.Users[0]["internal"]) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"]) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"]) + } + } + + calledCtx, calledCancel := context.WithTimeout(ctx, time.Second) + + virtualSessionId := "virtual-session-id" + virtualUserId := "virtual-user-id" + generatedSessionId := GetVirtualSessionId(session2, virtualSessionId) + + setSessionRequestHandler(t, func(request *BackendClientSessionRequest) { + defer calledCancel() + assert.Equal("add", request.Action, "%+v", request) + assert.Equal(roomId, request.RoomId, "%+v", request) + assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request) + assert.Equal(virtualUserId, request.UserId, "%+v", request) + }) + + require.NoError(client2.SendInternalAddSession(&AddSessionInternalClientMessage{ + CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{ + SessionId: virtualSessionId, + RoomId: roomId, + }, + UserId: virtualUserId, + Flags: FLAG_MUTED_SPEAKING, + })) + <-calledCtx.Done() + if err := calledCtx.Err(); err != nil { + require.ErrorIs(err, context.Canceled) + } + + virtualSessions := session2.GetVirtualSessions() + for len(virtualSessions) == 0 { + time.Sleep(time.Millisecond) + virtualSessions = session2.GetVirtualSessions() + } + + virtualSession := virtualSessions[0] + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) + } + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 2) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg) + + assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) { + assert.Equal(roomId, flags.RoomId) + assert.Equal(virtualSession.PublicId(), flags.SessionId) + assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags) + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 2) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg) + + assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) { + assert.Equal(roomId, flags.RoomId) + assert.Equal(virtualSession.PublicId(), flags.SessionId) + assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags) + } + } + + msg := &BackendServerRoomRequest{ + Type: "incall", + InCall: &BackendRoomInCallRequest{ + InCall: []byte("0"), + Users: []map[string]interface{}{ + { + "sessionId": virtualSession.PublicId(), + "participantPermissions": 246, + "participantType": 4, + "lastPing": 123456789, + }, + { + // Request is coming from Nextcloud, so use its session id (which is our "room session id"). + "sessionId": roomId + "-" + hello1.Hello.SessionId, + "participantPermissions": 254, + "participantType": 1, + "lastPing": 234567890, + }, + }, + }, + } + + data, err := json.Marshal(msg) + require.NoError(err) + res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId, data) + require.NoError(err) + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + assert.NoError(err) + assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body)) + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + client1.Close() + assert.NoError(client1.WaitForClientRemoved(ctx)) + + client3 := NewTestClient(t, server1, hub1) + defer client3.CloseWithBye() + + require.NoError(client3.SendHelloResume(hello1.Hello.ResumeId)) + if hello3, err := client3.RunUntilHello(ctx); assert.NoError(err) { + assert.Equal(testDefaultUserId, hello3.Hello.UserId, "%+v", hello3.Hello) + assert.Equal(hello1.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello) + assert.Equal(hello1.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello) + } + + if msg, err := client3.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + setSessionRequestHandler(t, func(request *BackendClientSessionRequest) { + defer calledCancel() + assert.Equal("remove", request.Action, "%+v", request) + assert.Equal(roomId, request.RoomId, "%+v", request) + assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request) + assert.Equal(virtualUserId, request.UserId, "%+v", request) + }) + }) + } +} + func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) { CatchLogForTest(t) for _, subtest := range clusteredTests { diff --git a/mcu_proxy_test.go b/mcu_proxy_test.go index cfdb865f..f16ccfb3 100644 --- a/mcu_proxy_test.go +++ b/mcu_proxy_test.go @@ -1346,6 +1346,10 @@ func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend { return nil } +func (h *mockGrpcServerHub) GetRoomForBackend(roomId string, backend *Backend) *Room { + return nil +} + func Test_ProxyRemotePublisher(t *testing.T) { CatchLogForTest(t) t.Parallel() diff --git a/room.go b/room.go index 19a9ff50..82ea8a0a 100644 --- a/room.go +++ b/room.go @@ -71,7 +71,7 @@ type Room struct { mu *sync.RWMutex sessions map[string]Session - internalSessions map[Session]bool + internalSessions map[*ClientSession]bool virtualSessions map[*VirtualSession]bool inCallSessions map[Session]bool roomSessionData map[string]*RoomSessionData @@ -108,7 +108,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv mu: &sync.RWMutex{}, sessions: make(map[string]Session), - internalSessions: make(map[Session]bool), + internalSessions: make(map[*ClientSession]bool), virtualSessions: make(map[*VirtualSession]bool), inCallSessions: make(map[Session]bool), roomSessionData: make(map[string]*RoomSessionData), @@ -290,13 +290,19 @@ func (r *Room) AddSession(session Session, sessionData json.RawMessage) { var publishUsersChanged bool switch session.ClientType() { case HelloClientTypeInternal: - r.internalSessions[session] = true + clientSession, ok := session.(*ClientSession) + if !ok { + delete(r.sessions, sid) + r.mu.Unlock() + panic(fmt.Sprintf("Expected a client session, got %v (%T)", session, session)) + } + r.internalSessions[clientSession] = true case HelloClientTypeVirtual: virtualSession, ok := session.(*VirtualSession) if !ok { delete(r.sessions, sid) r.mu.Unlock() - panic(fmt.Sprintf("Expected a virtual session, got %v", session)) + panic(fmt.Sprintf("Expected a virtual session, got %v (%T)", session, session)) } r.virtualSessions[virtualSession] = true publishUsersChanged = true @@ -447,11 +453,21 @@ func (r *Room) RemoveSession(session Session) bool { sid := session.PublicId() r.statsRoomSessionsCurrent.With(prometheus.Labels{"clienttype": session.ClientType()}).Dec() delete(r.sessions, sid) - delete(r.internalSessions, session) if virtualSession, ok := session.(*VirtualSession); ok { delete(r.virtualSessions, virtualSession) + // Handle case where virtual session was also sent by Nextcloud. + users := make([]map[string]interface{}, 0, len(r.users)) + for _, u := range r.users { + if u["sessionId"] != sid { + users = append(users, u) + } + } + if len(users) != len(r.users) { + r.users = users + } } if clientSession, ok := session.(*ClientSession); ok { + delete(r.internalSessions, clientSession) r.transientData.RemoveListener(clientSession) } delete(r.inCallSessions, session) @@ -568,10 +584,66 @@ func (r *Room) PublishSessionLeft(session Session) { } } +func (r *Room) getClusteredInternalSessionsRLocked() (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData) { + if r.hub.rpcClients == nil { + return nil, nil + } + + r.mu.RUnlock() + defer r.mu.RLock() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range r.hub.rpcClients.GetClients() { + wg.Add(1) + go func(c *GrpcClient) { + defer wg.Done() + + clientInternal, clientVirtual, err := c.GetInternalSessions(ctx, r.Id(), r.Backend()) + if err != nil { + log.Printf("Received error while getting internal sessions for %s@%s from %s: %s", r.Id(), r.Backend().Id(), c.Target(), err) + return + } + + mu.Lock() + defer mu.Unlock() + if internal == nil { + internal = make(map[string]*InternalSessionData, len(clientInternal)) + } + for sid, s := range clientInternal { + internal[sid] = s + } + if virtual == nil { + virtual = make(map[string]*VirtualSessionData, len(clientVirtual)) + } + for sid, s := range clientVirtual { + virtual[sid] = s + } + }(client) + } + wg.Wait() + + return +} + func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]interface{} { now := time.Now().Unix() - r.mu.Lock() - defer r.mu.Unlock() + r.mu.RLock() + defer r.mu.RUnlock() + if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 { + return users + } + + clusteredInternalSessions, clusteredVirtualSessions := r.getClusteredInternalSessionsRLocked() + + // Local sessions might have changed while waiting for clustered information. + if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 { + return users + } + + skipSession := make(map[string]bool) for _, user := range users { sessionid, found := user["sessionId"] if !found || sessionid == "" { @@ -581,21 +653,69 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string] if userid, found := user["userId"]; !found || userid == "" { if roomSessionData, found := r.roomSessionData[sessionid.(string)]; found { user["userId"] = roomSessionData.UserId + } else if sid, ok := sessionid.(string); ok { + if entry, found := clusteredVirtualSessions[sid]; found { + user["virtual"] = true + user["inCall"] = entry.GetInCall() + skipSession[sid] = true + } else { + for session := range r.virtualSessions { + if session.PublicId() == sid { + user["virtual"] = true + user["inCall"] = session.GetInCall() + skipSession[sid] = true + break + } + } + } } } } for session := range r.internalSessions { - users = append(users, map[string]interface{}{ - "inCall": session.(*ClientSession).GetInCall(), + u := map[string]interface{}{ + "inCall": session.GetInCall(), "sessionId": session.PublicId(), "lastPing": now, "internal": true, - }) + } + if f := session.GetFeatures(); len(f) > 0 { + u["features"] = f + } + users = append(users, u) + } + for _, session := range clusteredInternalSessions { + u := map[string]interface{}{ + "inCall": session.GetInCall(), + "sessionId": session.GetSessionId(), + "lastPing": now, + "internal": true, + } + if f := session.GetFeatures(); len(f) > 0 { + u["features"] = f + } + users = append(users, u) } for session := range r.virtualSessions { + sid := session.PublicId() + if skipSession[sid] { + continue + } + skipSession[sid] = true users = append(users, map[string]interface{}{ "inCall": session.GetInCall(), - "sessionId": session.PublicId(), + "sessionId": sid, + "lastPing": now, + "virtual": true, + }) + } + for sid, session := range clusteredVirtualSessions { + if skipSession[sid] { + continue + } + + users = append(users, map[string]interface{}{ + "inCall": session.GetInCall(), + "sessionId": sid, "lastPing": now, "virtual": true, }) @@ -974,7 +1094,7 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) { } func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) { - if message == nil || message.Data == nil { + if message == nil || len(message.Data) == 0 { return } @@ -1062,10 +1182,10 @@ func (r *Room) notifyInternalRoomDeleted() { }, } - r.mu.Lock() - defer r.mu.Unlock() + r.mu.RLock() + defer r.mu.RUnlock() for s := range r.internalSessions { - s.(*ClientSession).SendMessage(msg) + s.SendMessage(msg) } }