diff --git a/grpc_client.go b/grpc_client.go index 0774ed70..24d2b0cd 100644 --- a/grpc_client.go +++ b/grpc_client.go @@ -249,6 +249,36 @@ func (c *GrpcClient) IsSessionInCall(ctx context.Context, sessionId string, room return response.GetInCall(), nil } +func (c *GrpcClient) GetInternalSessions(ctx context.Context, roomId string, backend *Backend) (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData, err error) { + statsGrpcClientCalls.WithLabelValues("GetInternalSessions").Inc() + // TODO: Remove debug logging + log.Printf("Get internal sessions for %s@%s on %s", roomId, backend.Id(), c.Target()) + response, err := c.impl.GetInternalSessions(ctx, &GetInternalSessionsRequest{ + RoomId: roomId, + BackendUrl: backend.Url(), + }, grpc.WaitForReady(true)) + if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { + return nil, nil, nil + } else if err != nil { + return nil, nil, err + } + + if len(response.InternalSessions) > 0 { + internal = make(map[string]*InternalSessionData, len(response.InternalSessions)) + for _, s := range response.InternalSessions { + internal[s.SessionId] = s + } + } + if len(response.VirtualSessions) > 0 { + virtual = make(map[string]*VirtualSessionData, len(response.VirtualSessions)) + for _, s := range response.VirtualSessions { + virtual[s.SessionId] = s + } + } + + return +} + func (c *GrpcClient) GetPublisherId(ctx context.Context, sessionId string, streamType StreamType) (string, string, net.IP, error) { statsGrpcClientCalls.WithLabelValues("GetPublisherId").Inc() // TODO: Remove debug logging diff --git a/grpc_server.go b/grpc_server.go index 0ccb1020..44674954 100644 --- a/grpc_server.go +++ b/grpc_server.go @@ -59,6 +59,7 @@ type GrpcServerHub interface { GetSessionByResumeId(resumeId string) Session GetSessionByPublicId(sessionId string) Session GetSessionIdByRoomSessionId(roomSessionId string) (string, error) + GetRoomForBackend(roomId string, backend *Backend) *Room GetBackend(u *url.URL) *Backend } @@ -185,6 +186,52 @@ func (s *GrpcServer) IsSessionInCall(ctx context.Context, request *IsSessionInCa return result, nil } +func (s *GrpcServer) GetInternalSessions(ctx context.Context, request *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) { + statsGrpcServerCalls.WithLabelValues("GetInternalSessions").Inc() + // TODO: Remove debug logging + log.Printf("Get internal sessions from %s on %s", request.RoomId, request.BackendUrl) + + var u *url.URL + if request.BackendUrl != "" { + var err error + u, err = url.Parse(request.BackendUrl) + if err != nil { + return nil, status.Error(codes.InvalidArgument, "invalid url") + } + } + + backend := s.hub.GetBackend(u) + if backend == nil { + return nil, status.Error(codes.NotFound, "no such backend") + } + + room := s.hub.GetRoomForBackend(request.RoomId, backend) + if room == nil { + return nil, status.Error(codes.NotFound, "no such room") + } + + result := &GetInternalSessionsReply{} + room.mu.RLock() + defer room.mu.RUnlock() + + for session := range room.internalSessions { + result.InternalSessions = append(result.InternalSessions, &InternalSessionData{ + SessionId: session.PublicId(), + InCall: uint32(session.GetInCall()), + Features: session.GetFeatures(), + }) + } + + for session := range room.virtualSessions { + result.VirtualSessions = append(result.VirtualSessions, &VirtualSessionData{ + SessionId: session.PublicId(), + InCall: uint32(session.GetInCall()), + }) + } + + return result, nil +} + func (s *GrpcServer) GetPublisherId(ctx context.Context, request *GetPublisherIdRequest) (*GetPublisherIdReply, error) { statsGrpcServerCalls.WithLabelValues("GetPublisherId").Inc() // TODO: Remove debug logging diff --git a/grpc_sessions.pb.go b/grpc_sessions.pb.go index 22baa11a..e9fc9b82 100644 --- a/grpc_sessions.pb.go +++ b/grpc_sessions.pb.go @@ -333,6 +333,226 @@ func (x *IsSessionInCallReply) GetInCall() bool { return false } +type GetInternalSessionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RoomId string `protobuf:"bytes,1,opt,name=roomId,proto3" json:"roomId,omitempty"` + BackendUrl string `protobuf:"bytes,2,opt,name=backendUrl,proto3" json:"backendUrl,omitempty"` +} + +func (x *GetInternalSessionsRequest) Reset() { + *x = GetInternalSessionsRequest{} + mi := &file_grpc_sessions_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetInternalSessionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetInternalSessionsRequest) ProtoMessage() {} + +func (x *GetInternalSessionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetInternalSessionsRequest.ProtoReflect.Descriptor instead. +func (*GetInternalSessionsRequest) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{6} +} + +func (x *GetInternalSessionsRequest) GetRoomId() string { + if x != nil { + return x.RoomId + } + return "" +} + +func (x *GetInternalSessionsRequest) GetBackendUrl() string { + if x != nil { + return x.BackendUrl + } + return "" +} + +type InternalSessionData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"` + Features []string `protobuf:"bytes,3,rep,name=features,proto3" json:"features,omitempty"` +} + +func (x *InternalSessionData) Reset() { + *x = InternalSessionData{} + mi := &file_grpc_sessions_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *InternalSessionData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InternalSessionData) ProtoMessage() {} + +func (x *InternalSessionData) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InternalSessionData.ProtoReflect.Descriptor instead. +func (*InternalSessionData) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{7} +} + +func (x *InternalSessionData) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + +func (x *InternalSessionData) GetInCall() uint32 { + if x != nil { + return x.InCall + } + return 0 +} + +func (x *InternalSessionData) GetFeatures() []string { + if x != nil { + return x.Features + } + return nil +} + +type VirtualSessionData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SessionId string `protobuf:"bytes,1,opt,name=sessionId,proto3" json:"sessionId,omitempty"` + InCall uint32 `protobuf:"varint,2,opt,name=inCall,proto3" json:"inCall,omitempty"` +} + +func (x *VirtualSessionData) Reset() { + *x = VirtualSessionData{} + mi := &file_grpc_sessions_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VirtualSessionData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VirtualSessionData) ProtoMessage() {} + +func (x *VirtualSessionData) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VirtualSessionData.ProtoReflect.Descriptor instead. +func (*VirtualSessionData) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{8} +} + +func (x *VirtualSessionData) GetSessionId() string { + if x != nil { + return x.SessionId + } + return "" +} + +func (x *VirtualSessionData) GetInCall() uint32 { + if x != nil { + return x.InCall + } + return 0 +} + +type GetInternalSessionsReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InternalSessions []*InternalSessionData `protobuf:"bytes,1,rep,name=internalSessions,proto3" json:"internalSessions,omitempty"` + VirtualSessions []*VirtualSessionData `protobuf:"bytes,2,rep,name=virtualSessions,proto3" json:"virtualSessions,omitempty"` +} + +func (x *GetInternalSessionsReply) Reset() { + *x = GetInternalSessionsReply{} + mi := &file_grpc_sessions_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetInternalSessionsReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetInternalSessionsReply) ProtoMessage() {} + +func (x *GetInternalSessionsReply) ProtoReflect() protoreflect.Message { + mi := &file_grpc_sessions_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetInternalSessionsReply.ProtoReflect.Descriptor instead. +func (*GetInternalSessionsReply) Descriptor() ([]byte, []int) { + return file_grpc_sessions_proto_rawDescGZIP(), []int{9} +} + +func (x *GetInternalSessionsReply) GetInternalSessions() []*InternalSessionData { + if x != nil { + return x.InternalSessions + } + return nil +} + +func (x *GetInternalSessionsReply) GetVirtualSessions() []*VirtualSessionData { + if x != nil { + return x.VirtualSessions + } + return nil +} + type ClientSessionMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -343,7 +563,7 @@ type ClientSessionMessage struct { func (x *ClientSessionMessage) Reset() { *x = ClientSessionMessage{} - mi := &file_grpc_sessions_proto_msgTypes[6] + mi := &file_grpc_sessions_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +575,7 @@ func (x *ClientSessionMessage) String() string { func (*ClientSessionMessage) ProtoMessage() {} func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message { - mi := &file_grpc_sessions_proto_msgTypes[6] + mi := &file_grpc_sessions_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +588,7 @@ func (x *ClientSessionMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ClientSessionMessage.ProtoReflect.Descriptor instead. func (*ClientSessionMessage) Descriptor() ([]byte, []int) { - return file_grpc_sessions_proto_rawDescGZIP(), []int{6} + return file_grpc_sessions_proto_rawDescGZIP(), []int{10} } func (x *ClientSessionMessage) GetMessage() []byte { @@ -388,7 +608,7 @@ type ServerSessionMessage struct { func (x *ServerSessionMessage) Reset() { *x = ServerSessionMessage{} - mi := &file_grpc_sessions_proto_msgTypes[7] + mi := &file_grpc_sessions_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -400,7 +620,7 @@ func (x *ServerSessionMessage) String() string { func (*ServerSessionMessage) ProtoMessage() {} func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message { - mi := &file_grpc_sessions_proto_msgTypes[7] + mi := &file_grpc_sessions_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -413,7 +633,7 @@ func (x *ServerSessionMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ServerSessionMessage.ProtoReflect.Descriptor instead. func (*ServerSessionMessage) Descriptor() ([]byte, []int) { - return file_grpc_sessions_proto_rawDescGZIP(), []int{7} + return file_grpc_sessions_proto_rawDescGZIP(), []int{11} } func (x *ServerSessionMessage) GetMessage() []byte { @@ -454,41 +674,75 @@ var file_grpc_sessions_proto_rawDesc = []byte{ 0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c, 0x22, 0x2e, 0x0a, 0x14, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x30, 0x0a, 0x14, - 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30, - 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x32, 0xed, 0x02, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x54, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, - 0x49, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, - 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, - 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, - 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0x54, 0x0a, 0x1a, + 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x6f, + 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x6f, 0x6f, 0x6d, + 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, 0x72, 0x6c, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x55, + 0x72, 0x6c, 0x22, 0x67, 0x0a, 0x13, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, + 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12, + 0x1a, 0x0a, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x08, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x4a, 0x0a, 0x12, 0x56, + 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, + 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, + 0x16, 0x0a, 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x06, 0x69, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x22, 0xaf, 0x01, 0x0a, 0x18, 0x47, 0x65, 0x74, 0x49, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x12, 0x4a, 0x0a, 0x10, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, + 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x10, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x47, 0x0a, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x69, 0x67, 0x6e, + 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x56, 0x69, 0x72, 0x74, 0x75, 0x61, 0x6c, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0f, 0x76, 0x69, 0x72, 0x74, 0x75, 0x61, + 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x30, 0x0a, 0x14, 0x43, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x30, 0x0a, 0x14, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xd2, 0x03, + 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x54, 0x0a, + 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x12, + 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, + 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1e, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, + 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, + 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, - 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x53, - 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, - 0x57, 0x0a, 0x0f, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, - 0x6c, 0x6c, 0x12, 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0f, + 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x12, + 0x21, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, - 0x67, 0x2e, 0x49, 0x73, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x43, 0x61, 0x6c, - 0x6c, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78, - 0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, 0x73, 0x73, - 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, - 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, - 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, 0x2e, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, + 0x47, 0x65, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x0c, 0x50, 0x72, + 0x6f, 0x78, 0x79, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x73, 0x69, + 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, + 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x73, 0x74, 0x72, 0x75, 0x6b, 0x74, 0x75, 0x72, 0x61, 0x67, 0x2f, 0x6e, 0x65, 0x78, 0x74, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, 0x73, 0x70, 0x72, 0x65, 0x65, 0x64, 0x2d, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -503,31 +757,39 @@ func file_grpc_sessions_proto_rawDescGZIP() []byte { return file_grpc_sessions_proto_rawDescData } -var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_grpc_sessions_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_grpc_sessions_proto_goTypes = []any{ - (*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest - (*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply - (*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest - (*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply - (*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest - (*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply - (*ClientSessionMessage)(nil), // 6: signaling.ClientSessionMessage - (*ServerSessionMessage)(nil), // 7: signaling.ServerSessionMessage + (*LookupResumeIdRequest)(nil), // 0: signaling.LookupResumeIdRequest + (*LookupResumeIdReply)(nil), // 1: signaling.LookupResumeIdReply + (*LookupSessionIdRequest)(nil), // 2: signaling.LookupSessionIdRequest + (*LookupSessionIdReply)(nil), // 3: signaling.LookupSessionIdReply + (*IsSessionInCallRequest)(nil), // 4: signaling.IsSessionInCallRequest + (*IsSessionInCallReply)(nil), // 5: signaling.IsSessionInCallReply + (*GetInternalSessionsRequest)(nil), // 6: signaling.GetInternalSessionsRequest + (*InternalSessionData)(nil), // 7: signaling.InternalSessionData + (*VirtualSessionData)(nil), // 8: signaling.VirtualSessionData + (*GetInternalSessionsReply)(nil), // 9: signaling.GetInternalSessionsReply + (*ClientSessionMessage)(nil), // 10: signaling.ClientSessionMessage + (*ServerSessionMessage)(nil), // 11: signaling.ServerSessionMessage } var file_grpc_sessions_proto_depIdxs = []int32{ - 0, // 0: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest - 2, // 1: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest - 4, // 2: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest - 6, // 3: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage - 1, // 4: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply - 3, // 5: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply - 5, // 6: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply - 7, // 7: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 7, // 0: signaling.GetInternalSessionsReply.internalSessions:type_name -> signaling.InternalSessionData + 8, // 1: signaling.GetInternalSessionsReply.virtualSessions:type_name -> signaling.VirtualSessionData + 0, // 2: signaling.RpcSessions.LookupResumeId:input_type -> signaling.LookupResumeIdRequest + 2, // 3: signaling.RpcSessions.LookupSessionId:input_type -> signaling.LookupSessionIdRequest + 4, // 4: signaling.RpcSessions.IsSessionInCall:input_type -> signaling.IsSessionInCallRequest + 6, // 5: signaling.RpcSessions.GetInternalSessions:input_type -> signaling.GetInternalSessionsRequest + 10, // 6: signaling.RpcSessions.ProxySession:input_type -> signaling.ClientSessionMessage + 1, // 7: signaling.RpcSessions.LookupResumeId:output_type -> signaling.LookupResumeIdReply + 3, // 8: signaling.RpcSessions.LookupSessionId:output_type -> signaling.LookupSessionIdReply + 5, // 9: signaling.RpcSessions.IsSessionInCall:output_type -> signaling.IsSessionInCallReply + 9, // 10: signaling.RpcSessions.GetInternalSessions:output_type -> signaling.GetInternalSessionsReply + 11, // 11: signaling.RpcSessions.ProxySession:output_type -> signaling.ServerSessionMessage + 7, // [7:12] is the sub-list for method output_type + 2, // [2:7] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_grpc_sessions_proto_init() } @@ -541,7 +803,7 @@ func file_grpc_sessions_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_grpc_sessions_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/grpc_sessions.proto b/grpc_sessions.proto index 4dbfab42..3d29dfef 100644 --- a/grpc_sessions.proto +++ b/grpc_sessions.proto @@ -29,6 +29,7 @@ service RpcSessions { rpc LookupResumeId(LookupResumeIdRequest) returns (LookupResumeIdReply) {} rpc LookupSessionId(LookupSessionIdRequest) returns (LookupSessionIdReply) {} rpc IsSessionInCall(IsSessionInCallRequest) returns (IsSessionInCallReply) {} + rpc GetInternalSessions(GetInternalSessionsRequest) returns (GetInternalSessionsReply) {} rpc ProxySession(stream ClientSessionMessage) returns (stream ServerSessionMessage) {} } @@ -60,6 +61,27 @@ message IsSessionInCallReply { bool inCall = 1; } +message GetInternalSessionsRequest { + string roomId = 1; + string backendUrl = 2; +} + +message InternalSessionData { + string sessionId = 1; + uint32 inCall = 2; + repeated string features = 3; +} + +message VirtualSessionData { + string sessionId = 1; + uint32 inCall = 2; +} + +message GetInternalSessionsReply { + repeated InternalSessionData internalSessions = 1; + repeated VirtualSessionData virtualSessions = 2; +} + message ClientSessionMessage { bytes message = 1; } diff --git a/grpc_sessions_grpc.pb.go b/grpc_sessions_grpc.pb.go index 7898269a..8b9c6a1c 100644 --- a/grpc_sessions_grpc.pb.go +++ b/grpc_sessions_grpc.pb.go @@ -37,10 +37,11 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId" - RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId" - RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall" - RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession" + RpcSessions_LookupResumeId_FullMethodName = "/signaling.RpcSessions/LookupResumeId" + RpcSessions_LookupSessionId_FullMethodName = "/signaling.RpcSessions/LookupSessionId" + RpcSessions_IsSessionInCall_FullMethodName = "/signaling.RpcSessions/IsSessionInCall" + RpcSessions_GetInternalSessions_FullMethodName = "/signaling.RpcSessions/GetInternalSessions" + RpcSessions_ProxySession_FullMethodName = "/signaling.RpcSessions/ProxySession" ) // RpcSessionsClient is the client API for RpcSessions service. @@ -50,6 +51,7 @@ type RpcSessionsClient interface { LookupResumeId(ctx context.Context, in *LookupResumeIdRequest, opts ...grpc.CallOption) (*LookupResumeIdReply, error) LookupSessionId(ctx context.Context, in *LookupSessionIdRequest, opts ...grpc.CallOption) (*LookupSessionIdReply, error) IsSessionInCall(ctx context.Context, in *IsSessionInCallRequest, opts ...grpc.CallOption) (*IsSessionInCallReply, error) + GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error) ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error) } @@ -91,6 +93,16 @@ func (c *rpcSessionsClient) IsSessionInCall(ctx context.Context, in *IsSessionIn return out, nil } +func (c *rpcSessionsClient) GetInternalSessions(ctx context.Context, in *GetInternalSessionsRequest, opts ...grpc.CallOption) (*GetInternalSessionsReply, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetInternalSessionsReply) + err := c.cc.Invoke(ctx, RpcSessions_GetInternalSessions_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *rpcSessionsClient) ProxySession(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ClientSessionMessage, ServerSessionMessage], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &RpcSessions_ServiceDesc.Streams[0], RpcSessions_ProxySession_FullMethodName, cOpts...) @@ -111,6 +123,7 @@ type RpcSessionsServer interface { LookupResumeId(context.Context, *LookupResumeIdRequest) (*LookupResumeIdReply, error) LookupSessionId(context.Context, *LookupSessionIdRequest) (*LookupSessionIdReply, error) IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error) + GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error mustEmbedUnimplementedRpcSessionsServer() } @@ -131,6 +144,9 @@ func (UnimplementedRpcSessionsServer) LookupSessionId(context.Context, *LookupSe func (UnimplementedRpcSessionsServer) IsSessionInCall(context.Context, *IsSessionInCallRequest) (*IsSessionInCallReply, error) { return nil, status.Errorf(codes.Unimplemented, "method IsSessionInCall not implemented") } +func (UnimplementedRpcSessionsServer) GetInternalSessions(context.Context, *GetInternalSessionsRequest) (*GetInternalSessionsReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetInternalSessions not implemented") +} func (UnimplementedRpcSessionsServer) ProxySession(grpc.BidiStreamingServer[ClientSessionMessage, ServerSessionMessage]) error { return status.Errorf(codes.Unimplemented, "method ProxySession not implemented") } @@ -209,6 +225,24 @@ func _RpcSessions_IsSessionInCall_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _RpcSessions_GetInternalSessions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetInternalSessionsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RpcSessionsServer).GetInternalSessions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: RpcSessions_GetInternalSessions_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RpcSessionsServer).GetInternalSessions(ctx, req.(*GetInternalSessionsRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _RpcSessions_ProxySession_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(RpcSessionsServer).ProxySession(&grpc.GenericServerStream[ClientSessionMessage, ServerSessionMessage]{ServerStream: stream}) } @@ -235,6 +269,10 @@ var RpcSessions_ServiceDesc = grpc.ServiceDesc{ MethodName: "IsSessionInCall", Handler: _RpcSessions_IsSessionInCall_Handler, }, + { + MethodName: "GetInternalSessions", + Handler: _RpcSessions_GetInternalSessions_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/hub.go b/hub.go index 4ebc4164..1720fe61 100644 --- a/hub.go +++ b/hub.go @@ -648,6 +648,9 @@ func (h *Hub) GetDialoutSession(roomId string, backend *Backend) *ClientSession } func (h *Hub) GetBackend(u *url.URL) *Backend { + if u == nil { + return h.backend.GetCompatBackend() + } return h.backend.GetBackend(u) } @@ -1629,7 +1632,7 @@ func (h *Hub) processRoom(sess Session, message *ClientMessage) { return } - if room := h.getRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) { + if room := h.GetRoomForBackend(roomId, session.Backend()); room != nil && room.HasSession(session) { // Session already is in that room, no action needed. roomSessionId := message.Room.SessionId if roomSessionId == "" { @@ -1770,7 +1773,7 @@ func (h *Hub) publishFederatedSessions() (int, *sync.WaitGroup) { return count, &wg } -func (h *Hub) getRoomForBackend(id string, backend *Backend) *Room { +func (h *Hub) GetRoomForBackend(id string, backend *Backend) *Room { internalRoomId := getRoomIdForBackend(id, backend) h.ru.RLock() @@ -2256,7 +2259,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { switch msg.Type { case "addsession": msg := msg.AddSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore add session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return @@ -2333,7 +2336,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { room.AddSession(sess, nil) case "updatesession": msg := msg.UpdateSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return @@ -2371,7 +2374,7 @@ func (h *Hub) processInternalMsg(sess Session, message *ClientMessage) { } case "removesession": msg := msg.RemoveSession - room := h.getRoomForBackend(msg.RoomId, session.Backend()) + room := h.GetRoomForBackend(msg.RoomId, session.Backend()) if room == nil { log.Printf("Ignore remove session message %+v for invalid room %s from %s", *msg, msg.RoomId, session.PublicId()) return diff --git a/hub_test.go b/hub_test.go index 69454e5b..6f835cf2 100644 --- a/hub_test.go +++ b/hub_test.go @@ -4488,6 +4488,7 @@ func TestVirtualClientSessions(t *testing.T) { } virtualSession := virtualSessions[0] + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) } @@ -4625,6 +4626,290 @@ func TestVirtualClientSessions(t *testing.T) { } } +func TestDuplicateVirtualSessions(t *testing.T) { + CatchLogForTest(t) + for _, subtest := range clusteredTests { + t.Run(subtest, func(t *testing.T) { + t.Parallel() + require := require.New(t) + assert := assert.New(t) + var hub1 *Hub + var hub2 *Hub + var server1 *httptest.Server + var server2 *httptest.Server + if isLocalTest(t) { + hub1, _, _, server1 = CreateHubForTest(t) + + hub2 = hub1 + server2 = server1 + } else { + hub1, hub2, server1, server2 = CreateClusteredHubsForTest(t) + } + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + client1 := NewTestClient(t, server1, hub1) + defer client1.CloseWithBye() + + require.NoError(client1.SendHello(testDefaultUserId)) + + hello1, err := client1.RunUntilHello(ctx) + require.NoError(err) + + roomId := "test-room" + _, err = client1.JoinRoom(ctx, roomId) + require.NoError(err) + + assert.NoError(client1.RunUntilJoined(ctx, hello1.Hello)) + + client2 := NewTestClient(t, server2, hub2) + defer client2.CloseWithBye() + + require.NoError(client2.SendHelloInternal()) + + hello2, err := client2.RunUntilHello(ctx) + require.NoError(err) + session2 := hub2.GetSessionByPublicId(hello2.Hello.SessionId).(*ClientSession) + require.NotNil(session2, "Session %s does not exist", hello2.Hello.SessionId) + + _, err = client2.JoinRoom(ctx, roomId) + require.NoError(err) + + assert.NoError(client1.RunUntilJoined(ctx, hello2.Hello)) + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 1) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(3, msg.Users[0]["inCall"], "%+v", msg) + } + } + } + + _, unexpected, err := client2.RunUntilJoinedAndReturn(ctx, hello1.Hello, hello2.Hello) + assert.NoError(err) + + if len(unexpected) == 0 { + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + unexpected = append(unexpected, msg) + } + } + + require.Len(unexpected, 1) + if msg, err := checkMessageParticipantsInCall(unexpected[0]); assert.NoError(err) { + if assert.Len(msg.Users, 1) { + assert.Equal(true, msg.Users[0]["internal"]) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"]) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"]) + } + } + + calledCtx, calledCancel := context.WithTimeout(ctx, time.Second) + + virtualSessionId := "virtual-session-id" + virtualUserId := "virtual-user-id" + generatedSessionId := GetVirtualSessionId(session2, virtualSessionId) + + setSessionRequestHandler(t, func(request *BackendClientSessionRequest) { + defer calledCancel() + assert.Equal("add", request.Action, "%+v", request) + assert.Equal(roomId, request.RoomId, "%+v", request) + assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request) + assert.Equal(virtualUserId, request.UserId, "%+v", request) + }) + + require.NoError(client2.SendInternalAddSession(&AddSessionInternalClientMessage{ + CommonSessionInternalClientMessage: CommonSessionInternalClientMessage{ + SessionId: virtualSessionId, + RoomId: roomId, + }, + UserId: virtualUserId, + Flags: FLAG_MUTED_SPEAKING, + })) + <-calledCtx.Done() + if err := calledCtx.Err(); err != nil { + require.ErrorIs(err, context.Canceled) + } + + virtualSessions := session2.GetVirtualSessions() + for len(virtualSessions) == 0 { + time.Sleep(time.Millisecond) + virtualSessions = session2.GetVirtualSessions() + } + + virtualSession := virtualSessions[0] + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client1.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) + } + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 2) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg) + + assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) { + assert.Equal(roomId, flags.RoomId) + assert.Equal(virtualSession.PublicId(), flags.SessionId) + assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags) + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + assert.NoError(client2.checkMessageJoinedSession(msg, virtualSession.PublicId(), virtualUserId)) + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 2) { + assert.Equal(true, msg.Users[0]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[0]["inCall"], "%+v", msg) + + assert.Equal(true, msg.Users[1]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[1]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[1]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if flags, err := checkMessageParticipantFlags(msg); assert.NoError(err) { + assert.Equal(roomId, flags.RoomId) + assert.Equal(virtualSession.PublicId(), flags.SessionId) + assert.EqualValues(FLAG_MUTED_SPEAKING, flags.Flags) + } + } + + msg := &BackendServerRoomRequest{ + Type: "incall", + InCall: &BackendRoomInCallRequest{ + InCall: []byte("0"), + Users: []map[string]interface{}{ + { + "sessionId": virtualSession.PublicId(), + "participantPermissions": 246, + "participantType": 4, + "lastPing": 123456789, + }, + { + // Request is coming from Nextcloud, so use its session id (which is our "room session id"). + "sessionId": roomId + "-" + hello1.Hello.SessionId, + "participantPermissions": 254, + "participantType": 1, + "lastPing": 234567890, + }, + }, + }, + } + + data, err := json.Marshal(msg) + require.NoError(err) + res, err := performBackendRequest(server2.URL+"/api/v1/room/"+roomId, data) + require.NoError(err) + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + assert.NoError(err) + assert.Equal(http.StatusOK, res.StatusCode, "Expected successful request, got %s", string(body)) + + if msg, err := client1.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + if msg, err := client2.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + client1.Close() + assert.NoError(client1.WaitForClientRemoved(ctx)) + + client3 := NewTestClient(t, server1, hub1) + defer client3.CloseWithBye() + + require.NoError(client3.SendHelloResume(hello1.Hello.ResumeId)) + if hello3, err := client3.RunUntilHello(ctx); assert.NoError(err) { + assert.Equal(testDefaultUserId, hello3.Hello.UserId, "%+v", hello3.Hello) + assert.Equal(hello1.Hello.SessionId, hello3.Hello.SessionId, "%+v", hello3.Hello) + assert.Equal(hello1.Hello.ResumeId, hello3.Hello.ResumeId, "%+v", hello3.Hello) + } + + if msg, err := client3.RunUntilMessage(ctx); assert.NoError(err) { + if msg, err := checkMessageParticipantsInCall(msg); assert.NoError(err) { + if assert.Len(msg.Users, 3) { + assert.Equal(true, msg.Users[0]["virtual"], "%+v", msg) + assert.Equal(virtualSession.PublicId(), msg.Users[0]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithPhone, msg.Users[0]["inCall"], "%+v", msg) + assert.EqualValues(246, msg.Users[0]["participantPermissions"], "%+v", msg) + assert.EqualValues(4, msg.Users[0]["participantType"], "%+v", msg) + + assert.Equal(hello1.Hello.SessionId, msg.Users[1]["sessionId"], "%+v", msg) + assert.Nil(msg.Users[1]["inCall"], "%+v", msg) + assert.EqualValues(254, msg.Users[1]["participantPermissions"], "%+v", msg) + assert.EqualValues(1, msg.Users[1]["participantType"], "%+v", msg) + + assert.Equal(true, msg.Users[2]["internal"], "%+v", msg) + assert.Equal(hello2.Hello.SessionId, msg.Users[2]["sessionId"], "%+v", msg) + assert.EqualValues(FlagInCall|FlagWithAudio, msg.Users[2]["inCall"], "%+v", msg) + } + } + } + + setSessionRequestHandler(t, func(request *BackendClientSessionRequest) { + defer calledCancel() + assert.Equal("remove", request.Action, "%+v", request) + assert.Equal(roomId, request.RoomId, "%+v", request) + assert.NotEqual(generatedSessionId, request.SessionId, "%+v", request) + assert.Equal(virtualUserId, request.UserId, "%+v", request) + }) + }) + } +} + func DoTestSwitchToOne(t *testing.T, details map[string]interface{}) { CatchLogForTest(t) for _, subtest := range clusteredTests { diff --git a/mcu_proxy_test.go b/mcu_proxy_test.go index cfdb865f..f16ccfb3 100644 --- a/mcu_proxy_test.go +++ b/mcu_proxy_test.go @@ -1346,6 +1346,10 @@ func (h *mockGrpcServerHub) GetBackend(u *url.URL) *Backend { return nil } +func (h *mockGrpcServerHub) GetRoomForBackend(roomId string, backend *Backend) *Room { + return nil +} + func Test_ProxyRemotePublisher(t *testing.T) { CatchLogForTest(t) t.Parallel() diff --git a/room.go b/room.go index 19a9ff50..82ea8a0a 100644 --- a/room.go +++ b/room.go @@ -71,7 +71,7 @@ type Room struct { mu *sync.RWMutex sessions map[string]Session - internalSessions map[Session]bool + internalSessions map[*ClientSession]bool virtualSessions map[*VirtualSession]bool inCallSessions map[Session]bool roomSessionData map[string]*RoomSessionData @@ -108,7 +108,7 @@ func NewRoom(roomId string, properties json.RawMessage, hub *Hub, events AsyncEv mu: &sync.RWMutex{}, sessions: make(map[string]Session), - internalSessions: make(map[Session]bool), + internalSessions: make(map[*ClientSession]bool), virtualSessions: make(map[*VirtualSession]bool), inCallSessions: make(map[Session]bool), roomSessionData: make(map[string]*RoomSessionData), @@ -290,13 +290,19 @@ func (r *Room) AddSession(session Session, sessionData json.RawMessage) { var publishUsersChanged bool switch session.ClientType() { case HelloClientTypeInternal: - r.internalSessions[session] = true + clientSession, ok := session.(*ClientSession) + if !ok { + delete(r.sessions, sid) + r.mu.Unlock() + panic(fmt.Sprintf("Expected a client session, got %v (%T)", session, session)) + } + r.internalSessions[clientSession] = true case HelloClientTypeVirtual: virtualSession, ok := session.(*VirtualSession) if !ok { delete(r.sessions, sid) r.mu.Unlock() - panic(fmt.Sprintf("Expected a virtual session, got %v", session)) + panic(fmt.Sprintf("Expected a virtual session, got %v (%T)", session, session)) } r.virtualSessions[virtualSession] = true publishUsersChanged = true @@ -447,11 +453,21 @@ func (r *Room) RemoveSession(session Session) bool { sid := session.PublicId() r.statsRoomSessionsCurrent.With(prometheus.Labels{"clienttype": session.ClientType()}).Dec() delete(r.sessions, sid) - delete(r.internalSessions, session) if virtualSession, ok := session.(*VirtualSession); ok { delete(r.virtualSessions, virtualSession) + // Handle case where virtual session was also sent by Nextcloud. + users := make([]map[string]interface{}, 0, len(r.users)) + for _, u := range r.users { + if u["sessionId"] != sid { + users = append(users, u) + } + } + if len(users) != len(r.users) { + r.users = users + } } if clientSession, ok := session.(*ClientSession); ok { + delete(r.internalSessions, clientSession) r.transientData.RemoveListener(clientSession) } delete(r.inCallSessions, session) @@ -568,10 +584,66 @@ func (r *Room) PublishSessionLeft(session Session) { } } +func (r *Room) getClusteredInternalSessionsRLocked() (internal map[string]*InternalSessionData, virtual map[string]*VirtualSessionData) { + if r.hub.rpcClients == nil { + return nil, nil + } + + r.mu.RUnlock() + defer r.mu.RLock() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var mu sync.Mutex + var wg sync.WaitGroup + for _, client := range r.hub.rpcClients.GetClients() { + wg.Add(1) + go func(c *GrpcClient) { + defer wg.Done() + + clientInternal, clientVirtual, err := c.GetInternalSessions(ctx, r.Id(), r.Backend()) + if err != nil { + log.Printf("Received error while getting internal sessions for %s@%s from %s: %s", r.Id(), r.Backend().Id(), c.Target(), err) + return + } + + mu.Lock() + defer mu.Unlock() + if internal == nil { + internal = make(map[string]*InternalSessionData, len(clientInternal)) + } + for sid, s := range clientInternal { + internal[sid] = s + } + if virtual == nil { + virtual = make(map[string]*VirtualSessionData, len(clientVirtual)) + } + for sid, s := range clientVirtual { + virtual[sid] = s + } + }(client) + } + wg.Wait() + + return +} + func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string]interface{} { now := time.Now().Unix() - r.mu.Lock() - defer r.mu.Unlock() + r.mu.RLock() + defer r.mu.RUnlock() + if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 { + return users + } + + clusteredInternalSessions, clusteredVirtualSessions := r.getClusteredInternalSessionsRLocked() + + // Local sessions might have changed while waiting for clustered information. + if len(users) == 0 && len(r.internalSessions) == 0 && len(r.virtualSessions) == 0 { + return users + } + + skipSession := make(map[string]bool) for _, user := range users { sessionid, found := user["sessionId"] if !found || sessionid == "" { @@ -581,21 +653,69 @@ func (r *Room) addInternalSessions(users []map[string]interface{}) []map[string] if userid, found := user["userId"]; !found || userid == "" { if roomSessionData, found := r.roomSessionData[sessionid.(string)]; found { user["userId"] = roomSessionData.UserId + } else if sid, ok := sessionid.(string); ok { + if entry, found := clusteredVirtualSessions[sid]; found { + user["virtual"] = true + user["inCall"] = entry.GetInCall() + skipSession[sid] = true + } else { + for session := range r.virtualSessions { + if session.PublicId() == sid { + user["virtual"] = true + user["inCall"] = session.GetInCall() + skipSession[sid] = true + break + } + } + } } } } for session := range r.internalSessions { - users = append(users, map[string]interface{}{ - "inCall": session.(*ClientSession).GetInCall(), + u := map[string]interface{}{ + "inCall": session.GetInCall(), "sessionId": session.PublicId(), "lastPing": now, "internal": true, - }) + } + if f := session.GetFeatures(); len(f) > 0 { + u["features"] = f + } + users = append(users, u) + } + for _, session := range clusteredInternalSessions { + u := map[string]interface{}{ + "inCall": session.GetInCall(), + "sessionId": session.GetSessionId(), + "lastPing": now, + "internal": true, + } + if f := session.GetFeatures(); len(f) > 0 { + u["features"] = f + } + users = append(users, u) } for session := range r.virtualSessions { + sid := session.PublicId() + if skipSession[sid] { + continue + } + skipSession[sid] = true users = append(users, map[string]interface{}{ "inCall": session.GetInCall(), - "sessionId": session.PublicId(), + "sessionId": sid, + "lastPing": now, + "virtual": true, + }) + } + for sid, session := range clusteredVirtualSessions { + if skipSession[sid] { + continue + } + + users = append(users, map[string]interface{}{ + "inCall": session.GetInCall(), + "sessionId": sid, "lastPing": now, "virtual": true, }) @@ -974,7 +1094,7 @@ func (r *Room) publishActiveSessions() (int, *sync.WaitGroup) { } func (r *Room) publishRoomMessage(message *BackendRoomMessageRequest) { - if message == nil || message.Data == nil { + if message == nil || len(message.Data) == 0 { return } @@ -1062,10 +1182,10 @@ func (r *Room) notifyInternalRoomDeleted() { }, } - r.mu.Lock() - defer r.mu.Unlock() + r.mu.RLock() + defer r.mu.RUnlock() for s := range r.internalSessions { - s.(*ClientSession).SendMessage(msg) + s.SendMessage(msg) } }