From af125d8b89f03cd6aeef6b27916dead3340d5ac4 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Sat, 27 Apr 2024 04:35:10 +0300 Subject: [PATCH] fix(otelbench): wait for receiver readiness --- cmd/otelbench/otel_log_bench.go | 42 +++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/cmd/otelbench/otel_log_bench.go b/cmd/otelbench/otel_log_bench.go index c1f5295d..206c26e3 100644 --- a/cmd/otelbench/otel_log_bench.go +++ b/cmd/otelbench/otel_log_bench.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" "github.com/go-faster/sdk/zctx" "github.com/spf13/cobra" @@ -173,9 +174,9 @@ func (b *LogsBench) generateBatch(r *rand.Rand, now time.Time) (logs plog.Logs, return logs, lines, bytes } -func (b *LogsBench) prepareTargets(args []string) error { +func (b *LogsBench) prepareTargets(ctx context.Context, args []string) error { for _, arg := range args { - client, err := b.prepareTarget(arg) + client, err := b.prepareTarget(ctx, arg) if err != nil { return errors.Wrapf(err, "prepare %q", arg) } @@ -190,14 +191,45 @@ func (b *LogsBench) prepareTargets(args []string) error { return nil } -func (b *LogsBench) prepareTarget(target string) (plogotlp.GRPCClient, error) { +func (b *LogsBench) prepareTarget(ctx context.Context, target string) (plogotlp.GRPCClient, error) { conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - return nil, errors.Wrap(err, "dial otlp") + return nil, errors.Wrap(err, "new client") } client := plogotlp.NewGRPCClient(conn) + + var ( + log = zctx.From(ctx).With(zap.String("target", target)) + eb = backoff.NewExponentialBackOff( + backoff.WithInitialInterval(5*time.Second), + backoff.WithMaxElapsedTime(time.Minute), + ) + ) + log.Info("Waiting for receiver") + if err := backoff.RetryNotify( + func() error { + _, err := client.Export(ctx, plogotlp.NewExportRequest()) + if err != nil { + if cerr := ctx.Err(); cerr != nil { + return backoff.Permanent(cerr) + } + return err + } + return nil + }, + eb, + func(err error, d time.Duration) { + log.Debug("Retry ping request", + zap.Error(err), + ) + }, + ); err != nil { + return nil, err + } + log.Info("Receiver is ready") + return client, nil } @@ -219,7 +251,7 @@ func newOtelLogsBenchCommand() *cobra.Command { }() ctx = zctx.Base(ctx, log) - if err := b.prepareTargets(args); err != nil { + if err := b.prepareTargets(ctx, args); err != nil { return err } return b.Run(ctx)