Skip to content

Commit

Permalink
refact: make seperation of websocket channel (#52)
Browse files Browse the repository at this point in the history
seperated websocket channels based on type
  • Loading branch information
jibon57 authored Jul 19, 2022
1 parent 5dd8453 commit 81ad559
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 48 deletions.
38 changes: 5 additions & 33 deletions internal/controllers/websocket_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controllers

import (
"context"
"github.com/antoniodipinto/ikisocket"
"github.com/goccy/go-json"
"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -119,14 +118,7 @@ func SetupSocketListeners() {
msg.IsAdmin = isAdmin.(bool)
}

marshal, err := json.Marshal(msg)
if err != nil {
log.Errorln(err)
return
}

ctx := context.Background()
config.AppCnf.RDS.Publish(ctx, "plug-n-meet-websocket", marshal)
models.DistributeWebsocketMsgToRedisChannel(&msg)
})

// On disconnect event
Expand All @@ -151,29 +143,9 @@ func SetupSocketListeners() {
//})
}

// SubscribeToWebsocketChannel will delivery message to websocket
// SubscribeToWebsocketChannel will subscribe to all websocket channels
func SubscribeToWebsocketChannel() {
ctx := context.Background()
pubsub := config.AppCnf.RDS.Subscribe(ctx, "plug-n-meet-websocket")
defer pubsub.Close()

_, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalln(err)
}

m := models.NewWebsocketService()
ch := pubsub.Channel()
for msg := range ch {
res := new(models.WebsocketRedisMsg)
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
}
if res.Type == "sendMsg" {
m.HandleDataMessages(res.Payload, res.RoomId, res.IsAdmin)
} else if res.Type == "deleteRoom" {
config.AppCnf.DeleteChatRoom(res.RoomId)
}
}
go models.SubscribeToUserWebsocketChannel()
go models.SubscribeToWhiteboardWebsocketChannel()
go models.SubscribeToSystemWebsocketChannel()
}
9 changes: 2 additions & 7 deletions internal/models/breakout_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,14 @@ func (m *breakoutRoom) broadcastNotification(roomId, fromUserId, toUserId, broad
payload.To = toUserId
}

msg := WebsocketRedisMsg{
msg := &WebsocketRedisMsg{
Type: "sendMsg",
Payload: &payload,
RoomId: roomId,
IsAdmin: isAdmin,
}
DistributeWebsocketMsgToRedisChannel(msg)

marshal, err := json.Marshal(msg)
if err != nil {
return err
}

m.rc.Publish(m.ctx, "plug-n-meet-websocket", marshal)
return nil
}

Expand Down
9 changes: 2 additions & 7 deletions internal/models/polls.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,14 @@ func (m *newPollsModel) broadcastNotification(roomId, userId, pollId, mType stri
},
}

msg := WebsocketRedisMsg{
msg := &WebsocketRedisMsg{
Type: "sendMsg",
Payload: &payload,
RoomId: roomId,
IsAdmin: isAdmin,
}
DistributeWebsocketMsgToRedisChannel(msg)

marshal, err := json.Marshal(msg)
if err != nil {
return err
}

m.rc.Publish(m.ctx, "plug-n-meet-websocket", marshal)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/models/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (w *webhookEvent) roomFinished() int64 {
}
marshal, err := json.Marshal(msg)
if err == nil {
config.AppCnf.RDS.Publish(context.Background(), "plug-n-meet-websocket", marshal)
config.AppCnf.RDS.Publish(context.Background(), "plug-n-meet-user-websocket", marshal)
}

// notify to clean room from room duration map
Expand Down
99 changes: 99 additions & 0 deletions internal/models/websocket_channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package models

import (
"context"
"github.com/goccy/go-json"
"github.com/mynaparrot/plugNmeet/internal/config"
log "github.com/sirupsen/logrus"
)

func DistributeWebsocketMsgToRedisChannel(msg *WebsocketRedisMsg) {
ctx := context.Background()
marshal, err := json.Marshal(msg)
if err != nil {
log.Errorln(err)
return
}

switch msg.Payload.Type {
case "USER":
config.AppCnf.RDS.Publish(ctx, "plug-n-meet-user-websocket", marshal)
case "WHITEBOARD":
config.AppCnf.RDS.Publish(ctx, "plug-n-meet-whiteboard-websocket", marshal)
case "SYSTEM":
config.AppCnf.RDS.Publish(ctx, "plug-n-meet-system-websocket", marshal)
}
}

// SubscribeToUserWebsocketChannel will delivery message to user websocket
func SubscribeToUserWebsocketChannel() {
ctx := context.Background()
pubsub := config.AppCnf.RDS.Subscribe(ctx, "plug-n-meet-user-websocket")
defer pubsub.Close()

_, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalln(err)
}

m := NewWebsocketService()
ch := pubsub.Channel()
for msg := range ch {
res := new(WebsocketRedisMsg)
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
}
if res.Type == "sendMsg" {
m.HandleDataMessages(res.Payload, res.RoomId, res.IsAdmin)
} else if res.Type == "deleteRoom" {
config.AppCnf.DeleteChatRoom(res.RoomId)
}
}
}

// SubscribeToWhiteboardWebsocketChannel will delivery message to whiteboard websocket
func SubscribeToWhiteboardWebsocketChannel() {
ctx := context.Background()
pubsub := config.AppCnf.RDS.Subscribe(ctx, "plug-n-meet-whiteboard-websocket")
defer pubsub.Close()

_, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalln(err)
}

m := NewWebsocketService()
ch := pubsub.Channel()
for msg := range ch {
res := new(WebsocketRedisMsg)
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
}
m.HandleDataMessages(res.Payload, res.RoomId, res.IsAdmin)
}
}

// SubscribeToSystemWebsocketChannel will delivery message to websocket
func SubscribeToSystemWebsocketChannel() {
ctx := context.Background()
pubsub := config.AppCnf.RDS.Subscribe(ctx, "plug-n-meet-system-websocket")
defer pubsub.Close()

_, err := pubsub.Receive(ctx)
if err != nil {
log.Fatalln(err)
}

m := NewWebsocketService()
ch := pubsub.Channel()
for msg := range ch {
res := new(WebsocketRedisMsg)
err = json.Unmarshal([]byte(msg.Payload), res)
if err != nil {
log.Errorln(err)
}
m.HandleDataMessages(res.Payload, res.RoomId, res.IsAdmin)
}
}

0 comments on commit 81ad559

Please sign in to comment.