add planner metrics (#4820)

## Summary by CodeRabbit

- **New Features**
  - Added a metrics planner to track and monitor performance of job scheduling and planning processes
  - Introduced detailed telemetry metrics using OpenTelemetry for tracking plan processing, executions, and events

- **Improvements**
  - Enhanced state update process with more granular error tracking and performance monitoring
  - Added flexibility in metric recording with the new `DoneWithoutTotalDuration` method (see the sketch after this list)

- **Telemetry**
  - Implemented comprehensive metrics tracking for plan processing, including duration, counts, and event distributions

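As a quick illustration of the recorder change called out above, here is a hypothetical snippet (not part of this commit) contrasting the two flush paths, based on the usages visible in the diffs below: `Done` records the recorder's total elapsed time into a histogram in addition to any aggregated sub-metrics, while `DoneWithoutTotalDuration` publishes only the aggregated sub-metrics. The function `exampleRecorderUsage` is illustrative; `NewMetricRecorder`, `Done`, `DoneWithoutTotalDuration`, `AttrPlannerType`, and `processDuration` all appear in the diffs in this commit.

```go
package planner

import (
    "context"

    "go.opentelemetry.io/otel/attribute"

    "github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// exampleRecorderUsage is a hypothetical sketch contrasting the two ways a
// planner flushes its MetricRecorder in this commit.
func exampleRecorderUsage(ctx context.Context) {
    // StateUpdater-style: publish the recorder's total elapsed time into a
    // histogram, plus any aggregated sub-operation latencies and counters.
    rec := telemetry.NewMetricRecorder(attribute.String(AttrPlannerType, "state_updater"))
    defer rec.Done(ctx, processDuration)

    // MetricsPlanner-style: publish only the aggregated metrics; the
    // recorder's own lifetime is never recorded as a duration.
    obs := telemetry.NewMetricRecorder(attribute.String(AttrPlannerType, "metrics_planner"))
    defer obs.DoneWithoutTotalDuration(ctx)
}
```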
wdbaruni authored Jan 23, 2025
1 parent 2f949eb commit 4818962
Showing 5 changed files with 258 additions and 38 deletions.
3 changes: 3 additions & 0 deletions pkg/node/requester.go
@@ -100,6 +100,9 @@ func NewRequesterNode(

// logs job completion or failure
planner.NewLoggingPlanner(),

// metrics planner
planner.NewMetricsPlanner(),
)

retryStrategy := cfg.SystemConfig.RetryStrategy
87 changes: 87 additions & 0 deletions pkg/orchestrator/planner/metrics.go
@@ -0,0 +1,87 @@
package planner

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/metric"

    "github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

var (
    Meter = otel.GetMeterProvider().Meter("planner")

    // Processing metrics
    processDuration = telemetry.Must(Meter.Float64Histogram(
        "planner.process.duration",
        metric.WithDescription("Time taken to process a single plan"),
        metric.WithUnit("s"),
        metric.WithExplicitBucketBoundaries(telemetry.DurationMsBuckets...),
    ))

    processPartDuration = telemetry.Must(Meter.Float64Histogram(
        "planner.process.part.duration",
        metric.WithDescription("Time taken for sub-operations within a planner operation"),
        metric.WithUnit("s"),
        metric.WithExplicitBucketBoundaries(telemetry.DurationMsBuckets...),
    ))

    processCount = telemetry.Must(Meter.Int64Counter(
        "planner.process.count",
        metric.WithDescription("Number of plans processed"),
        metric.WithUnit("1"),
    ))

    // State update metrics
    executionsCreated = telemetry.Must(Meter.Float64Histogram(
        "planner.executions.created",
        metric.WithDescription("Distribution of executions created per plan"),
        metric.WithUnit("1"),
    ))

    executionsUpdated = telemetry.Must(Meter.Float64Histogram(
        "planner.executions.updated",
        metric.WithDescription("Distribution of executions updated per plan"),
        metric.WithUnit("1"),
    ))

    jobsUpdated = telemetry.Must(Meter.Int64Counter(
        "planner.jobs.updated",
        metric.WithDescription("Number of jobs with state updates"),
        metric.WithUnit("1"),
    ))

    evaluationsCreated = telemetry.Must(Meter.Float64Histogram(
        "planner.evaluations.created",
        metric.WithDescription("Distribution of evaluations created per plan"),
        metric.WithUnit("1"),
    ))

    // History event metrics
    jobEventsAdded = telemetry.Must(Meter.Float64Histogram(
        "planner.events.job",
        metric.WithDescription("Distribution of job events added per plan"),
        metric.WithUnit("1"),
    ))

    execEventsAdded = telemetry.Must(Meter.Float64Histogram(
        "planner.events.execution",
        metric.WithDescription("Distribution of execution events added per plan"),
        metric.WithUnit("1"),
    ))
)

// Common attribute keys
const (
    AttrPlannerType = "planner_type"

    AttrOperationPartBeginTx    = "begin_transaction"
    AttrOperationPartCreateExec = "create_execution"
    AttrOperationPartUpdateExec = "update_execution"
    AttrOperationPartUpdateJob  = "update_job"
    AttrOperationPartCreateEval = "create_evaluation"
    AttrOperationPartAddEvents  = "add_events"

    AttrOutcomeKey     = "outcome"
    AttrOutcomeSuccess = "success"
    AttrOutcomeFailure = "failure"
)
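`telemetry.Must` is used above but its implementation is not part of this diff; presumably it is a small generic helper that panics when instrument creation fails, which keeps the package-level declarations terse. A minimal sketch of that assumed pattern (not the repository's actual code):

```go
// Assumed shape of telemetry.Must; the real implementation lives in
// pkg/telemetry and is not shown in this commit.
package telemetry

// Must returns the created instrument and panics if creation failed,
// letting metric instruments be declared as package-level variables.
func Must[T any](value T, err error) T {
    if err != nil {
        panic(err)
    }
    return value
}
```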
58 changes: 58 additions & 0 deletions pkg/orchestrator/planner/metrics_planner.go
@@ -0,0 +1,58 @@
package planner

import (
    "context"

    "go.opentelemetry.io/otel/attribute"

    "github.com/bacalhau-project/bacalhau/pkg/models"
    "github.com/bacalhau-project/bacalhau/pkg/orchestrator"
    "github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// MetricsPlanner records metrics about plan content as plans flow through the planner chain.
// It tracks distributions of executions, evaluations, and events without modifying the plan.
type MetricsPlanner struct{}

// NewMetricsPlanner creates a new instance of MetricsPlanner.
func NewMetricsPlanner() *MetricsPlanner {
    return &MetricsPlanner{}
}

// Process records metrics about plan content, including executions, jobs, evaluations, and events.
func (s *MetricsPlanner) Process(ctx context.Context, plan *models.Plan) error {
    metrics := telemetry.NewMetricRecorder(
        attribute.String(AttrPlannerType, "metrics_planner"),
    )
    defer func() {
        metrics.Count(ctx, processCount)
        metrics.DoneWithoutTotalDuration(ctx)
    }()

    if len(plan.NewExecutions) > 0 {
        metrics.Histogram(ctx, executionsCreated, float64(len(plan.NewExecutions)))
    }
    if len(plan.UpdatedExecutions) > 0 {
        metrics.Histogram(ctx, executionsUpdated, float64(len(plan.UpdatedExecutions)))
    }
    if !plan.DesiredJobState.IsUndefined() {
        metrics.Count(ctx, jobsUpdated)
    }
    if len(plan.NewEvaluations) > 0 {
        metrics.Histogram(ctx, evaluationsCreated, float64(len(plan.NewEvaluations)))
    }
    if len(plan.JobEvents) > 0 {
        metrics.Histogram(ctx, jobEventsAdded, float64(len(plan.JobEvents)))
    }
    if len(plan.ExecutionEvents) > 0 {
        var totalEvents int
        for _, events := range plan.ExecutionEvents {
            totalEvents += len(events)
        }
        metrics.Histogram(ctx, execEventsAdded, float64(totalEvents))
    }

    return nil
}

var _ orchestrator.Planner = (*MetricsPlanner)(nil)
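For context, a hypothetical call-site sketch (not part of this commit; in the actual change the planner is wired into the chain in pkg/node/requester.go above). `NewMetricsPlanner` and `Process` are shown in the diff; the `observePlan` helper and `example` package are illustrative only. `Process` only reads the plan, records distributions for whatever the scheduler put into it, and always returns nil.

```go
package example

import (
    "context"

    "github.com/bacalhau-project/bacalhau/pkg/models"
    "github.com/bacalhau-project/bacalhau/pkg/orchestrator/planner"
)

// observePlan is a hypothetical helper showing the planner in isolation:
// it records plan-content metrics and leaves the plan untouched.
func observePlan(ctx context.Context, plan *models.Plan) error {
    return planner.NewMetricsPlanner().Process(ctx, plan)
}
```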
139 changes: 101 additions & 38 deletions pkg/orchestrator/planner/state_updater.go
@@ -4,9 +4,12 @@ import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"

"github.com/bacalhau-project/bacalhau/pkg/jobstore"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// StateUpdater is responsible for updating the state of executions and jobs in the plan.
@@ -19,25 +22,31 @@ type StateUpdater struct {

// NewStateUpdater creates a new instance of StateUpdater with the specified jobstore.Store.
func NewStateUpdater(store jobstore.Store) *StateUpdater {
return &StateUpdater{
store: store,
}
return &StateUpdater{store: store}
}

// Process updates the state of the executions in the plan according to the scheduler's desired state.
//
//nolint:gocyclo
func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) error {
func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) (err error) {
metrics := telemetry.NewMetricRecorder(
attribute.String(AttrPlannerType, "state_updater"),
attribute.String(AttrOutcomeKey, AttrOutcomeSuccess),
)
defer func() {
if err != nil {
metrics.Error(err)
metrics.AddAttributes(attribute.String(AttrOutcomeKey, AttrOutcomeFailure))
}
metrics.Done(ctx, processDuration)
}()

// If there are no new or updated executions
// and the job state is not being updated, there is nothing to do.
if len(plan.NewExecutions) == 0 &&
len(plan.UpdatedExecutions) == 0 &&
len(plan.NewEvaluations) == 0 &&
plan.DesiredJobState.IsUndefined() {
if s.isEmpty(plan) {
return nil
}

txContext, err := s.store.BeginTx(ctx)
metrics.Latency(ctx, processPartDuration, AttrOperationPartBeginTx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
@@ -48,35 +57,73 @@ func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) error {
}
}()

if err = s.processExecutions(ctx, txContext, plan, metrics); err != nil {
return err
}

if err = s.processJobState(ctx, txContext, plan, metrics); err != nil {
return err
}

if err = s.processEvaluations(ctx, txContext, plan, metrics); err != nil {
return err
}

if err = s.processEvents(ctx, txContext, plan, metrics); err != nil {
return err
}

return txContext.Commit()
}

func (s *StateUpdater) isEmpty(plan *models.Plan) bool {
return len(plan.NewExecutions) == 0 &&
len(plan.UpdatedExecutions) == 0 &&
len(plan.NewEvaluations) == 0 &&
plan.DesiredJobState.IsUndefined()
}

func (s *StateUpdater) processExecutions(
ctx context.Context, txContext jobstore.TxContext, plan *models.Plan, metrics *telemetry.MetricRecorder) error {
// Create new executions
for _, exec := range plan.NewExecutions {
if err = s.store.CreateExecution(txContext, *exec); err != nil {
return err
if len(plan.NewExecutions) > 0 {
for _, exec := range plan.NewExecutions {
if err := s.store.CreateExecution(txContext, *exec); err != nil {
return err
}
}
metrics.Latency(ctx, processPartDuration, AttrOperationPartCreateExec)
}

// Update existing executions
for _, u := range plan.UpdatedExecutions {
if err = s.store.UpdateExecution(txContext, jobstore.UpdateExecutionRequest{
ExecutionID: u.Execution.ID,
NewValues: models.Execution{
DesiredState: models.State[models.ExecutionDesiredStateType]{
StateType: u.DesiredState,
Message: u.Event.Message,
Details: u.Event.Details,
if len(plan.UpdatedExecutions) > 0 {
for _, u := range plan.UpdatedExecutions {
if err := s.store.UpdateExecution(txContext, jobstore.UpdateExecutionRequest{
ExecutionID: u.Execution.ID,
NewValues: models.Execution{
DesiredState: models.State[models.ExecutionDesiredStateType]{
StateType: u.DesiredState,
Message: u.Event.Message,
Details: u.Event.Details,
},
},
},
Condition: jobstore.UpdateExecutionCondition{
ExpectedRevision: u.Execution.Revision,
},
}); err != nil {
return err
Condition: jobstore.UpdateExecutionCondition{
ExpectedRevision: u.Execution.Revision,
},
}); err != nil {
return err
}
}
metrics.Latency(ctx, processPartDuration, AttrOperationPartUpdateExec)
}

// Update job state if necessary
return nil
}

func (s *StateUpdater) processJobState(
ctx context.Context, txContext jobstore.TxContext, plan *models.Plan, metrics *telemetry.MetricRecorder) error {
if !plan.DesiredJobState.IsUndefined() {
if err = s.store.UpdateJobState(txContext, jobstore.UpdateJobStateRequest{
if err := s.store.UpdateJobState(txContext, jobstore.UpdateJobStateRequest{
JobID: plan.Job.ID,
NewState: plan.DesiredJobState,
Message: plan.UpdateMessage,
@@ -86,30 +133,46 @@ func (s *StateUpdater) Process(ctx context.Context, plan *models.Plan) error {
}); err != nil {
return err
}
metrics.Latency(ctx, processPartDuration, AttrOperationPartUpdateJob)
}
return nil
}

// Create follow-up evaluations, if any
for _, eval := range plan.NewEvaluations {
if err = s.store.CreateEvaluation(txContext, *eval); err != nil {
return err
// Create follow-up evaluations, if any
func (s *StateUpdater) processEvaluations(
ctx context.Context, txContext jobstore.TxContext, plan *models.Plan, metrics *telemetry.MetricRecorder) error {
if len(plan.NewEvaluations) > 0 {
for _, eval := range plan.NewEvaluations {
if err := s.store.CreateEvaluation(txContext, *eval); err != nil {
return err
}
}
metrics.Latency(ctx, processPartDuration, AttrOperationPartCreateEval)
}
return nil
}

func (s *StateUpdater) processEvents(
ctx context.Context, txContext jobstore.TxContext, plan *models.Plan, metrics *telemetry.MetricRecorder) error {
if len(plan.JobEvents) == 0 && len(plan.ExecutionEvents) == 0 {
return nil
}

// Add history events
if len(plan.JobEvents) > 0 {
if err = s.store.AddJobHistory(txContext, plan.Job.ID, plan.JobEvents...); err != nil {
if err := s.store.AddJobHistory(txContext, plan.Job.ID, plan.JobEvents...); err != nil {
return err
}
}

if len(plan.ExecutionEvents) > 0 {
for executionID, events := range plan.ExecutionEvents {
if err = s.store.AddExecutionHistory(txContext, plan.Job.ID, executionID, events...); err != nil {
if err := s.store.AddExecutionHistory(txContext, plan.Job.ID, executionID, events...); err != nil {
return err
}
}
}

return txContext.Commit()
metrics.Latency(ctx, processPartDuration, AttrOperationPartAddEvents)
return nil
}

// compile-time check whether the StateUpdater implements the Planner interface.
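One detail worth calling out in the StateUpdater change above: the signature switches to a named return value (`err error`) so a single deferred closure can observe the final error, flip the outcome attribute, and record total duration on both the success and failure paths. A standalone sketch of that pattern, assuming only the MetricRecorder methods that appear in this commit (`Error`, `AddAttributes`, `Done`); `instrumented` and `work` are hypothetical:

```go
package planner

import (
    "context"

    "go.opentelemetry.io/otel/attribute"

    "github.com/bacalhau-project/bacalhau/pkg/telemetry"
)

// instrumented is a hypothetical wrapper illustrating the deferred
// outcome-tagging pattern used by StateUpdater.Process.
func instrumented(ctx context.Context, work func() error) (err error) {
    metrics := telemetry.NewMetricRecorder(
        attribute.String(AttrPlannerType, "state_updater"),
        attribute.String(AttrOutcomeKey, AttrOutcomeSuccess),
    )
    defer func() {
        // err is a named return, so the closure sees its final value.
        if err != nil {
            metrics.Error(err)
            metrics.AddAttributes(attribute.String(AttrOutcomeKey, AttrOutcomeFailure))
        }
        metrics.Done(ctx, processDuration) // total duration, success or failure
    }()

    return work()
}
```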
9 changes: 9 additions & 0 deletions pkg/telemetry/metric_recorder.go
@@ -163,6 +163,15 @@ func (r *MetricRecorder) Done(ctx context.Context, h metric.Float64Histogram, at
finalAttrs := append(r.attrs, attrs...)
h.Record(ctx, r.clock.Since(r.start).Seconds(), metric.WithAttributes(finalAttrs...))

r.DoneWithoutTotalDuration(ctx, attrs...)
}

// DoneWithoutTotalDuration publishes all aggregated metrics without recording total duration.
// This should typically be deferred immediately after creating the recorder.
// Additional attributes can be provided and will be merged with base attributes.
func (r *MetricRecorder) DoneWithoutTotalDuration(ctx context.Context, attrs ...attribute.KeyValue) {
finalAttrs := append(r.attrs, attrs...)

// Publish all aggregated latencies to their respective histograms
for key, duration := range r.latencies {
opAttrs := append(finalAttrs, SubOperationKey.String(key.operation))