From b7fb73a53159a74dd6307a4dd7de4c614ec8aef1 Mon Sep 17 00:00:00 2001 From: Vyom Yadav <73882557+Vyom-Yadav@users.noreply.github.com> Date: Mon, 5 Feb 2024 18:32:37 +0530 Subject: [PATCH] Add wait to ensure all entity events are executed (#2274) * The wait group counter was being incremented and decremented, but the server wasn't waiting for all entity event executions to finish. This change ensures that the server waits for all entity events to be executed before exiting Signed-off-by: Vyom-Yadav --- cmd/server/app/serve.go | 3 +++ internal/engine/executor.go | 34 +++++++++++++++++----------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cmd/server/app/serve.go b/cmd/server/app/serve.go index 0b8d55e9a8..6e26fbf6d4 100644 --- a/cmd/server/app/serve.go +++ b/cmd/server/app/serve.go @@ -173,6 +173,9 @@ var serveCmd = &cobra.Command{ return fmt.Errorf("error flushing cache: %w", err) } + // Wait for all entity events to be executed + exec.Wait() + return errg.Wait() }, } diff --git a/internal/engine/executor.go b/internal/engine/executor.go index 7fcb6b9459..0a54fdba1b 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -46,12 +46,12 @@ const ( // Executor is the engine that executes the rules for a given event type Executor struct { - querier db.Store - evt *events.Eventer - crypteng *crypto.Engine - provMt providertelemetry.ProviderMetrics - mdws []message.HandlerMiddleware - executions *sync.WaitGroup + querier db.Store + evt *events.Eventer + crypteng *crypto.Engine + provMt providertelemetry.ProviderMetrics + mdws []message.HandlerMiddleware + wgEntityEventExecution *sync.WaitGroup // terminationcontext is used to terminate the executor // when the server is shutting down. terminationcontext context.Context @@ -88,13 +88,13 @@ func NewExecutor( } e := &Executor{ - querier: querier, - crypteng: crypteng, - provMt: providertelemetry.NewNoopMetrics(), - evt: evt, - executions: &sync.WaitGroup{}, - terminationcontext: ctx, - mdws: []message.HandlerMiddleware{}, + querier: querier, + crypteng: crypteng, + provMt: providertelemetry.NewNoopMetrics(), + evt: evt, + wgEntityEventExecution: &sync.WaitGroup{}, + terminationcontext: ctx, + mdws: []message.HandlerMiddleware{}, } for _, opt := range opts { @@ -109,9 +109,9 @@ func (e *Executor) Register(r events.Registrar) { r.Register(events.ExecuteEntityEventTopic, e.HandleEntityEvent, e.mdws...) } -// Wait waits for all the executions to finish. +// Wait waits for all the entity executions to finish. func (e *Executor) Wait() { - e.executions.Wait() + e.wgEntityEventExecution.Wait() } // HandleEntityEvent handles events coming from webhooks/signals @@ -127,9 +127,9 @@ func (e *Executor) HandleEntityEvent(msg *message.Message) error { return fmt.Errorf("error unmarshalling payload: %w", err) } - e.executions.Add(1) + e.wgEntityEventExecution.Add(1) go func() { - defer e.executions.Done() + defer e.wgEntityEventExecution.Done() // TODO: Make this timeout configurable ctx, cancel := context.WithTimeout(e.terminationcontext, DefaultExecutionTimeout) defer cancel()