Skip to content

Commit

Permalink
Merge pull request #16 from kubescape/pool
Browse files Browse the repository at this point in the history
process incoming messages asynchronously with a pool
  • Loading branch information
matthyx authored Nov 29, 2023
2 parents 342e586 + e641f2a commit 603c165
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,22 @@ func (s *Synchronizer) Start(ctx context.Context) error {
}

func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
// process incoming messages
for {
data, err := s.readDataFunc(s.conn)
if err != nil {
return fmt.Errorf("cannot read server data: %w", err)
}
// incoming message pool
inPool, err := ants.NewPoolWithFunc(10, func(i interface{}) {
data := i.([]byte)
// unmarshal message
var generic domain.Generic
err = json.Unmarshal(data, &generic)
err := json.Unmarshal(data, &generic)
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err))
continue
return
}
logger.L().Debug("received message", helpers.Interface("event", generic.Event.Value()),
helpers.String("msgid", generic.MsgId), helpers.Int("depth", generic.Depth))
// check message depth and ID
if generic.Depth > maxMessageDepth {
logger.L().Ctx(ctx).Error("message depth too high", helpers.Int("depth", generic.Depth))
continue
return
}
// store in context
ctx := utils.ContextFromGeneric(ctx, generic)
Expand All @@ -160,7 +157,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
continue
return
}
id := domain.KindName{
Kind: msg.Kind,
Expand All @@ -172,15 +169,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err),
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
return
}
case domain.EventNewChecksum:
var msg domain.NewChecksum
err = json.Unmarshal(data, &msg)
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
continue
return
}
id := domain.KindName{
Kind: msg.Kind,
Expand All @@ -192,15 +189,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err),
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
return
}
case domain.EventObjectDeleted:
var msg domain.ObjectDeleted
err = json.Unmarshal(data, &msg)
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
continue
return
}
id := domain.KindName{
Kind: msg.Kind,
Expand All @@ -212,15 +209,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err),
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
return
}
case domain.EventPatchObject:
var msg domain.PatchObject
err = json.Unmarshal(data, &msg)
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
continue
return
}
id := domain.KindName{
Kind: msg.Kind,
Expand All @@ -232,15 +229,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err),
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
return
}
case domain.EventPutObject:
var msg domain.PutObject
err = json.Unmarshal(data, &msg)
if err != nil {
logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err),
helpers.Interface("event", generic.Event.Value()))
continue
return
}
id := domain.KindName{
Kind: msg.Kind,
Expand All @@ -252,9 +249,23 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err),
helpers.Interface("event", msg.Event.Value()),
helpers.String("id", id.String()))
continue
return
}
}
})
if err != nil {
logger.L().Ctx(ctx).Fatal("unable to create incoming message pool", helpers.Error(err))
}
// process incoming messages
for {
data, err := s.readDataFunc(s.conn)
if err != nil {
return fmt.Errorf("cannot read server data: %w", err)
}
err = inPool.Invoke(data)
if err != nil {
return fmt.Errorf("invoke inPool: %w", err)
}
}
}

Expand Down

0 comments on commit 603c165

Please sign in to comment.