Skip to content

Commit

Permalink
OpenTelemetry Updates for Worker, Stripe, Backend (#2744)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Sep 25, 2024
1 parent 83a1a99 commit e0562cd
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 30 deletions.
15 changes: 11 additions & 4 deletions backend/internal/cmds/mgmt/run/stripe-usage/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func run(ctx context.Context) error {

func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, logger *slog.Logger) (interceptors []connect.Interceptor, shutdown func(context.Context) error, err error) {
logger.DebugContext(ctx, "otel is enabled")
otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes()}
tmPropagator := neosyncotel.NewDefaultPropagator()
otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)}

meterProviders := []neosyncotel.MeterProvider{}
traceProviders := []neosyncotel.TracerProvider{}

meterprovider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{
Exporter: otelconfig.MeterExporter,
Expand All @@ -135,6 +139,7 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo
if meterprovider != nil {
logger.DebugContext(ctx, "otel metering has been configured")
otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(meterprovider))
meterProviders = append(meterProviders, meterprovider)
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics())
}
Expand All @@ -152,6 +157,7 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo
if traceprovider != nil {
logger.DebugContext(ctx, "otel tracing has been configured")
otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(traceprovider))
traceProviders = append(traceProviders, traceprovider)
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents())
}
Expand All @@ -162,9 +168,10 @@ func getOtelConfig(ctx context.Context, otelconfig neosyncotel.OtelEnvConfig, lo
}

otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{
TraceProviders: []neosyncotel.TracerProvider{traceprovider},
MeterProviders: []neosyncotel.MeterProvider{meterprovider},
Logger: logr.FromSlogHandler(logger.Handler()),
TraceProviders: traceProviders,
MeterProviders: meterProviders,
Logger: logr.FromSlogHandler(logger.Handler()),
TextMapPropagator: tmPropagator,
})
return []connect.Interceptor{otelInterceptor}, otelshutdown, nil
}
Expand Down
14 changes: 10 additions & 4 deletions backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ func serve(ctx context.Context) error {
otelconfig := neosyncotel.GetOtelConfigFromViperEnv()
if otelconfig.IsEnabled {
slogger.Debug("otel is enabled")
otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes()}
tmPropagator := neosyncotel.NewDefaultPropagator()
otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)}
traceProviders := []neosyncotel.TracerProvider{}
meterProviders := []neosyncotel.MeterProvider{}

meterprovider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{
Exporter: otelconfig.MeterExporter,
Expand All @@ -184,6 +187,7 @@ func serve(ctx context.Context) error {
if meterprovider != nil {
slogger.Debug("otel metering has been configured")
otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(meterprovider))
meterProviders = append(meterProviders, meterprovider)
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics())
}
Expand All @@ -201,6 +205,7 @@ func serve(ctx context.Context) error {
if traceprovider != nil {
slogger.Debug("otel tracing has been configured")
otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(traceprovider))
traceProviders = append(traceProviders, traceprovider)
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents())
}
Expand All @@ -212,9 +217,10 @@ func serve(ctx context.Context) error {
stdInterceptors = append(stdInterceptors, otelInterceptor)

otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{
TraceProviders: []neosyncotel.TracerProvider{traceprovider},
MeterProviders: []neosyncotel.MeterProvider{meterprovider},
Logger: logr.FromSlogHandler(slogger.Handler()),
TraceProviders: traceProviders,
MeterProviders: meterProviders,
Logger: logr.FromSlogHandler(slogger.Handler()),
TextMapPropagator: tmPropagator,
})
defer func() {
if err := otelshutdown(context.Background()); err != nil {
Expand Down
37 changes: 28 additions & 9 deletions internal/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ type MeterProvider interface {
type SetupConfig struct {
TraceProviders []TracerProvider
MeterProviders []MeterProvider
Logger logr.Logger
// If provided, configures the global text map propagator
TextMapPropagator propagation.TextMapPropagator
// Configures the global otel logger
Logger logr.Logger
}

func SetupOtelSdk(config *SetupConfig) func(context.Context) error {
otel.SetLogger(config.Logger)
// todo: Should probably move this out of here as it registers this globally
otel.SetTextMapPropagator(newPropagator())

if config.TextMapPropagator != nil {
otel.SetTextMapPropagator(config.TextMapPropagator)
}

var shutdownFuncs []func(context.Context) error

Expand Down Expand Up @@ -77,7 +82,7 @@ func SetupOtelSdk(config *SetupConfig) func(context.Context) error {
return shutdown
}

func newPropagator() propagation.TextMapPropagator {
func NewDefaultPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
Expand Down Expand Up @@ -167,12 +172,17 @@ func getMeterExporter(ctx context.Context, exporter string, opts MeterExporterOp
}

func WithDefaultDeltaTemporalitySelector() otlpmetricgrpc.Option {
return otlpmetricgrpc.WithTemporalitySelector(temporalitySelector)
return otlpmetricgrpc.WithTemporalitySelector(func(ik metricsdk.InstrumentKind) metricdata.Temporality {
// Delta Temporality causes metrics to be reset after some time.
// We are using this today for benthos metrics so that they don't persist indefinitely in the time series database
return metricdata.DeltaTemporality
})
}
func temporalitySelector(ik metricsdk.InstrumentKind) metricdata.Temporality {
// Delta Temporality causes metrics to be reset after some time.
// We are using this today for benthos metrics so that they don't persist indefinitely in the time series database
return metricdata.DeltaTemporality

func withCumulativeTemporalitySelector() otlpmetricgrpc.Option {
return otlpmetricgrpc.WithTemporalitySelector(func(ik metricsdk.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
})
}

type OtelEnvConfig struct {
Expand Down Expand Up @@ -219,3 +229,12 @@ func getMetricsExporter() string {
}
return exporter
}

// This will be used to test sending benthos metrics with cumulative temporality instead of delta for better prometheus compatibility
func GetBenthosMetricTemporalityOption() otlpmetricgrpc.Option {
temporality := viper.GetString("BENTHOS_METER_TEMPORALITY")
if temporality == "" || temporality == "delta" {
return WithDefaultDeltaTemporalitySelector()
}
return withCumulativeTemporalitySelector()
}
79 changes: 66 additions & 13 deletions worker/internal/cmds/worker/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"sync"
"time"

"connectrpc.com/connect"
"connectrpc.com/grpchealth"
"connectrpc.com/grpcreflect"
"connectrpc.com/otelconnect"
"github.com/go-logr/logr"
mysql_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/mysql"
pg_queries "github.com/nucleuscloud/neosync/backend/gen/go/db/dbschemas/postgresql"
Expand Down Expand Up @@ -69,17 +71,23 @@ func serve(ctx context.Context) error {
temporalClientInterceptors := []interceptor.ClientInterceptor{}
var temopralMeterHandler client.MetricsHandler

connectInterceptors := []connect.Interceptor{}

otelconfig := neosyncotel.GetOtelConfigFromViperEnv()
if otelconfig.IsEnabled {
logger.Debug("otel is enabled")
tmPropagator := neosyncotel.NewDefaultPropagator()
otelconnopts := []otelconnect.Option{otelconnect.WithoutServerPeerAttributes(), otelconnect.WithPropagator(tmPropagator)}

meterProviders := []neosyncotel.MeterProvider{}
traceProviders := []neosyncotel.TracerProvider{}
// Meter Provider that uses delta temporality for use with Benthos metrics
// This meter provider is setup expire metrics after a specified time period for easy computation
benthosMeterProvider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{
Exporter: otelconfig.MeterExporter,
AppVersion: otelconfig.ServiceVersion,
Opts: neosyncotel.MeterExporterOpts{
Otlp: []otlpmetricgrpc.Option{neosyncotel.WithDefaultDeltaTemporalitySelector()},
Otlp: []otlpmetricgrpc.Option{neosyncotel.GetBenthosMetricTemporalityOption()},
Console: []stdoutmetric.Option{stdoutmetric.WithPrettyPrint()},
},
})
Expand Down Expand Up @@ -114,7 +122,26 @@ func serve(ctx context.Context) error {
})
}

traceprovider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{
neosyncMeterProvider, err := neosyncotel.NewMeterProvider(ctx, &neosyncotel.MeterProviderConfig{
Exporter: otelconfig.MeterExporter,
AppVersion: otelconfig.ServiceVersion,
Opts: neosyncotel.MeterExporterOpts{
Otlp: []otlpmetricgrpc.Option{},
Console: []stdoutmetric.Option{stdoutmetric.WithPrettyPrint()},
},
})
if err != nil {
return err
}
if neosyncMeterProvider != nil {
logger.Debug("otel metering for neosync clients has been configured")
meterProviders = append(meterProviders, neosyncMeterProvider)
otelconnopts = append(otelconnopts, otelconnect.WithMeterProvider(neosyncMeterProvider))
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutMetrics())
}

temporalTraceProvider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{
Exporter: otelconfig.TraceExporter,
Opts: neosyncotel.TraceExporterOpts{
Otlp: []otlptracegrpc.Option{},
Expand All @@ -124,21 +151,46 @@ func serve(ctx context.Context) error {
if err != nil {
return err
}
if traceprovider != nil {
if temporalTraceProvider != nil {
logger.Debug("otel tracing for temporal has been configured")
traceInterceptor, err := temporalotel.NewTracingInterceptor(temporalotel.TracerOptions{
Tracer: traceprovider.Tracer("neosync-temporal-sdk"),
temporalTraceInterceptor, err := temporalotel.NewTracingInterceptor(temporalotel.TracerOptions{
Tracer: temporalTraceProvider.Tracer("neosync-temporal-sdk"),
})
if err != nil {
return err
}
temporalClientInterceptors = append(temporalClientInterceptors, traceInterceptor)
temporalClientInterceptors = append(temporalClientInterceptors, temporalTraceInterceptor)
traceProviders = append(traceProviders, temporalTraceProvider)
}

neosyncTraceProvider, err := neosyncotel.NewTraceProvider(ctx, &neosyncotel.TraceProviderConfig{
Exporter: otelconfig.TraceExporter,
Opts: neosyncotel.TraceExporterOpts{
Otlp: []otlptracegrpc.Option{},
Console: []stdouttrace.Option{stdouttrace.WithPrettyPrint()},
},
})
if err != nil {
return err
}
if neosyncTraceProvider != nil {
logger.Debug("otel tracing for neosync clients has been configured")
otelconnopts = append(otelconnopts, otelconnect.WithTracerProvider(neosyncTraceProvider))
} else {
otelconnopts = append(otelconnopts, otelconnect.WithoutTracing(), otelconnect.WithoutTraceEvents())
}

otelConnectInterceptor, err := otelconnect.NewInterceptor(otelconnopts...)
if err != nil {
return err
}
connectInterceptors = append(connectInterceptors, otelConnectInterceptor)

otelshutdown := neosyncotel.SetupOtelSdk(&neosyncotel.SetupConfig{
TraceProviders: []neosyncotel.TracerProvider{traceprovider},
MeterProviders: meterProviders,
Logger: logr.FromSlogHandler(logger.Handler()),
TraceProviders: traceProviders,
MeterProviders: meterProviders,
Logger: logr.FromSlogHandler(logger.Handler()),
TextMapPropagator: tmPropagator,
})
defer func() {
if err := otelshutdown(context.Background()); err != nil {
Expand Down Expand Up @@ -203,10 +255,11 @@ func serve(ctx context.Context) error {

neosyncurl := shared.GetNeosyncUrl()
httpclient := shared.GetNeosyncHttpClient()
userclient := mgmtv1alpha1connect.NewUserAccountServiceClient(httpclient, neosyncurl)
connclient := mgmtv1alpha1connect.NewConnectionServiceClient(httpclient, neosyncurl)
jobclient := mgmtv1alpha1connect.NewJobServiceClient(httpclient, neosyncurl)
transformerclient := mgmtv1alpha1connect.NewTransformersServiceClient(httpclient, neosyncurl)
connectInterceptorOption := connect.WithInterceptors(connectInterceptors...)
userclient := mgmtv1alpha1connect.NewUserAccountServiceClient(httpclient, neosyncurl, connectInterceptorOption)
connclient := mgmtv1alpha1connect.NewConnectionServiceClient(httpclient, neosyncurl, connectInterceptorOption)
jobclient := mgmtv1alpha1connect.NewJobServiceClient(httpclient, neosyncurl, connectInterceptorOption)
transformerclient := mgmtv1alpha1connect.NewTransformersServiceClient(httpclient, neosyncurl, connectInterceptorOption)
sqlconnector := &sqlconnect.SqlOpenConnector{}
sqlmanager := sql_manager.NewSqlManager(pgpoolmap, pgquerier, mysqlpoolmap, mysqlquerier, mssqlpoolmap, mssqlquerier, sqlconnector)
redisconfig := shared.GetRedisConfig()
Expand Down

0 comments on commit e0562cd

Please sign in to comment.