Skip to content

Commit

Permalink
fix(otelbench): wait for receiver readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 27, 2024
1 parent aa18f7e commit af125d8
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions cmd/otelbench/otel_log_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -173,9 +174,9 @@ func (b *LogsBench) generateBatch(r *rand.Rand, now time.Time) (logs plog.Logs,
return logs, lines, bytes
}

func (b *LogsBench) prepareTargets(args []string) error {
func (b *LogsBench) prepareTargets(ctx context.Context, args []string) error {
for _, arg := range args {
client, err := b.prepareTarget(arg)
client, err := b.prepareTarget(ctx, arg)
if err != nil {
return errors.Wrapf(err, "prepare %q", arg)
}
Expand All @@ -190,14 +191,45 @@ func (b *LogsBench) prepareTargets(args []string) error {
return nil
}

func (b *LogsBench) prepareTarget(target string) (plogotlp.GRPCClient, error) {
func (b *LogsBench) prepareTarget(ctx context.Context, target string) (plogotlp.GRPCClient, error) {
conn, err := grpc.NewClient(target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, errors.Wrap(err, "dial otlp")
return nil, errors.Wrap(err, "new client")
}
client := plogotlp.NewGRPCClient(conn)

var (
log = zctx.From(ctx).With(zap.String("target", target))
eb = backoff.NewExponentialBackOff(
backoff.WithInitialInterval(5*time.Second),
backoff.WithMaxElapsedTime(time.Minute),
)
)
log.Info("Waiting for receiver")
if err := backoff.RetryNotify(
func() error {
_, err := client.Export(ctx, plogotlp.NewExportRequest())
if err != nil {
if cerr := ctx.Err(); cerr != nil {
return backoff.Permanent(cerr)
}
return err
}
return nil
},
eb,
func(err error, d time.Duration) {
log.Debug("Retry ping request",
zap.Error(err),
)
},
); err != nil {
return nil, err
}
log.Info("Receiver is ready")

return client, nil
}

Expand All @@ -219,7 +251,7 @@ func newOtelLogsBenchCommand() *cobra.Command {
}()
ctx = zctx.Base(ctx, log)

if err := b.prepareTargets(args); err != nil {
if err := b.prepareTargets(ctx, args); err != nil {
return err
}
return b.Run(ctx)
Expand Down

0 comments on commit af125d8

Please sign in to comment.