diff --git a/core/synchronizer.go b/core/synchronizer.go index 7a31633..6265bff 100644 --- a/core/synchronizer.go +++ b/core/synchronizer.go @@ -125,25 +125,22 @@ func (s *Synchronizer) Start(ctx context.Context) error { } func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { - // process incoming messages - for { - data, err := s.readDataFunc(s.conn) - if err != nil { - return fmt.Errorf("cannot read server data: %w", err) - } + // incoming message pool + inPool, err := ants.NewPoolWithFunc(10, func(i interface{}) { + data := i.([]byte) // unmarshal message var generic domain.Generic - err = json.Unmarshal(data, &generic) + err := json.Unmarshal(data, &generic) if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err)) - continue + return } logger.L().Debug("received message", helpers.Interface("event", generic.Event.Value()), helpers.String("msgid", generic.MsgId), helpers.Int("depth", generic.Depth)) // check message depth and ID if generic.Depth > maxMessageDepth { logger.L().Ctx(ctx).Error("message depth too high", helpers.Int("depth", generic.Depth)) - continue + return } // store in context ctx := utils.ContextFromGeneric(ctx, generic) @@ -155,7 +152,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err), helpers.Interface("event", generic.Event.Value())) - continue + return } id := domain.KindName{ Kind: msg.Kind, @@ -167,7 +164,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err), helpers.Interface("event", msg.Event.Value()), helpers.String("id", id.String())) - continue + return } case domain.EventNewChecksum: var msg domain.NewChecksum @@ -175,7 +172,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err), helpers.Interface("event", generic.Event.Value())) - continue + return } id := domain.KindName{ Kind: msg.Kind, @@ -187,7 +184,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err), helpers.Interface("event", msg.Event.Value()), helpers.String("id", id.String())) - continue + return } case domain.EventObjectDeleted: var msg domain.ObjectDeleted @@ -195,7 +192,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err), helpers.Interface("event", generic.Event.Value())) - continue + return } id := domain.KindName{ Kind: msg.Kind, @@ -207,7 +204,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err), helpers.Interface("event", msg.Event.Value()), helpers.String("id", id.String())) - continue + return } case domain.EventPatchObject: var msg domain.PatchObject @@ -215,7 +212,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err), helpers.Interface("event", generic.Event.Value())) - continue + return } id := domain.KindName{ Kind: msg.Kind, @@ -227,7 +224,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err), helpers.Interface("event", msg.Event.Value()), helpers.String("id", id.String())) - continue + return } case domain.EventPutObject: var msg domain.PutObject @@ -235,7 +232,7 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { if err != nil { logger.L().Ctx(ctx).Error("cannot unmarshal message", helpers.Error(err), helpers.Interface("event", generic.Event.Value())) - continue + return } id := domain.KindName{ Kind: msg.Kind, @@ -247,9 +244,23 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { logger.L().Ctx(ctx).Error("error handling message", helpers.Error(err), helpers.Interface("event", msg.Event.Value()), helpers.String("id", id.String())) - continue + return } } + }) + if err != nil { + logger.L().Ctx(ctx).Fatal("unable to create incoming message pool", helpers.Error(err)) + } + // process incoming messages + for { + data, err := s.readDataFunc(s.conn) + if err != nil { + return fmt.Errorf("cannot read server data: %w", err) + } + err = inPool.Invoke(data) + if err != nil { + return fmt.Errorf("invoke inPool: %w", err) + } } }