From e0562cdf376e60eb3b0d62527ec7afd93bcf3eff Mon Sep 17 00:00:00 2001 From: Nick Zelei <2420177+nickzelei@users.noreply.github.com> Date: Wed, 25 Sep 2024 14:12:32 -0700 Subject: [PATCH] OpenTelemetry Updates for Worker, Stripe, Backend (#2744) --- .../cmds/mgmt/run/stripe-usage/cmd.go | 15 +++- .../internal/cmds/mgmt/serve/connect/cmd.go | 14 +++- internal/otel/otel.go | 37 ++++++--- worker/internal/cmds/worker/serve/serve.go | 79 ++++++++++++++++--- 4 files changed, 115 insertions(+), 30 deletions(-) diff --git a/backend/internal/cmds/mgmt/run/stripe-usage/cmd.go b/backend/internal/cmds/mgmt/run/stripe-usage/cmd.go index afd4e0792d..780e2fc5d2 100644 --- a/backend/internal/cmds/mgmt/run/stripe-usage/cmd.go +++ b/backend/internal/cmds/mgmt/run/stripe-usage/cmd.go @@ -119,7 +119,11 @@ func run(ctx context.Context) error { func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, logger *slog.Logger) (interceptors []connect.Interceptor, shutdown func(context.Context) error, err error) { logger.DebugContext(ctx, "otel is enabled") - otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes()} + tmPropagator := neosyncotel.NewDefaultPropagator() + otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)} + + meterProviders := []neosyncotel.MeterProvider{} + traceProviders := []neosyncotel.TracerProvider{} meterprovider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{ Exporter: otelconfig.MeterExporter, @@ -135,6 +139,7 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo if meterprovider != nil { logger.DebugContext(ctx, "otel metering has been configured") otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(meterprovider)) + meterProviders = append(meterProviders, meterprovider) } else { otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics()) } @@ -152,6 +157,7 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo if traceprovider != nil { logger.DebugContext(ctx, "otel tracing has been configured") otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(traceprovider)) + traceProviders = append(traceProviders, traceprovider) } else { otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents()) } @@ -162,9 +168,10 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo } otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{ - TraceProviders: []neosyncotel.TracerProvider{traceprovider}, - MeterProviders: []neosyncotel.MeterProvider{meterprovider}, - Logger: logr.FromSlogHandler(logger.Handler()), + TraceProviders: traceProviders, + MeterProviders: meterProviders, + Logger: logr.FromSlogHandler(logger.Handler()), + TextMapPropagator: tmPropagator, }) return []connect.Interceptor{otelInterceptor}, otelshutdown, nil } diff --git a/backend/internal/cmds/mgmt/serve/connect/cmd.go b/backend/internal/cmds/mgmt/serve/connect/cmd.go index e9a0d2e86d..b8ad7f35b7 100644 --- a/backend/internal/cmds/mgmt/serve/connect/cmd.go +++ b/backend/internal/cmds/mgmt/serve/connect/cmd.go @@ -168,7 +168,10 @@ func serve(ctx context.Context) error { otelconfig := neosyncotel.GetOtelConfigFromViperEnv() if otelconfig.IsEnabled { slogger.Debug("otel is enabled") - otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes()} + tmPropagator := neosyncotel.NewDefaultPropagator() + otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)} + traceProviders := []neosyncotel.TracerProvider{} + meterProviders := []neosyncotel.MeterProvider{} meterprovider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{ Exporter: otelconfig.MeterExporter, @@ -184,6 +187,7 @@ func serve(ctx context.Context) error { if meterprovider != nil { slogger.Debug("otel metering has been configured") otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(meterprovider)) + meterProviders = append(meterProviders, meterprovider) } else { otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics()) } @@ -201,6 +205,7 @@ func serve(ctx context.Context) error { if traceprovider != nil { slogger.Debug("otel tracing has been configured") otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(traceprovider)) + traceProviders = append(traceProviders, traceprovider) } else { otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents()) } @@ -212,9 +217,10 @@ func serve(ctx context.Context) error { stdInterceptors = append(stdInterceptors, otelInterceptor) otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{ - TraceProviders: []neosyncotel.TracerProvider{traceprovider}, - MeterProviders: []neosyncotel.MeterProvider{meterprovider}, - Logger: logr.FromSlogHandler(slogger.Handler()), + TraceProviders: traceProviders, + MeterProviders: meterProviders, + Logger: logr.FromSlogHandler(slogger.Handler()), + TextMapPropagator: tmPropagator, }) defer func() { if err := otelshutdown(context.Background()); err != nil { diff --git a/internal/otel/otel.go b/internal/otel/otel.go index 498307be78..4d7cbf958d 100644 --- a/internal/otel/otel.go +++ b/internal/otel/otel.go @@ -41,13 +41,18 @@ type MeterProvider interface { type SetupConfig struct { TraceProviders []TracerProvider MeterProviders []MeterProvider - Logger logr.Logger + // If provided, configures the global text map propagator + TextMapPropagator propagation.TextMapPropagator + // Configures the global otel logger + Logger logr.Logger } func SetupOtelSdk(config *SetupConfig) func(context.Context) error { otel.SetLogger(config.Logger) - // todo: Should probably move this out of here as it registers this globally - otel.SetTextMapPropagator(newPropagator()) + + if config.TextMapPropagator != nil { + otel.SetTextMapPropagator(config.TextMapPropagator) + } var shutdownFuncs []func(context.Context) error @@ -77,7 +82,7 @@ func SetupOtelSdk(config *SetupConfig) func(context.Context) error { return shutdown } -func newPropagator() propagation.TextMapPropagator { +func NewDefaultPropagator() propagation.TextMapPropagator { return propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, @@ -167,12 +172,17 @@ func getMeterExporter(ctx context.Context, exporter string, opts MeterExporterOp } func WithDefaultDeltaTemporalitySelector() otlpmetricgrpc.Option { - return otlpmetricgrpc.WithTemporalitySelector(temporalitySelector) + return otlpmetricgrpc.WithTemporalitySelector(func(ik metricsdk.InstrumentKind) metricdata.Temporality { + // Delta Temporality causes metrics to be reset after some time. + // We are using this today for benthos metrics so that they don't persist indefinitely in the time series database + return metricdata.DeltaTemporality + }) } -func temporalitySelector(ik metricsdk.InstrumentKind) metricdata.Temporality { - // Delta Temporality causes metrics to be reset after some time. - // We are using this today for benthos metrics so that they don't persist indefinitely in the time series database - return metricdata.DeltaTemporality + +func withCumulativeTemporalitySelector() otlpmetricgrpc.Option { + return otlpmetricgrpc.WithTemporalitySelector(func(ik metricsdk.InstrumentKind) metricdata.Temporality { + return metricdata.CumulativeTemporality + }) } type OtelEnvConfig struct { @@ -219,3 +229,12 @@ func getMetricsExporter() string { } return exporter } + +// This will be used to test sending benthos metrics with cumulative temporality instead of delta for better prometheus compatibility +func GetBenthosMetricTemporalityOption() otlpmetricgrpc.Option { + temporality := viper.GetString("BENTHOS_METER_TEMPORALITY") + if temporality == "" || temporality == "delta" { + return WithDefaultDeltaTemporalitySelector() + } + return withCumulativeTemporalitySelector() +} diff --git a/worker/internal/cmds/worker/serve/serve.go b/worker/internal/cmds/worker/serve/serve.go index 5e7d9fada6..096099ffe6 100644 --- a/worker/internal/cmds/worker/serve/serve.go +++ b/worker/internal/cmds/worker/serve/serve.go @@ -12,8 +12,10 @@ import ( "sync" "time" + "connectrpc.com/connect" "connectrpc.com/grpchealth" "connectrpc.com/grpcreflect" + "connectrpc.com/otelconnect" "github.com/go-logr/logr" mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql" pg_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/postgresql" @@ -69,17 +71,23 @@ func serve(ctx context.Context) error { temporalClientInterceptors := []interceptor.ClientInterceptor{} var temopralMeterHandler client.MetricsHandler + connectInterceptors := []connect.Interceptor{} + otelconfig := neosyncotel.GetOtelConfigFromViperEnv() if otelconfig.IsEnabled { logger.Debug("otel is enabled") + tmPropagator := neosyncotel.NewDefaultPropagator() + otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)} + meterProviders := []neosyncotel.MeterProvider{} + traceProviders := []neosyncotel.TracerProvider{} // Meter Provider that uses delta temporality for use with Benthos metrics // This meter provider is setup expire metrics after a specified time period for easy computation benthosMeterProvider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{ Exporter: otelconfig.MeterExporter, AppVersion: otelconfig.ServiceVersion, Opts: neosyncotel.MeterExporterOpts{ - Otlp: []otlpmetricgrpc.Option{neosyncotel.WithDefaultDeltaTemporalitySelector()}, + Otlp: []otlpmetricgrpc.Option{neosyncotel.GetBenthosMetricTemporalityOption()}, Console: []stdoutmetric.Option{stdoutmetric.WithPrettyPrint()}, }, }) @@ -114,7 +122,26 @@ func serve(ctx context.Context) error { }) } - traceprovider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{ + neosyncMeterProvider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{ + Exporter: otelconfig.MeterExporter, + AppVersion: otelconfig.ServiceVersion, + Opts: neosyncotel.MeterExporterOpts{ + Otlp: []otlpmetricgrpc.Option{}, + Console: []stdoutmetric.Option{stdoutmetric.WithPrettyPrint()}, + }, + }) + if err != nil { + return err + } + if neosyncMeterProvider != nil { + logger.Debug("otel metering for neosync clients has been configured") + meterProviders = append(meterProviders, neosyncMeterProvider) + otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(neosyncMeterProvider)) + } else { + otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics()) + } + + temporalTraceProvider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{ Exporter: otelconfig.TraceExporter, Opts: neosyncotel.TraceExporterOpts{ Otlp: []otlptracegrpc.Option{}, @@ -124,21 +151,46 @@ func serve(ctx context.Context) error { if err != nil { return err } - if traceprovider != nil { + if temporalTraceProvider != nil { logger.Debug("otel tracing for temporal has been configured") - traceInterceptor, err := temporalotel.NewTracingInterceptor(temporalotel.TracerOptions{ - Tracer: traceprovider.Tracer("neosync-temporal-sdk"), + temporalTraceInterceptor, err := temporalotel.NewTracingInterceptor(temporalotel.TracerOptions{ + Tracer: temporalTraceProvider.Tracer("neosync-temporal-sdk"), }) if err != nil { return err } - temporalClientInterceptors = append(temporalClientInterceptors, traceInterceptor) + temporalClientInterceptors = append(temporalClientInterceptors, temporalTraceInterceptor) + traceProviders = append(traceProviders, temporalTraceProvider) + } + + neosyncTraceProvider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{ + Exporter: otelconfig.TraceExporter, + Opts: neosyncotel.TraceExporterOpts{ + Otlp: []otlptracegrpc.Option{}, + Console: []stdouttrace.Option{stdouttrace.WithPrettyPrint()}, + }, + }) + if err != nil { + return err + } + if neosyncTraceProvider != nil { + logger.Debug("otel tracing for neosync clients has been configured") + otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(neosyncTraceProvider)) + } else { + otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents()) + } + + otelConnectInterceptor, err := otelconnect.NewInterceptor(otelconnopts...) + if err != nil { + return err } + connectInterceptors = append(connectInterceptors, otelConnectInterceptor) otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{ - TraceProviders: []neosyncotel.TracerProvider{traceprovider}, - MeterProviders: meterProviders, - Logger: logr.FromSlogHandler(logger.Handler()), + TraceProviders: traceProviders, + MeterProviders: meterProviders, + Logger: logr.FromSlogHandler(logger.Handler()), + TextMapPropagator: tmPropagator, }) defer func() { if err := otelshutdown(context.Background()); err != nil { @@ -203,10 +255,11 @@ func serve(ctx context.Context) error { neosyncurl := shared.GetNeosyncUrl() httpclient := shared.GetNeosyncHttpClient() - userclient := mgmtv1alpha1connect.NewUserAccountServiceClient(httpclient, neosyncurl) - connclient := mgmtv1alpha1connect.NewConnectionServiceClient(httpclient, neosyncurl) - jobclient := mgmtv1alpha1connect.NewJobServiceClient(httpclient, neosyncurl) - transformerclient := mgmtv1alpha1connect.NewTransformersServiceClient(httpclient, neosyncurl) + connectInterceptorOption := connect.WithInterceptors(connectInterceptors...) + userclient := mgmtv1alpha1connect.NewUserAccountServiceClient(httpclient, neosyncurl, connectInterceptorOption) + connclient := mgmtv1alpha1connect.NewConnectionServiceClient(httpclient, neosyncurl, connectInterceptorOption) + jobclient := mgmtv1alpha1connect.NewJobServiceClient(httpclient, neosyncurl, connectInterceptorOption) + transformerclient := mgmtv1alpha1connect.NewTransformersServiceClient(httpclient, neosyncurl, connectInterceptorOption) sqlconnector := &sqlconnect.SqlOpenConnector{} sqlmanager := sql_manager.NewSqlManager(pgpoolmap, pgquerier, mysqlpoolmap, mysqlquerier, mssqlpoolmap, mssqlquerier, sqlconnector) redisconfig := shared.GetRedisConfig()