diff --git a/pkg/config/constants.go b/pkg/config/constants.go index 071215c6..4f142edf 100644 --- a/pkg/config/constants.go +++ b/pkg/config/constants.go @@ -15,7 +15,7 @@ const ( MaxDurationWaitBeforeCleanRoomWebhook = 1 * time.Minute WaitDurationIfRoomInProgress = 300 * time.Millisecond - DefaultWebsocketQueueSize = 200 + DefaultWebsocketQueueSize = 1000 DefaultWebhookQueueSize = 200 UserWebsocketChannel = "plug-n-meet-user-websocket" WhiteboardWebsocketChannel = "plug-n-meet-whiteboard-websocket" diff --git a/pkg/models/websocket_channels.go b/pkg/models/websocket_channels.go index efa5513f..d2062f39 100644 --- a/pkg/models/websocket_channels.go +++ b/pkg/models/websocket_channels.go @@ -66,10 +66,12 @@ func SubscribeToUserWebsocketChannel() { if err != nil { log.Errorln(err) } - if res.Type == "sendMsg" { - m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) - } else if res.Type == "deleteRoom" { - config.AppCnf.DeleteChatRoom(res.RoomId) + if res != nil { + if res.Type == "sendMsg" { + m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) + } else if res.Type == "deleteRoom" { + config.AppCnf.DeleteChatRoom(res.RoomId) + } } }) } @@ -103,8 +105,9 @@ func SubscribeToWhiteboardWebsocketChannel() { err = json.Unmarshal([]byte(msg.Payload), res) if err != nil { log.Errorln(err) + } else { + m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) } - m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) }) } } @@ -138,8 +141,9 @@ func SubscribeToSystemWebsocketChannel() { err = json.Unmarshal([]byte(msg.Payload), res) if err != nil { log.Errorln(err) + } else { + m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) } - m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin) }) } } diff --git a/pkg/models/websocket_service.go b/pkg/models/websocket_service.go index fc77c065..5b900774 100644 --- a/pkg/models/websocket_service.go +++ b/pkg/models/websocket_service.go @@ -128,8 +128,24 @@ func (w *WebsocketServiceModel) handleChat() { } config.AppCnf.RUnlock() - if len(to) > 0 { - socketio.EmitToList(to, jm, socketio.BinaryMessage) + l := len(to) + if l > 0 { + var wg sync.WaitGroup + // for network related issue delivery can be delay + // if this continues then messages will be overflow & drop + // using concurrent will give better result + // if one user have bad connection then waiting only for him + wg.Add(l) + for _, t := range to { + go func(u string) { + defer wg.Done() + err := socketio.EmitTo(u, jm, socketio.BinaryMessage) + if err != nil { + log.Errorln(err) + } + }(t) + } + wg.Wait() } } @@ -253,16 +269,11 @@ func (w *WebsocketServiceModel) handleWhiteboard() { // if this continues then messages will be overflow & drop // using concurrent will give better result // if one user have bad connection then waiting only for him - // as whiteboard transmit a lot of data very frequently - // at present we'll implement it here only wg.Add(l) for _, t := range to { go func(u string) { defer wg.Done() - err := socketio.EmitTo(u, jm, socketio.BinaryMessage) - if err != nil { - log.Errorln(err) - } + _ = socketio.EmitTo(u, jm, socketio.BinaryMessage) }(t) } wg.Wait() @@ -391,7 +402,20 @@ func (w *WebsocketServiceModel) handleSpeechSubtitleText() { } config.AppCnf.RUnlock() - if len(to) > 0 { - socketio.EmitToList(to, jm, socketio.BinaryMessage) + l := len(to) + if l > 0 { + var wg sync.WaitGroup + // for network related issue delivery can be delay + // if this continues then messages will be overflow & drop + // using concurrent will give better result + // if one user have bad connection then waiting only for him + wg.Add(l) + for _, t := range to { + go func(u string) { + defer wg.Done() + _ = socketio.EmitTo(u, jm, socketio.BinaryMessage) + }(t) + } + wg.Wait() } }