diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index de52fed26e5..8a232029a82 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -31,7 +31,7 @@ type multiStreamPipeline struct { runner Runner rrs RunResultSaver streamIDs []StreamID - vars pipeline.Vars + newVars func() pipeline.Vars } func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (Pipeline, error) { @@ -71,16 +71,18 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R if err := validateStreamIDs(streamIDs); err != nil { return nil, fmt.Errorf("invalid stream IDs: %w", err) } - vars := pipeline.NewVarsFrom(map[string]interface{}{ - "pipelineSpec": map[string]interface{}{ - "id": jb.PipelineSpecID, - }, - "jb": map[string]interface{}{ - "databaseID": jb.ID, - "externalJobID": jb.ExternalJobID, - "name": jb.Name.ValueOrZero(), - }, - }) + vars := func() pipeline.Vars { + return pipeline.NewVarsFrom(map[string]interface{}{ + "pipelineSpec": map[string]interface{}{ + "id": jb.PipelineSpecID, + }, + "jb": map[string]interface{}{ + "databaseID": jb.ID, + "externalJobID": jb.ExternalJobID, + "name": jb.Name.ValueOrZero(), + }, + }) + } return &multiStreamPipeline{ lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), @@ -122,7 +124,7 @@ func (s *multiStreamPipeline) StreamIDs() []StreamID { // The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). // Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. func (s *multiStreamPipeline) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { - run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, s.vars) + run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, s.newVars()) if err != nil { return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) }