Skip to content

Commit

Permalink
Record Metrics for Reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Jan 17, 2025
1 parent 6d4a5df commit eb5c7d0
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 23 deletions.
79 changes: 79 additions & 0 deletions internal/reminder/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package metrics provides metrics for the reminder service
package metrics

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
)

// delayBuckets holds the default histogram bucket boundaries, in seconds,
// shared by the send-delay histograms. They span one minute up to ten hours.
var delayBuckets = []float64{
	60,    // 1m
	300,   // 5m
	600,   // 10m
	1800,  // 30m
	3600,  // 1h
	7200,  // 2h
	10800, // 3h
	18000, // 5h
	25200, // 7h
	36000, // 10h
}

// Metrics contains all the metrics for the reminder service.
type Metrics struct {
	// Time between when a reminder became eligible and when it was sent,
	// in seconds (bucketed by delayBuckets).
	SendDelay metric.Float64Histogram

	// Time between when a reminder became eligible and when it was sent
	// for the first time, in seconds (bucketed by delayBuckets).
	NewSendDelay metric.Float64Histogram

	// Current number of reminders in the batch, recorded once per batch.
	BatchSize metric.Int64Histogram
}

// NewMetrics creates a new metrics instance, registering all reminder
// instruments against the supplied meter.
//
// The two delay histograms share the delayBuckets boundaries; batch_size
// uses the SDK's default bucketing. Returns an error if any instrument
// cannot be created.
func NewMetrics(meter metric.Meter) (*Metrics, error) {
	sendDelay, err := newDelayHistogram(meter, "send_delay",
		"Time between reminder becoming eligible and actual send (seconds)")
	if err != nil {
		return nil, fmt.Errorf("creating send_delay histogram: %w", err)
	}

	newSendDelay, err := newDelayHistogram(meter, "new_send_delay",
		"Time between reminder becoming eligible and actual send (seconds) for first time reminders")
	if err != nil {
		return nil, fmt.Errorf("creating new_send_delay histogram: %w", err)
	}

	batchSize, err := meter.Int64Histogram(
		"batch_size",
		metric.WithDescription("Current number of reminders in the batch"),
	)
	if err != nil {
		return nil, fmt.Errorf("creating batch_size histogram: %w", err)
	}

	return &Metrics{
		SendDelay:    sendDelay,
		NewSendDelay: newSendDelay,
		BatchSize:    batchSize,
	}, nil
}

// newDelayHistogram creates a float64 histogram that records seconds using
// the shared delayBuckets bucket boundaries.
func newDelayHistogram(meter metric.Meter, name, description string) (metric.Float64Histogram, error) {
	return meter.Float64Histogram(
		name,
		metric.WithDescription(description),
		metric.WithUnit("s"),
		metric.WithExplicitBucketBoundaries(delayBuckets...),
	)
}

// RecordBatch records the size of one batch of reminders into the
// BatchSize histogram.
func (m *Metrics) RecordBatch(ctx context.Context, size int64) {
	m.BatchSize.Record(ctx, size)
}
87 changes: 87 additions & 0 deletions internal/reminder/metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

const (
metricsPath = "/metrics"
readHeaderTimeout = 2 * time.Second
)

// startMetricServer installs a Prometheus-backed global OpenTelemetry
// MeterProvider and serves the /metrics endpoint until ctx is cancelled or
// the reminder is stopped, then shuts the server down gracefully.
//
// mpReady is closed once the global MeterProvider has been set, so callers
// know otel.Meter(...) will return instruments backed by this provider.
func (r *reminder) startMetricServer(ctx context.Context, mpReady chan<- struct{}) error {
	logger := zerolog.Ctx(ctx)

	prometheusExporter, err := prometheus.New(
		prometheus.WithNamespace("reminder"),
	)
	if err != nil {
		return fmt.Errorf("failed to create Prometheus exporter: %w", err)
	}

	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceName("reminder"),
		// TODO: Make this auto-generated
		semconv.ServiceVersion("v0.1.0"),
	)

	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(prometheusExporter),
		sdkmetric.WithResource(res),
	)

	otel.SetMeterProvider(mp)

	// Indicates that a global MeterProvider is available
	close(mpReady)

	mux := http.NewServeMux()
	mux.Handle(metricsPath, promhttp.Handler())

	server := &http.Server{
		Addr:              r.cfg.MetricServer.GetAddress(),
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	logger.Info().Msgf("starting metrics server on %s", server.Addr)

	// Buffered so the serving goroutine can always deliver its result and
	// exit: on the shutdown paths below nothing receives from errCh, and an
	// unbuffered send would leak the goroutine forever.
	errCh := make(chan error, 1)
	go func() {
		errCh <- server.ListenAndServe()
	}()

	select {
	case err := <-errCh:
		// The server failed to start (or died on its own); there is nothing
		// left to shut down.
		return err
	case <-ctx.Done():
	case <-r.stop:
	}

	// shutdown the metrics server when either the context is done or when reminder is stopped
	shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownRelease()

	logger.Info().Msg("shutting down metrics server")

	// Shut down the provider first so any final collection/flush happens
	// while the scrape endpoint is still reachable; log (not fail) on error
	// so the HTTP server is still shut down below.
	if err := mp.Shutdown(shutdownCtx); err != nil {
		logger.Err(err).Msg("error shutting down metrics provider")
	}

	return server.Shutdown(shutdownCtx)
}
80 changes: 62 additions & 18 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"

"github.com/mindersec/minder/internal/db"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
"github.com/mindersec/minder/internal/reminder/metrics"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
)
Expand All @@ -42,6 +44,8 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher

metrics *metrics.Metrics
}

// NewReminder creates a new reminder instance
Expand Down Expand Up @@ -74,21 +78,52 @@ func (r *reminder) Start(ctx context.Context) error {
return errors.New("reminder stopped, cannot start again")
default:
}
defer r.Stop()

interval := r.cfg.RecurrenceConfig.Interval
if interval <= 0 {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
}

metricsServerDone := make(chan struct{})

if r.cfg.MetricsConfig.Enabled {
metricsProviderReady := make(chan struct{})

go func() {
if err := r.startMetricServer(ctx, metricsProviderReady); err != nil {
logger.Fatal().Err(err).Msg("failed to start metrics server")
}
close(metricsServerDone)
}()

select {
case <-metricsProviderReady:
var err error
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
if err != nil {
return err
}
case <-ctx.Done():
logger.Info().Msg("reminder stopped")
return nil
}
}

r.ticker = time.NewTicker(interval)
defer r.Stop()

for {
select {
case <-ctx.Done():
if r.cfg.MetricsConfig.Enabled {
<-metricsServerDone
}
logger.Info().Msg("reminder stopped")
return nil
case <-r.stop:
if r.cfg.MetricsConfig.Enabled {
<-metricsServerDone
}
logger.Info().Msg("reminder stopped")
return nil
case <-r.ticker.C:
Expand Down Expand Up @@ -126,7 +161,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

// Fetch a batch of repositories
repos, err := r.getRepositoryBatch(ctx)
repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
if err != nil {
return fmt.Errorf("error fetching repository batch: %w", err)
}
Expand All @@ -143,6 +178,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

if r.metrics != nil {
r.metrics.RecordBatch(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
Expand All @@ -151,13 +190,16 @@ func (r *reminder) sendReminders(ctx context.Context) error {
repoIds := make([]uuid.UUID, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
}
if r.metrics != nil {
sendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution
recorder := r.metrics.SendDelay
if !repo.ReminderLastSent.Valid {
recorder = r.metrics.NewSendDelay
}
recorder.Record(ctx, sendDelay.Seconds())
}
}

err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
Expand All @@ -167,7 +209,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return nil
}

func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
Expand All @@ -176,21 +218,23 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
})
if err != nil {
return nil, err
return nil, nil, err
}

eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
if err != nil {
return nil, err
return nil, nil, err
}
logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))

r.updateRepositoryCursor(ctx, repos)

return eligibleRepos, nil
return eligibleRepos, eligibleReposLastUpdated, nil
}

func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
[]db.Repository, map[uuid.UUID]time.Time, error,
) {
eligibleRepos := make([]db.Repository, 0, len(repos))

// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
Expand All @@ -202,11 +246,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
if err != nil {
return nil, err
return nil, nil, err
}
idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
for _, times := range oldestRuleEvals {
idToLastUpdate[times.RepositoryID] = times.OldestLastUpdated
for _, ruleEval := range oldestRuleEvals {
idToLastUpdate[ruleEval.RepositoryID] = ruleEval.OldestLastUpdated
}

cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
Expand All @@ -216,7 +260,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
}

return eligibleRepos, nil
return eligibleRepos, idToLastUpdate, nil
}

func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {

r := createTestReminder(t, store, cfg)

got, err := r.getRepositoryBatch(context.Background())
got, _, err := r.getRepositoryBatch(context.Background())
if test.err != "" {
require.ErrorContains(t, err, test.err)
return
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"`
MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
}

// Validate validates the configuration
Expand Down

0 comments on commit eb5c7d0

Please sign in to comment.