Skip to content

Commit

Permalink
Add wait to ensure all entity events are executed (#2274)
Browse files Browse the repository at this point in the history
* The wait group counter was being incremented and decremented, but the server wasn't waiting for all entity event executions to finish. This change ensures that the server waits for all entity events to be executed before exiting

Signed-off-by: Vyom-Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav authored Feb 5, 2024
1 parent b6b4581 commit b7fb73a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
3 changes: 3 additions & 0 deletions cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ var serveCmd = &cobra.Command{
return fmt.Errorf("error flushing cache: %w", err)
}

// Wait for all entity events to be executed
exec.Wait()

return errg.Wait()
},
}
Expand Down
34 changes: 17 additions & 17 deletions internal/engine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ const (

// Executor is the engine that executes the rules for a given event
type Executor struct {
querier db.Store
evt *events.Eventer
crypteng *crypto.Engine
provMt providertelemetry.ProviderMetrics
mdws []message.HandlerMiddleware
executions *sync.WaitGroup
querier db.Store
evt *events.Eventer
crypteng *crypto.Engine
provMt providertelemetry.ProviderMetrics
mdws []message.HandlerMiddleware
wgEntityEventExecution *sync.WaitGroup
// terminationcontext is used to terminate the executor
// when the server is shutting down.
terminationcontext context.Context
Expand Down Expand Up @@ -88,13 +88,13 @@ func NewExecutor(
}

e := &Executor{
querier: querier,
crypteng: crypteng,
provMt: providertelemetry.NewNoopMetrics(),
evt: evt,
executions: &sync.WaitGroup{},
terminationcontext: ctx,
mdws: []message.HandlerMiddleware{},
querier: querier,
crypteng: crypteng,
provMt: providertelemetry.NewNoopMetrics(),
evt: evt,
wgEntityEventExecution: &sync.WaitGroup{},
terminationcontext: ctx,
mdws: []message.HandlerMiddleware{},
}

for _, opt := range opts {
Expand All @@ -109,9 +109,9 @@ func (e *Executor) Register(r events.Registrar) {
r.Register(events.ExecuteEntityEventTopic, e.HandleEntityEvent, e.mdws...)
}

// Wait waits for all the executions to finish.
// Wait waits for all the entity executions to finish.
func (e *Executor) Wait() {
e.executions.Wait()
e.wgEntityEventExecution.Wait()
}

// HandleEntityEvent handles events coming from webhooks/signals
Expand All @@ -127,9 +127,9 @@ func (e *Executor) HandleEntityEvent(msg *message.Message) error {
return fmt.Errorf("error unmarshalling payload: %w", err)
}

e.executions.Add(1)
e.wgEntityEventExecution.Add(1)
go func() {
defer e.executions.Done()
defer e.wgEntityEventExecution.Done()
// TODO: Make this timeout configurable
ctx, cancel := context.WithTimeout(e.terminationcontext, DefaultExecutionTimeout)
defer cancel()
Expand Down

0 comments on commit b7fb73a

Please sign in to comment.