Skip to content

Commit

Permalink
core/services/streams: create new Vars to avoid race (#15997)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Jan 20, 2025
1 parent f098810 commit 8bbb77f
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions core/services/streams/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type multiStreamPipeline struct {
runner Runner
rrs RunResultSaver
streamIDs []StreamID
vars pipeline.Vars
newVars func() pipeline.Vars
}

func NewMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs RunResultSaver) (Pipeline, error) {
Expand Down Expand Up @@ -71,16 +71,18 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
if err := validateStreamIDs(streamIDs); err != nil {
return nil, fmt.Errorf("invalid stream IDs: %w", err)
}
vars := pipeline.NewVarsFrom(map[string]interface{}{
"pipelineSpec": map[string]interface{}{
"id": jb.PipelineSpecID,
},
"jb": map[string]interface{}{
"databaseID": jb.ID,
"externalJobID": jb.ExternalJobID,
"name": jb.Name.ValueOrZero(),
},
})
vars := func() pipeline.Vars {
return pipeline.NewVarsFrom(map[string]interface{}{
"pipelineSpec": map[string]interface{}{
"id": jb.PipelineSpecID,
},
"jb": map[string]interface{}{
"databaseID": jb.ID,
"externalJobID": jb.ExternalJobID,
"name": jb.Name.ValueOrZero(),
},
})
}

return &multiStreamPipeline{
lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType),
Expand Down Expand Up @@ -122,7 +124,7 @@ func (s *multiStreamPipeline) StreamIDs() []StreamID {
// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod).
// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod.
func (s *multiStreamPipeline) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, s.vars)
run, trrs, err := s.runner.ExecuteRun(ctx, s.spec, s.newVars())
if err != nil {
return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err)
}
Expand Down

0 comments on commit 8bbb77f

Please sign in to comment.