Skip to content

Commit

Permalink
Merge pull request #479 from mynaparrot/websocket_queue
Browse files Browse the repository at this point in the history
Websocket queue
  • Loading branch information
jibon57 authored May 3, 2024
2 parents 0395c2b + 3ce39c6 commit 8c13e13
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
MaxDurationWaitBeforeCleanRoomWebhook = 1 * time.Minute
WaitDurationIfRoomInProgress = 300 * time.Millisecond

DefaultWebsocketQueueSize = 200
DefaultWebsocketQueueSize = 1000
DefaultWebhookQueueSize = 200
UserWebsocketChannel = "plug-n-meet-user-websocket"
WhiteboardWebsocketChannel = "plug-n-meet-whiteboard-websocket"
Expand Down
16 changes: 10 additions & 6 deletions pkg/models/websocket_channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ func SubscribeToUserWebsocketChannel() {
if err != nil {
log.Errorln(err)
}
if res.Type == "sendMsg" {
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
} else if res.Type == "deleteRoom" {
config.AppCnf.DeleteChatRoom(res.RoomId)
if res != nil {
if res.Type == "sendMsg" {
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
} else if res.Type == "deleteRoom" {
config.AppCnf.DeleteChatRoom(res.RoomId)
}
}
})
}
Expand Down Expand Up @@ -103,8 +105,9 @@ func SubscribeToWhiteboardWebsocketChannel() {
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
} else {
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
}
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
})
}
}
Expand Down Expand Up @@ -138,8 +141,9 @@ func SubscribeToSystemWebsocketChannel() {
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
} else {
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
}
m.HandleDataMessages(res.DataMsg, res.RoomId, res.IsAdmin)
})
}
}
44 changes: 34 additions & 10 deletions pkg/models/websocket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,24 @@ func (w *WebsocketServiceModel) handleChat() {
}
config.AppCnf.RUnlock()

if len(to) > 0 {
socketio.EmitToList(to, jm, socketio.BinaryMessage)
l := len(to)
if l > 0 {
var wg sync.WaitGroup
// for network related issue delivery can be delay
// if this continues then messages will be overflow & drop
// using concurrent will give better result
// if one user have bad connection then waiting only for him
wg.Add(l)
for _, t := range to {
go func(u string) {
defer wg.Done()
err := socketio.EmitTo(u, jm, socketio.BinaryMessage)
if err != nil {
log.Errorln(err)
}
}(t)
}
wg.Wait()
}
}

Expand Down Expand Up @@ -253,16 +269,11 @@ func (w *WebsocketServiceModel) handleWhiteboard() {
// if this continues then messages will be overflow & drop
// using concurrent will give better result
// if one user have bad connection then waiting only for him
// as whiteboard transmit a lot of data very frequently
// at present we'll implement it here only
wg.Add(l)
for _, t := range to {
go func(u string) {
defer wg.Done()
err := socketio.EmitTo(u, jm, socketio.BinaryMessage)
if err != nil {
log.Errorln(err)
}
_ = socketio.EmitTo(u, jm, socketio.BinaryMessage)
}(t)
}
wg.Wait()
Expand Down Expand Up @@ -391,7 +402,20 @@ func (w *WebsocketServiceModel) handleSpeechSubtitleText() {
}
config.AppCnf.RUnlock()

if len(to) > 0 {
socketio.EmitToList(to, jm, socketio.BinaryMessage)
l := len(to)
if l > 0 {
var wg sync.WaitGroup
// for network related issue delivery can be delay
// if this continues then messages will be overflow & drop
// using concurrent will give better result
// if one user have bad connection then waiting only for him
wg.Add(l)
for _, t := range to {
go func(u string) {
defer wg.Done()
_ = socketio.EmitTo(u, jm, socketio.BinaryMessage)
}(t)
}
wg.Wait()
}
}

0 comments on commit 8c13e13

Please sign in to comment.