Skip to content

Commit

Permalink
add ping/pong to keep alive the ws
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Nov 28, 2023
1 parent 3e89ccd commit 65e8233
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
2 changes: 2 additions & 0 deletions api/asyncapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ components:
- objectModified
- getObject
- patchObject
- ping
- pong
- putObject
kind:
type: object
Expand Down
47 changes: 47 additions & 0 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/gobwas/ws/wsutil"
"github.com/kubescape/go-logger"
Expand Down Expand Up @@ -111,6 +112,8 @@ func (s *Synchronizer) VerifyObjectCallback(ctx context.Context, id domain.KindN
func (s *Synchronizer) Start(ctx context.Context) error {
identifiers := utils.ClientIdentifierFromContext(ctx)
logger.L().Ctx(ctx).Info("starting sync", helpers.String("account", identifiers.Account), helpers.String("cluster", identifiers.Cluster))
// send ping
go s.sendPing(ctx)
// adapter events
err := s.adapter.Start(ctx)
if err != nil {
Expand Down Expand Up @@ -149,6 +152,16 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
ctx := utils.ContextFromGeneric(ctx, generic)
// handle message
switch *generic.Event {
case domain.EventPing:
// send pong
err := s.sendPong()
if err != nil {
logger.L().Ctx(ctx).Error("error sending pong", helpers.Error(err))
continue
}
case domain.EventPong:
// do nothing
continue
case domain.EventGetObject:
var msg domain.GetObject
err = json.Unmarshal(data, &msg)
Expand Down Expand Up @@ -427,6 +440,40 @@ func (s *Synchronizer) sendPatchObject(ctx context.Context, id domain.KindName,
return nil
}

func (s *Synchronizer) sendPing(ctx context.Context) {
event := domain.EventPing
msg := domain.Generic{
Event: &event,
}
data, err := json.Marshal(msg)
if err != nil {
logger.L().Fatal("marshal ping message", helpers.Error(err))
}
for {
err = s.outPool.Invoke(data)
if err != nil {
logger.L().Ctx(ctx).Error("invoke outPool on ping message", helpers.Error(err))
}
time.Sleep(50 * time.Second)
}
}

func (s *Synchronizer) sendPong() error {
event := domain.EventPong
msg := domain.Generic{
Event: &event,
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("marshal pong message: %w", err)
}
err = s.outPool.Invoke(data)
if err != nil {
return fmt.Errorf("invoke outPool on pong message: %w", err)
}
return nil
}

func (s *Synchronizer) sendPutObject(ctx context.Context, id domain.KindName, object []byte) error {
event := domain.EventPutObject
depth := ctx.Value(domain.ContextKeyDepth).(int)
Expand Down
6 changes: 5 additions & 1 deletion domain/Event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
EventObjectModified
EventGetObject
EventPatchObject
EventPing
EventPong
EventPutObject
)

Expand All @@ -21,13 +23,15 @@ func (op Event) Value() any {
return EventValues[op]
}

var EventValues = []any{"newChecksum", "objectAdded", "objectDeleted", "objectModified", "getObject", "patchObject", "putObject"}
var EventValues = []any{"newChecksum", "objectAdded", "objectDeleted", "objectModified", "getObject", "patchObject", "ping", "pong", "putObject"}
var ValuesToEvent = map[any]Event{
EventValues[EventNewChecksum]: EventNewChecksum,
EventValues[EventObjectAdded]: EventObjectAdded,
EventValues[EventObjectDeleted]: EventObjectDeleted,
EventValues[EventObjectModified]: EventObjectModified,
EventValues[EventGetObject]: EventGetObject,
EventValues[EventPatchObject]: EventPatchObject,
EventValues[EventPing]: EventPing,
EventValues[EventPong]: EventPong,
EventValues[EventPutObject]: EventPutObject,
}

0 comments on commit 65e8233

Please sign in to comment.