From eb5c7d08fb7dd9968e81eaeafcddbb0b712a3340 Mon Sep 17 00:00:00 2001
From: Vyom Yadav
Date: Sat, 4 Jan 2025 22:51:25 +0530
Subject: [PATCH] Record Metrics for Reminder

Signed-off-by: Vyom Yadav
---
 internal/reminder/metrics/metrics.go | 79 +++++++++++++++++++++++++
 internal/reminder/metrics_server.go  | 94 ++++++++++++++++++++++++++++
 internal/reminder/reminder.go        | 82 +++++++++++++++++++------
 internal/reminder/reminder_test.go   |  2 +-
 pkg/config/reminder/config.go        | 10 ++--
 5 files changed, 243 insertions(+), 24 deletions(-)
 create mode 100644 internal/reminder/metrics/metrics.go
 create mode 100644 internal/reminder/metrics_server.go

diff --git a/internal/reminder/metrics/metrics.go b/internal/reminder/metrics/metrics.go
new file mode 100644
index 0000000000..e937667f13
--- /dev/null
+++ b/internal/reminder/metrics/metrics.go
@@ -0,0 +1,79 @@
+// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
+// SPDX-License-Identifier: Apache-2.0
+
+// Package metrics provides metrics for the reminder service
+package metrics
+
+import (
+	"context"
+
+	"go.opentelemetry.io/otel/metric"
+)
+
+// Default bucket boundaries in seconds for the delay histograms
+var delayBuckets = []float64{
+	60,    // 1 minute
+	300,   // 5 minutes
+	600,   // 10 minutes
+	1800,  // 30 minutes
+	3600,  // 1 hour
+	7200,  // 2 hours
+	10800, // 3 hours
+	18000, // 5 hours
+	25200, // 7 hours
+	36000, // 10 hours
+}
+
+// Metrics contains all the metrics for the reminder service
+type Metrics struct {
+	// Time between when a reminder became eligible and when it was sent
+	SendDelay metric.Float64Histogram
+
+	// Time between when a reminder became eligible and when it was sent for the first time
+	NewSendDelay metric.Float64Histogram
+
+	// Current number of reminders in the batch
+	BatchSize metric.Int64Histogram
+}
+
+// NewMetrics creates a new metrics instance
+func NewMetrics(meter metric.Meter) (*Metrics, error) {
+	sendDelay, err := meter.Float64Histogram(
+		"send_delay",
+		metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
+		metric.WithUnit("s"),
+		metric.WithExplicitBucketBoundaries(delayBuckets...),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	newSendDelay, err := meter.Float64Histogram(
+		"new_send_delay",
+		metric.WithDescription("Time between reminder becoming eligible and actual send (seconds) for first time reminders"),
+		metric.WithUnit("s"),
+		metric.WithExplicitBucketBoundaries(delayBuckets...),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	batchSize, err := meter.Int64Histogram(
+		"batch_size",
+		metric.WithDescription("Current number of reminders in the batch"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Metrics{
+		SendDelay:    sendDelay,
+		NewSendDelay: newSendDelay,
+		BatchSize:    batchSize,
+	}, nil
+}
+
+// RecordBatch records the metrics for a batch of reminders
+func (m *Metrics) RecordBatch(ctx context.Context, size int64) {
+	m.BatchSize.Record(ctx, size)
+}
diff --git a/internal/reminder/metrics_server.go b/internal/reminder/metrics_server.go
new file mode 100644
index 0000000000..54559e3f56
--- /dev/null
+++ b/internal/reminder/metrics_server.go
@@ -0,0 +1,94 @@
+// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package reminder
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/rs/zerolog"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/exporters/prometheus"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/resource"
+	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
+)
+
+const (
+	metricsPath       = "/metrics"
+	readHeaderTimeout = 2 * time.Second
+)
+
+// startMetricServer installs a Prometheus-backed OpenTelemetry MeterProvider as
+// the global provider, closes mpReady to signal that it is available, and then
+// serves the metrics endpoint until ctx is cancelled or the reminder is stopped.
+// It returns an error if the exporter cannot be created, if the HTTP server
+// fails to serve, or if the graceful shutdown of the server fails.
+func (r *reminder) startMetricServer(ctx context.Context, mpReady chan<- struct{}) error {
+	logger := zerolog.Ctx(ctx)
+
+	prometheusExporter, err := prometheus.New(
+		prometheus.WithNamespace("reminder"),
+	)
+	if err != nil {
+		return fmt.Errorf("failed to create Prometheus exporter: %w", err)
+	}
+
+	res := resource.NewWithAttributes(
+		semconv.SchemaURL,
+		semconv.ServiceName("reminder"),
+		// TODO: Make this auto-generated
+		semconv.ServiceVersion("v0.1.0"),
+	)
+
+	mp := sdkmetric.NewMeterProvider(
+		sdkmetric.WithReader(prometheusExporter),
+		sdkmetric.WithResource(res),
+	)
+
+	otel.SetMeterProvider(mp)
+
+	// Indicates that a global MeterProvider is available
+	close(mpReady)
+
+	mux := http.NewServeMux()
+	mux.Handle(metricsPath, promhttp.Handler())
+
+	server := &http.Server{
+		Addr:              r.cfg.MetricServer.GetAddress(),
+		Handler:           mux,
+		ReadHeaderTimeout: readHeaderTimeout,
+	}
+
+	logger.Info().Msgf("starting metrics server on %s", server.Addr)
+
+	// Buffered so the goroutine below can always deliver ListenAndServe's result
+	// (http.ErrServerClosed after Shutdown) and exit, even when nobody receives it.
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- server.ListenAndServe()
+	}()
+
+	select {
+	case err := <-errCh:
+		return err
+	case <-ctx.Done():
+	case <-r.stop:
+	}
+
+	// shutdown the metrics server when either the context is done or when reminder is stopped
+	shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
+	defer shutdownRelease()
+
+	logger.Info().Msg("shutting down metrics server")
+
+	if err := mp.Shutdown(shutdownCtx); err != nil {
+		logger.Err(err).Msg("error shutting down metrics provider")
+	}
+
+	return server.Shutdown(shutdownCtx)
+}
diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go
index 22cb041e83..71675a30f3 100644
--- a/internal/reminder/reminder.go
+++ b/internal/reminder/reminder.go
@@ -14,9 +14,11 @@ import (
 	"github.com/ThreeDotsLabs/watermill/message"
 	"github.com/google/uuid"
 	"github.com/rs/zerolog"
+	"go.opentelemetry.io/otel"
 
 	"github.com/mindersec/minder/internal/db"
 	remindermessages "github.com/mindersec/minder/internal/reminder/messages"
+	"github.com/mindersec/minder/internal/reminder/metrics"
 	reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
 	"github.com/mindersec/minder/pkg/eventer/constants"
 )
@@ -42,6 +44,8 @@ type reminder struct {
 	ticker *time.Ticker
 
 	eventPublisher message.Publisher
+
+	metrics *metrics.Metrics
 }
 
 // NewReminder creates a new reminder instance
@@ -74,21 +78,52 @@ func (r *reminder) Start(ctx context.Context) error {
 		return errors.New("reminder stopped, cannot start again")
 	default:
 	}
+	defer r.Stop()
 
 	interval := r.cfg.RecurrenceConfig.Interval
 	if interval <= 0 {
 		return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
 	}
 
+	metricsServerDone := make(chan struct{})
+
+	if r.cfg.MetricsConfig.Enabled {
+		metricsProviderReady := make(chan struct{})
+
+		go func() {
+			if err := r.startMetricServer(ctx, metricsProviderReady); err != nil {
+				logger.Fatal().Err(err).Msg("failed to start metrics server")
+			}
+			close(metricsServerDone)
+		}()
+
+		select {
+		case <-metricsProviderReady:
+			var err error
+			r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
+			if err != nil {
+				return err
+			}
+		case <-ctx.Done():
+			logger.Info().Msg("reminder stopped")
+			return nil
+		}
+	}
+
 	r.ticker = time.NewTicker(interval)
-	defer r.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
+			if r.cfg.MetricsConfig.Enabled {
+				<-metricsServerDone
+			}
 			logger.Info().Msg("reminder stopped")
 			return nil
 		case <-r.stop:
+			if r.cfg.MetricsConfig.Enabled {
+				<-metricsServerDone
+			}
 			logger.Info().Msg("reminder stopped")
 			return nil
 		case <-r.ticker.C:
@@ -126,7 +161,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
 	logger := zerolog.Ctx(ctx)
 
 	// Fetch a batch of repositories
-	repos, err := r.getRepositoryBatch(ctx)
+	repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
 	if err != nil {
 		return fmt.Errorf("error fetching repository batch: %w", err)
 	}
@@ -143,6 +178,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
 		return fmt.Errorf("error creating reminder messages: %w", err)
 	}
 
+	if r.metrics != nil {
+		r.metrics.RecordBatch(ctx, int64(len(repos)))
+	}
+
 	err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
 	if err != nil {
 		return fmt.Errorf("error publishing messages: %w", err)
@@ -151,13 +190,16 @@ func (r *reminder) sendReminders(ctx context.Context) error {
-	repoIds := make([]uuid.UUID, len(repos))
+	repoIds := make([]uuid.UUID, 0, len(repos))
 	for _, repo := range repos {
 		repoIds = append(repoIds, repo.ID)
-	}
+		if r.metrics != nil {
+			sendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed
 
-	// TODO: Collect Metrics
-	// Potential metrics:
-	// - Gauge: Number of reminders in the current batch
-	// - UpDownCounter: Average reminders sent per batch
-	// - Histogram: reminder_last_sent time distribution
+			recorder := r.metrics.SendDelay
+			if !repo.ReminderLastSent.Valid {
+				recorder = r.metrics.NewSendDelay
+			}
+			recorder.Record(ctx, sendDelay.Seconds())
+		}
+	}
 
 	err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
 	if err != nil {
@@ -167,7 +209,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
 	return nil
 }
 
-func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
+func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
 	logger := zerolog.Ctx(ctx)
 
 	logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
@@ -176,21 +218,23 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
 		Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
 	})
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
-	eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
+	eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))
 
 	r.updateRepositoryCursor(ctx, repos)
 
-	return eligibleRepos, nil
+	return eligibleRepos, eligibleReposLastUpdated, nil
 }
 
-func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
+func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
+	[]db.Repository, map[uuid.UUID]time.Time, error,
+) {
 	eligibleRepos := make([]db.Repository, 0, len(repos))
 
 	// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
@@ -202,11 +246,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
 	}
 	oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
-	for _, times := range oldestRuleEvals {
-		idToLastUpdate[times.RepositoryID] = times.OldestLastUpdated
+	for _, ruleEval := range oldestRuleEvals {
+		idToLastUpdate[ruleEval.RepositoryID] = ruleEval.OldestLastUpdated
 	}
 
 	cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
@@ -216,7 +260,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
 		}
 	}
 
-	return eligibleRepos, nil
+	return eligibleRepos, idToLastUpdate, nil
 }
 
 func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
diff --git a/internal/reminder/reminder_test.go b/internal/reminder/reminder_test.go
index c47e025d21..aea5d32f80 100644
--- a/internal/reminder/reminder_test.go
+++ b/internal/reminder/reminder_test.go
@@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {
 
 			r := createTestReminder(t, store, cfg)
 
-			got, err := r.getRepositoryBatch(context.Background())
+			got, _, err := r.getRepositoryBatch(context.Background())
 			if test.err != "" {
 				require.ErrorContains(t, err, test.err)
 				return
diff --git a/pkg/config/reminder/config.go b/pkg/config/reminder/config.go
index da16082e2a..a55a218f35 100644
--- a/pkg/config/reminder/config.go
+++ b/pkg/config/reminder/config.go
@@ -18,10 +18,12 @@ import (
 
 // Config contains the configuration for the reminder service
 type Config struct {
-	Database         config.DatabaseConfig    `mapstructure:"database"`
-	RecurrenceConfig RecurrenceConfig         `mapstructure:"recurrence"`
-	EventConfig      serverconfig.EventConfig `mapstructure:"events"`
-	LoggingConfig    LoggingConfig            `mapstructure:"logging"`
+	Database         config.DatabaseConfig           `mapstructure:"database"`
+	RecurrenceConfig RecurrenceConfig                `mapstructure:"recurrence"`
+	EventConfig      serverconfig.EventConfig        `mapstructure:"events"`
+	LoggingConfig    LoggingConfig                   `mapstructure:"logging"`
+	MetricsConfig    serverconfig.MetricsConfig      `mapstructure:"metrics"`
+	MetricServer     serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
 }
 
 // Validate validates the configuration