diff --git a/cmd/hatchet-admin/cli/k8s.go b/cmd/hatchet-admin/cli/k8s.go index a37b8bb18..057e64ec2 100644 --- a/cmd/hatchet-admin/cli/k8s.go +++ b/cmd/hatchet-admin/cli/k8s.go @@ -139,24 +139,26 @@ func runK8sQuickstart() error { if k8sConfigResourceType == "secret" { secret, err := clientset.CoreV1().Secrets(namespace).Get(context.Background(), k8sQuickstartConfigName, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { + switch { + case err != nil && !errors.IsNotFound(err): return fmt.Errorf("error getting secret: %w", err) - } else if err != nil { + case err != nil: exists = false c = newFromSecret(nil, k8sQuickstartConfigName) - } else { + default: exists = secret != nil c = newFromSecret(secret, k8sQuickstartConfigName) } } else { configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), k8sQuickstartConfigName, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { + switch { + case err != nil && !errors.IsNotFound(err): return fmt.Errorf("error getting configmap: %w", err) - } else if err != nil { + case err != nil: exists = false c = newFromConfigMap(nil, k8sQuickstartConfigName) - } else { + default: exists = configMap != nil c = newFromConfigMap(configMap, k8sQuickstartConfigName) } @@ -270,24 +272,26 @@ func runCreateWorkerToken() error { if k8sConfigResourceType == "secret" { secret, err := clientset.CoreV1().Secrets(namespace).Get(context.Background(), k8sClientConfigName, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { - return err - } else if err != nil { + switch { + case err != nil && !errors.IsNotFound(err): + return fmt.Errorf("error getting secret: %w", err) + case err != nil: exists = false c = newFromSecret(nil, k8sClientConfigName) - } else { + default: exists = secret != nil c = newFromSecret(secret, k8sClientConfigName) } } else { configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), k8sClientConfigName, metav1.GetOptions{}) - if err != nil && !errors.IsNotFound(err) { - return err - } else if err != nil { + switch { + case err != nil && !errors.IsNotFound(err): + return fmt.Errorf("error getting configmap: %w", err) + case err != nil: exists = false c = newFromConfigMap(nil, k8sClientConfigName) - } else { + default: exists = configMap != nil c = newFromConfigMap(configMap, k8sClientConfigName) } diff --git a/internal/msgqueue/msgqueue.go b/internal/msgqueue/msgqueue.go index e2bf90e8e..53b3bc2d0 100644 --- a/internal/msgqueue/msgqueue.go +++ b/internal/msgqueue/msgqueue.go @@ -130,6 +130,9 @@ type Message struct { // RetryDelay is the delay between retries. RetryDelay int `json:"retry_delay"` + + // OtelCarrier is the OpenTelemetry carrier for the task. + OtelCarrier map[string]string `json:"otel_carrier"` } func (t *Message) TenantID() string { diff --git a/internal/msgqueue/rabbitmq/rabbitmq.go b/internal/msgqueue/rabbitmq/rabbitmq.go index 2dad642bf..bafcb0b74 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/rabbitmq/rabbitmq.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/msgqueue" + "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/random" ) @@ -185,6 +186,11 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { // AddMessage adds a msg to the queue. func (t *MessageQueueImpl) AddMessage(ctx context.Context, q msgqueue.Queue, msg *msgqueue.Message) error { + // inject otel carrier into the message + if msg.OtelCarrier == nil { + msg.OtelCarrier = telemetry.GetCarrier(ctx) + } + t.msgs <- &msgWithQueue{ Message: msg, q: q, diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index be1931f60..e16276f33 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -285,7 +285,7 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes } func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-job-run-queued") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-queued", task.OtelCarrier) defer span.End() payload := tasktypes.JobRunQueuedTaskPayload{} @@ -340,7 +340,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq } func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-job-run-cancelled") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-cancelled", task.OtelCarrier) defer span.End() payload := tasktypes.JobRunCancelledTaskPayload{} @@ -397,7 +397,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-retry") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-retry", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunRetryTaskPayload{} @@ -469,7 +469,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq // handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and // all cancelled step runs which are children of the step run being replayed. func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-replay", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunReplayTaskPayload{} @@ -593,7 +593,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg } func (ec *JobsControllerImpl) handleStepRunQueued(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-queued") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-queued", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunTaskPayload{} @@ -934,7 +934,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId } func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-started") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-started", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunStartedTaskPayload{} @@ -980,7 +980,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms } func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-finished") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-finished", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunFinishedTaskPayload{} @@ -1072,7 +1072,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-failed") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-failed", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunFailedTaskPayload{} @@ -1202,7 +1202,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun } func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timed-out") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-timed-out", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunTimedOutTaskPayload{} @@ -1224,7 +1224,7 @@ func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunCancel(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-cancel") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-cancel", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunCancelTaskPayload{} diff --git a/internal/services/controllers/jobs/queue.go b/internal/services/controllers/jobs/queue.go index a80738fd9..eff90d2cc 100644 --- a/internal/services/controllers/jobs/queue.go +++ b/internal/services/controllers/jobs/queue.go @@ -210,7 +210,7 @@ func (q *queue) handleTask(ctx context.Context, task *msgqueue.Message) (err err } func (q *queue) handleCheckQueue(ctx context.Context, task *msgqueue.Message) error { - _, span := telemetry.NewSpan(ctx, "handle-check-queue") + _, span := telemetry.NewSpanWithCarrier(ctx, "handle-check-queue", task.OtelCarrier) defer span.End() metadata := tasktypes.CheckTenantQueueMetadata{} diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index dd66cf599..93226d09b 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -360,7 +360,7 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message) } func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "group-key-action-assigned") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "group-key-action-assigned", task.OtelCarrier) defer span.End() payload := tasktypes.GroupKeyActionAssignedTaskPayload{} @@ -427,7 +427,7 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t } func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "step-run-assigned") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunAssignedTaskPayload{} @@ -528,7 +528,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms } func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "step-run-cancelled") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-cancelled", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunCancelledTaskPayload{} diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 513ff4edd..4e5a418f6 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -511,8 +511,6 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByAdditionalMeta(key string, v tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant) tenantId := sqlchelpers.UUIDToStr(tenant.ID) - s.l.Error().Msgf("Received subscribe request for additional meta key-value: {%s: %s}", key, value) - q, err := msgqueue.TenantEventConsumerQueue(tenantId) if err != nil { return err diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 793c20256..83ead711a 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -78,7 +79,11 @@ func InitTracer(opts *TracerOpts) (func(context.Context) error, error) { otel.SetTracerProvider( sdktrace.NewTracerProvider( sdktrace.WithSampler(sdktrace.TraceIDRatioBased(traceIdRatio)), - sdktrace.WithBatcher(exporter), + sdktrace.WithBatcher( + exporter, + sdktrace.WithMaxQueueSize(sdktrace.DefaultMaxQueueSize*10), + sdktrace.WithMaxExportBatchSize(sdktrace.DefaultMaxExportBatchSize*10), + ), sdktrace.WithResource(resources), ), ) @@ -91,6 +96,26 @@ func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) { return ctx, span } +func NewSpanWithCarrier(ctx context.Context, name string, carrier map[string]string) (context.Context, trace.Span) { + propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) + + otelCarrier := propagation.MapCarrier(carrier) + parentCtx := propagator.Extract(ctx, otelCarrier) + + ctx, span := otel.Tracer("").Start(parentCtx, prefixSpanKey(name)) + return ctx, span +} + +func GetCarrier(ctx context.Context) map[string]string { + propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) + + // Serialize the context into carrier + carrier := propagation.MapCarrier{} + propgator.Inject(ctx, carrier) + + return carrier +} + type AttributeKey string // AttributeKV is a wrapper for otel attributes KV diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index a446a4034..851fc2a9b 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -799,6 +799,9 @@ func UniqueSet[T any](i []T, keyFunc func(T) string) map[string]struct{} { } func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId string) (repository.QueueStepRunsResult, error) { + ctx, span := telemetry.NewSpan(ctx, "queue-step-runs-database") + defer span.End() + startedAt := time.Now().UTC() emptyRes := repository.QueueStepRunsResult{ @@ -1048,6 +1051,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, tenantId st } plan, err := scheduling.GeneratePlan( + ctx, slots, uniqueActionsArr, queueItems, diff --git a/pkg/scheduling/scheduling.go b/pkg/scheduling/scheduling.go index a2928d350..3f8598b04 100644 --- a/pkg/scheduling/scheduling.go +++ b/pkg/scheduling/scheduling.go @@ -1,10 +1,12 @@ package scheduling import ( + "context" "time" "github.com/jackc/pgx/v5/pgtype" + "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers" @@ -17,6 +19,7 @@ type QueueItemWithOrder struct { } func GeneratePlan( + ctx context.Context, slots []*dbsqlc.ListSemaphoreSlotsToAssignRow, uniqueActionsArr []string, queueItems []*QueueItemWithOrder, @@ -25,6 +28,8 @@ func GeneratePlan( workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow, stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow, ) (SchedulePlan, error) { + _, span := telemetry.NewSpan(ctx, "generate-scheduling-plan") + defer span.End() plan := SchedulePlan{ StepRunIds: make([]pgtype.UUID, 0), diff --git a/pkg/scheduling/scheduling_test.go b/pkg/scheduling/scheduling_test.go index 2a60cf1c0..c024dd97e 100644 --- a/pkg/scheduling/scheduling_test.go +++ b/pkg/scheduling/scheduling_test.go @@ -1,6 +1,7 @@ package scheduling import ( + "context" "encoding/json" "fmt" "os" @@ -277,6 +278,7 @@ func TestGeneratePlan(t *testing.T) { } got, err := GeneratePlan( + context.Background(), fixtureData.Slots, fixtureData.UniqueActionsArr, fixtureData.QueueItems, @@ -303,6 +305,7 @@ func BenchmarkGeneratePlan(b *testing.B) { for i := 0; i < b.N; i++ { _, _ = GeneratePlan( + context.Background(), fixtureData.Slots, fixtureData.UniqueActionsArr, fixtureData.QueueItems,