fix: faster processing of timeout queue items (#924)
abelanger5 authored Oct 1, 2024
1 parent 8d49a24 commit c299843
Showing 8 changed files with 141 additions and 115 deletions.
@@ -126,7 +126,7 @@ export function DataTable<TData extends IDGetter, TValue>({
cell: () => <Skeleton className="h-4 w-[100px]" />,
}))
: columns,
- [isLoading, columns],
+ [loadingNoData, columns],
);

const table = useReactTable({
99 changes: 0 additions & 99 deletions internal/services/controllers/jobs/controller.go
@@ -46,7 +46,6 @@ type JobsControllerImpl struct {
celParser *cel.CELParser

reassignMutexes sync.Map
timeoutMutexes sync.Map
}

type JobsControllerOpt func(*JobsControllerOpts)
@@ -204,18 +203,6 @@ func (jc *JobsControllerImpl) Start() (func() error, error) {
return nil, fmt.Errorf("could not schedule step run reassign: %w", err)
}

_, err = jc.s.NewJob(
gocron.DurationJob(time.Second*1),
gocron.NewTask(
jc.runStepRunTimeout(ctx),
),
)

if err != nil {
cancel()
return nil, fmt.Errorf("could not schedule step run timeout: %w", err)
}

jc.s.Start()

f := func(task *msgqueue.Message) error {
@@ -705,92 +692,6 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
return nil
}

func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() {
return func() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

jc.l.Debug().Msgf("jobs controller: running step run timeout")

// list all tenants
tenants, err := jc.repo.Tenant().ListTenantsByControllerPartition(ctx, jc.p.GetControllerPartitionId())

if err != nil {
jc.l.Err(err).Msg("could not list tenants")
return
}

g := new(errgroup.Group)

for i := range tenants {
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)

g.Go(func() error {
return jc.runStepRunTimeoutTenant(ctx, tenantId)
})
}

err = g.Wait()

if err != nil {
jc.l.Err(err).Msg("could not run step run timeout")
}
}
}

// runStepRunTimeoutTenant looks for step runs that are timed out in the tenant.
func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenantId string) error {
// we want only one requeue running at a time for a tenant
if _, ok := ec.timeoutMutexes.Load(tenantId); !ok {
ec.timeoutMutexes.Store(tenantId, &sync.Mutex{})
}

muInt, _ := ec.timeoutMutexes.Load(tenantId)
mu := muInt.(*sync.Mutex)

if !mu.TryLock() {
return nil
}

defer mu.Unlock()

ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout")
defer span.End()

stepRuns, err := ec.repo.StepRun().ListStepRunsToTimeout(ctx, tenantId)

if err != nil {
return fmt.Errorf("could not list step runs to timeout for tenant %s: %w", tenantId, err)
}

if num := len(stepRuns); num > 0 {
ec.l.Info().Msgf("timing out %d step runs", num)
}

return MakeBatched(10, stepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error {
scheduleCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-timeout-step-run")
defer span.End()

for i := range group {
stepRunCp := group[i]

defer span.End()

stepRunId := sqlchelpers.UUIDToStr(stepRunCp.SRID)

err = ec.failStepRun(scheduleCtx, tenantId, stepRunId, "TIMED_OUT", time.Now().UTC())
if err != nil {
return err
}
}

return nil
})
}

func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId, stepRunId string, isRetry bool) error {
ctx, span := telemetry.NewSpan(ctx, "queue-step-run")
defer span.End()
87 changes: 85 additions & 2 deletions internal/services/controllers/jobs/queue.go
@@ -35,8 +35,9 @@ type queue struct {
// a custom queue logger
ql *zerolog.Logger

- tenantQueueOperations *queueutils.OperationPool
- updateStepRunOperations *queueutils.OperationPool
+ tenantQueueOperations    *queueutils.OperationPool
+ updateStepRunOperations  *queueutils.OperationPool
+ timeoutStepRunOperations *queueutils.OperationPool
}

func newQueue(
@@ -67,6 +68,7 @@

q.tenantQueueOperations = queueutils.NewOperationPool(ql, time.Second*5, "check tenant queue", q.scheduleStepRuns)
q.updateStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "update step runs", q.processStepRunUpdates)
q.timeoutStepRunOperations = queueutils.NewOperationPool(ql, time.Second*30, "timeout step runs", q.processStepRunTimeouts)

return q, nil
}
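
The `queueutils.OperationPool` type referenced above is not part of this diff. A minimal sketch of the contract the new timeout pool appears to rely on (one operation per tenant, at most one run in flight, and `RunOrContinue` immediately re-running the operation when the callback reports leftover work) might look like the following; this is an assumption, and the real implementation in `internal/queueutils` may differ:

```go
// Hypothetical sketch of the queueutils.OperationPool contract assumed by the
// calls above; the real type lives in internal/queueutils and may differ.
package queueutils

import (
	"context"
	"sync"
	"time"

	"github.com/rs/zerolog"
)

// OpMethod reports whether more work remains for the tenant after one pass.
type OpMethod func(ctx context.Context, tenantId string) (bool, error)

type OperationPool struct {
	ql      *zerolog.Logger
	timeout time.Duration
	name    string
	method  OpMethod
	ops     sync.Map // tenantId -> *operation
}

type operation struct {
	mu          sync.Mutex
	running     bool
	shouldRerun bool
}

func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, name string, method OpMethod) *OperationPool {
	return &OperationPool{ql: ql, timeout: timeout, name: name, method: method}
}

// RunOrContinue starts the tenant's operation if it is idle; if a run is
// already in flight, it flags the operation to run once more when that
// pass finishes.
func (p *OperationPool) RunOrContinue(tenantId string) {
	v, _ := p.ops.LoadOrStore(tenantId, &operation{})
	op := v.(*operation)

	op.mu.Lock()
	if op.running {
		op.shouldRerun = true
		op.mu.Unlock()
		return
	}
	op.running = true
	op.mu.Unlock()

	go func() {
		for {
			ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
			more, err := p.method(ctx, tenantId)
			cancel()

			if err != nil {
				p.ql.Err(err).Msgf("operation %q failed for tenant %s", p.name, tenantId)
			}

			op.mu.Lock()
			if more || op.shouldRerun {
				op.shouldRerun = false
				op.mu.Unlock()
				continue
			}
			op.running = false
			op.mu.Unlock()
			return
		}
	}()
}
```

Under this reading, the one-second gocron tick scheduled below only nominates tenants; the pool decides whether a pass actually starts, which is what removes the per-tenant mutex map from the jobs controller.
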
@@ -100,6 +102,18 @@ func (q *queue) Start() (func() error, error) {
return nil, fmt.Errorf("could not schedule step run update: %w", err)
}

_, err = q.s.NewJob(
gocron.DurationJob(time.Second*1),
gocron.NewTask(
q.runTenantTimeoutStepRuns(ctx),
),
)

if err != nil {
cancel()
return nil, fmt.Errorf("could not schedule step run timeout: %w", err)
}

q.s.Start()

f := func(task *msgqueue.Message) error {
@@ -338,6 +352,75 @@ func (q *queue) processStepRunUpdates(ctx context.Context, tenantId string) (boo
return res.Continue, nil
}

func (q *queue) runTenantTimeoutStepRuns(ctx context.Context) func() {
return func() {
q.l.Debug().Msgf("partition: running timeout for step runs")

// list all tenants
tenants, err := q.repo.Tenant().ListTenantsByControllerPartition(ctx, q.p.GetControllerPartitionId())

if err != nil {
q.l.Err(err).Msg("could not list tenants")
return
}

for i := range tenants {
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)

q.timeoutStepRunOperations.RunOrContinue(tenantId)
}
}
}

func (q *queue) processStepRunTimeouts(ctx context.Context, tenantId string) (bool, error) {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout")
defer span.End()

shouldContinue, stepRuns, err := q.repo.StepRun().ListStepRunsToTimeout(ctx, tenantId)

if err != nil {
return false, fmt.Errorf("could not list step runs to timeout for tenant %s: %w", tenantId, err)
}

if num := len(stepRuns); num > 0 {
q.l.Info().Msgf("timing out %d step runs", num)
}

failedAt := time.Now().UTC()

err = MakeBatched(10, stepRuns, func(group []*dbsqlc.GetStepRunForEngineRow) error {
scheduleCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

scheduleCtx, span := telemetry.NewSpan(scheduleCtx, "handle-step-run-timeout-step-run")
defer span.End()

for i := range group {
stepRunCp := group[i]

if err := q.mq.AddMessage(
scheduleCtx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunFailedToTask(
stepRunCp,
"TIMED_OUT",
&failedAt,
),
); err != nil {
q.l.Error().Err(err).Msg("could not add step run failed task to task queue")
}
}

return nil
})

if err != nil {
return false, fmt.Errorf("could not process step run timeouts: %w", err)
}

return shouldContinue, nil
}
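
`MakeBatched` is used above to fail timed-out step runs in groups of 10, but it is defined elsewhere in this package. A plausible shape for such a helper, shown only as an assumption (the real one may, for example, process groups concurrently):

```go
// Hypothetical batching helper matching the call sites above: split items into
// fixed-size groups and invoke fn once per group, stopping at the first error.
// The actual MakeBatched in this package may differ.
func MakeBatched[T any](batchSize int, items []T, fn func(group []T) error) error {
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}

		if err := fn(items[start:end]); err != nil {
			return err
		}
	}

	return nil
}
```
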

func getStepRunCancelTask(tenantId, stepRunId, reason string) *msgqueue.Message {
payload, _ := datautils.ToJSONMap(tasktypes.StepRunCancelTaskPayload{
StepRunId: stepRunId,
28 changes: 28 additions & 0 deletions internal/services/shared/tasktypes/step.go
@@ -1,6 +1,8 @@
package tasktypes

import (
"time"

"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
@@ -218,6 +220,32 @@ func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byt
}
}

func StepRunFailedToTask(stepRun *dbsqlc.GetStepRunForEngineRow, errorReason string, failedAt *time.Time) *msgqueue.Message {
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
workflowRunId := sqlchelpers.UUIDToStr(stepRun.WorkflowRunId)
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)

payload, _ := datautils.ToJSONMap(StepRunFailedTaskPayload{
WorkflowRunId: workflowRunId,
StepRunId: stepRunId,
FailedAt: failedAt.Format(time.RFC3339),
Error: errorReason,
StepRetries: &stepRun.StepRetries,
RetryCount: &stepRun.SRRetryCount,
})

metadata, _ := datautils.ToJSONMap(StepRunFailedTaskMetadata{
TenantId: tenantId,
})

return &msgqueue.Message{
ID: "step-run-failed",
Payload: payload,
Metadata: metadata,
Retries: 3,
}
}
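
`StepRunFailedTaskPayload` and `StepRunFailedTaskMetadata` are defined elsewhere in this package; the shapes below are only inferred from the fields populated in `StepRunFailedToTask` above, and the JSON tags are assumptions:

```go
// Assumed shapes, inferred from StepRunFailedToTask above; the canonical
// definitions (and their exact tags) live elsewhere in the tasktypes package.
type StepRunFailedTaskPayload struct {
	WorkflowRunId string `json:"workflow_run_id"`
	StepRunId     string `json:"step_run_id"`
	FailedAt      string `json:"failed_at"` // RFC 3339 timestamp
	Error         string `json:"error"`
	StepRetries   *int32 `json:"step_retries,omitempty"`
	RetryCount    *int32 `json:"retry_count,omitempty"`
}

type StepRunFailedTaskMetadata struct {
	TenantId string `json:"tenant_id"`
}
```
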

func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) *msgqueue.Message {
stepRunId := sqlchelpers.UUIDToStr(stepRun.SRID)
tenantId := sqlchelpers.UUIDToStr(stepRun.SRTenantId)
6 changes: 5 additions & 1 deletion pkg/repository/prisma/dbsqlc/step_runs.sql
@@ -789,7 +789,11 @@ WHERE
UPDATE
"StepRun" sr
SET
"status" = 'CANCELLING',
"status" = CASE
-- Final states are final, we cannot go from a final state to cancelling
WHEN "status" IN ('SUCCEEDED', 'FAILED', 'CANCELLED') THEN "status"
ELSE 'CANCELLING'
END,
"updatedAt" = CURRENT_TIMESTAMP
FROM (
SELECT
6 changes: 5 additions & 1 deletion pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default.

26 changes: 16 additions & 10 deletions pkg/repository/prisma/step_run.go
@@ -455,19 +455,23 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te
return stepRunIdsStr, nil
}

- func (s *stepRunEngineRepository) ListStepRunsToTimeout(ctx context.Context, tenantId string) ([]*dbsqlc.GetStepRunForEngineRow, error) {
+ func (s *stepRunEngineRepository) ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error) {
pgTenantId := sqlchelpers.UUIDFromStr(tenantId)

tx, err := s.pool.Begin(ctx)

if err != nil {
- return nil, err
+ return false, nil, err
}

defer deferRollback(ctx, s.l, tx.Rollback)

limit := 100

if s.cf.SingleQueueLimit != 0 {
limit = s.cf.SingleQueueLimit
}

// get the step run and make sure it's still in pending
stepRunIds, err := s.queries.PopTimeoutQueueItems(ctx, tx, dbsqlc.PopTimeoutQueueItemsParams{
Tenantid: pgTenantId,
@@ -478,32 +482,34 @@ func (s *stepRunEngineRepository) ListStepRunsToTimeout(ctx context.Context, ten
})

if err != nil {
- return nil, err
+ return false, nil, err
}

// mark the step runs as cancelling
- _, err = s.queries.BulkMarkStepRunsAsCancelling(ctx, tx, stepRunIds)
+ defer func() {
+     _, err = s.queries.BulkMarkStepRunsAsCancelling(ctx, s.pool, stepRunIds)

- if err != nil {
-     return nil, err
- }
+     if err != nil {
+         s.l.Err(err).Msg("could not bulk mark step runs as cancelling")
+     }
+ }()

stepRuns, err := s.queries.GetStepRunForEngine(ctx, tx, dbsqlc.GetStepRunForEngineParams{
Ids: stepRunIds,
TenantId: pgTenantId,
})

if err != nil {
- return nil, err
+ return false, nil, err
}

err = tx.Commit(ctx)

if err != nil {
- return nil, err
+ return false, nil, err
}

- return stepRuns, nil
+ return len(stepRunIds) == limit, stepRuns, nil
}
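
The new boolean return is what lets a busy tenant drain its timeout backlog faster than one page per one-second tick: popping a full page of `limit` items is read as "there is probably more", so the caller's operation pool runs another pass immediately instead of waiting for the next scheduled run. A tiny illustration of that heuristic (hypothetical helper, not part of the commit):

```go
// Hypothetical helper spelling out the continuation heuristic used above.
// With limit = 100: popping 100 items returns true (drain again right away);
// popping 37 returns false (wait for the next scheduler tick). A backlog of
// exactly 100 costs one extra, empty pass, which is harmless.
func hasMoreTimeouts(popped, limit int) bool {
	return popped == limit
}
```
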

var deadlockRetry = func(l *zerolog.Logger, f func() error) error {
2 changes: 1 addition & 1 deletion pkg/repository/step_run.go
@@ -175,7 +175,7 @@ type StepRunEngineRepository interface {
// ListStepRunsToReassign returns a list of step runs which are in a reassignable state.
ListStepRunsToReassign(ctx context.Context, tenantId string) ([]string, error)

- ListStepRunsToTimeout(ctx context.Context, tenantId string) ([]*dbsqlc.GetStepRunForEngineRow, error)
+ ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error)

StepRunStarted(ctx context.Context, tenantId, stepRunId string, startedAt time.Time) error

