Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

celestia: add node rpc client otel metrics #61

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion celestia/celestia.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package celestia

Check failure on line 1 in celestia/celestia.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

: # github.com/rollkit/celestia-da/celestia [github.com/rollkit/celestia-da/celestia.test]

Check failure on line 1 in celestia/celestia.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

: # github.com/rollkit/celestia-da/celestia [github.com/rollkit/celestia-da/celestia.test]

import (
"context"
Expand All @@ -17,6 +17,7 @@
auth "github.com/cosmos/cosmos-sdk/x/auth/types"

"github.com/rollkit/go-da"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// CelestiaDA implements the celestia backend for the DA interface
Expand All @@ -25,15 +26,17 @@
namespace share.Namespace
gasPrice float64
ctx context.Context
metrics *sdkmetric.MeterProvider
}

// NewCelestiaDA returns an instance of CelestiaDA
func NewCelestiaDA(client *rpc.Client, namespace share.Namespace, gasPrice float64, ctx context.Context) *CelestiaDA {
func NewCelestiaDA(client *rpc.Client, namespace share.Namespace, gasPrice float64, ctx context.Context, m *sdkmetric.MeterProvider) *CelestiaDA {
return &CelestiaDA{
client: client,
namespace: namespace,
gasPrice: gasPrice,
ctx: ctx,
metrics: m,
}
}

Expand Down
67 changes: 67 additions & 0 deletions celestia/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package celestia

import (
"runtime/debug"
"strings"
)

// BuildInfo populated in ldflags by Makefile
type BuildInfo struct {
BuildTime string
LastCommit string
SemanticVersion string
NodeVersion string
GoVersion string
}

// ReadBuildInfo *BuildInfo containing build information
func ReadBuildInfo() *BuildInfo {
debugInfo, _ := debug.ReadBuildInfo()
var ldflags string
for _, kv := range debugInfo.Settings {
switch kv.Key {
case "-ldflags":
ldflags = kv.Value
}
}
var buildInfo BuildInfo

// Split ldflags into individual key-value pairs
keyValuePairs := strings.Split(ldflags, "-X ")

// Iterate over key-value pairs
for _, pair := range keyValuePairs {
// Skip empty pairs
if pair == "" {
continue
}

// Remove quotes
pair = strings.Trim(strings.TrimSpace(pair), "'")

// Split pair into key and value
parts := strings.Split(pair, "=")
if len(parts) != 2 {
// Invalid pair, skip
continue
}

// Trim leading and trailing spaces from key and value
key := parts[0]
value := strings.TrimSpace(parts[1])

// Assign value to corresponding field in BuildInfo
switch key {
case ".buildTime":
buildInfo.BuildTime = value
case ".lastCommit":
buildInfo.LastCommit = value
case ".semanticVersion":
buildInfo.SemanticVersion = value
case ".nodeVersion":
buildInfo.NodeVersion = value
}
}
buildInfo.GoVersion = debugInfo.GoVersion
return &buildInfo
}
5 changes: 4 additions & 1 deletion cmd/celestia-da/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
grpcListenFlag = "da.grpc.listen"
grpcNetworkFlag = "da.grpc.network"
grpcGasPriceFlag = "da.grpc.gasprice"
grpcMetricsFlag = "da.grpc.metrics"
)

// WithDataAvailabilityService patches the start command to also run the gRPC Data Availability service
Expand All @@ -25,6 +26,7 @@ func WithDataAvailabilityService(flags []*pflag.FlagSet) func(*cobra.Command) {
grpcFlags.String(grpcListenFlag, "127.0.0.1:0", "gRPC service listen address")
grpcFlags.String(grpcNetworkFlag, "tcp", "gRPC service listen network type must be \"tcp\", \"tcp4\", \"tcp6\", \"unix\" or \"unixpacket\"")
grpcFlags.Float64(grpcGasPriceFlag, -1, "gas price for estimating fee (utia/gas) default: -1 for default fees")
grpcFlags.Bool(grpcMetricsFlag, false, "enables OTLP metrics with HTTP exporter")

fset := append(flags, grpcFlags)

Expand All @@ -44,6 +46,7 @@ func WithDataAvailabilityService(flags []*pflag.FlagSet) func(*cobra.Command) {
listenAddress, _ := cmd.Flags().GetString(grpcListenFlag)
listenNetwork, _ := cmd.Flags().GetString(grpcNetworkFlag)
gasPrice, _ := cmd.Flags().GetFloat64(grpcGasPriceFlag)
metrics, _ := cmd.Flags().GetBool(grpcMetricsFlag)

if rpcToken == "" {
token, err := authToken(cmdnode.StorePath(c.Context()))
Expand All @@ -54,7 +57,7 @@ func WithDataAvailabilityService(flags []*pflag.FlagSet) func(*cobra.Command) {
}

// serve the gRPC service in a goroutine
go serve(cmd.Context(), rpcAddress, rpcToken, listenAddress, listenNetwork, nsString, gasPrice)
go serve(cmd.Context(), rpcAddress, rpcToken, listenAddress, listenNetwork, nsString, gasPrice, metrics)
}

c.PreRun = preRun
Expand Down
175 changes: 175 additions & 0 deletions cmd/celestia-da/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package main

import (
"context"
"fmt"
"net/http"
"sync"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
)

// InstrumentReg stores the already registered instruments
//
//nolint:structcheck // generics
type InstrumentReg[T any, O any] struct {
instruments map[string]T
mu sync.Mutex
newInstrument func(name string, options ...O) (T, error)
}

// GetInstrument registers a new instrument, otherwise returns the already created.
func (r *InstrumentReg[T, O]) GetInstrument(name string, options ...O) (T, error) {
var err error
r.mu.Lock()
defer r.mu.Unlock()
instrument, has := r.instruments[name]
if !has {
instrument, err = r.newInstrument(name, options...)
if err != nil {
return instrument, fmt.Errorf("unable to register metric %T %s: %w", r, name, err)
}
r.instruments[name] = instrument
}

return instrument, nil
}

var (
// meter is the default meter
meter metric.Meter //nolint:gochecknoglobals // private
// meterOnce is used to init meter
meterOnce sync.Once //nolint:gochecknoglobals // private
// regInt64Counter stores Int64Counters
regInt64Counter *InstrumentReg[metric.Int64Counter, metric.Int64CounterOption] //nolint:gochecknoglobals // private
// regFloat64Counter stores Float64Counters
regFloat64Counter *InstrumentReg[metric.Float64Counter, metric.Float64CounterOption] //nolint:gochecknoglobals // private
)

// GetMeter returns the default meter.
// Inits meter and InstrumentRegs (if needed)
func GetMeter(m metric.MeterProvider) metric.Meter {
meterOnce.Do(func() {
meter = m.Meter("github.com/pgillich/opentracing-example/internal/middleware", metric.WithInstrumentationVersion("0.1"))

regInt64Counter = &InstrumentReg[metric.Int64Counter, metric.Int64CounterOption]{
instruments: map[string]metric.Int64Counter{},
newInstrument: meter.Int64Counter,
}
regFloat64Counter = &InstrumentReg[metric.Float64Counter, metric.Float64CounterOption]{
instruments: map[string]metric.Float64Counter{},
newInstrument: meter.Float64Counter,
}
})

return meter
}

func Int64CounterGetInstrument(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {

Check warning on line 75 in cmd/celestia-da/metrics.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported function Int64CounterGetInstrument should have comment or be unexported (revive)

Check warning on line 75 in cmd/celestia-da/metrics.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported function Int64CounterGetInstrument should have comment or be unexported (revive)
return regInt64Counter.GetInstrument(name, options...)
}

func Float64CounterGetInstrument(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {

Check warning on line 79 in cmd/celestia-da/metrics.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported function Float64CounterGetInstrument should have comment or be unexported (revive)

Check warning on line 79 in cmd/celestia-da/metrics.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: exported function Float64CounterGetInstrument should have comment or be unexported (revive)
return regFloat64Counter.GetInstrument(name, options...)
}

// MetricTransport implements the http.RoundTripper interface and wraps
// outbound HTTP(S) requests with metrics.
type MetricTransport struct {
rt http.RoundTripper

meter metric.Meter
name string
description string
baseAttrs []attribute.KeyValue
}

// NewMetricTransport wraps the provided http.RoundTripper with one that
// meters metrics.
//
// If the provided http.RoundTripper is nil, http.DefaultTransport will be used
// as the base http.RoundTripper.
func NewMetricTransport(base http.RoundTripper, meter metric.Meter, name string,
description string, attributes map[string]string) *MetricTransport {
if base == nil {
base = http.DefaultTransport
}
baseAttrs := make([]attribute.KeyValue, 0, len(attributes))
for aKey, aVal := range attributes {
baseAttrs = append(baseAttrs, attribute.Key(aKey).String(aVal))
}

return &MetricTransport{
rt: base,
meter: meter,
name: name,
description: description,
baseAttrs: baseAttrs,
}
}

// RoundTrip meters outgoing request-response pair.
func (t *MetricTransport) RoundTrip(r *http.Request) (*http.Response, error) {
ctx := r.Context()

attempted, err := Int64CounterGetInstrument(t.name, metric.WithDescription(t.description))
if err != nil {
return nil, err
}
durationSum, err := Float64CounterGetInstrument(t.name+"_duration", metric.WithDescription(t.description+", duration sum"), metric.WithUnit("s"))
if err != nil {
return nil, err
}
beginTS := time.Now()
var res *http.Response

r = r.WithContext(ctx)
res, err = t.rt.RoundTrip(r)

elapsedSec := time.Since(beginTS).Seconds()
attrs := make([]attribute.KeyValue, len(t.baseAttrs), len(t.baseAttrs)+6)
copy(attrs, t.baseAttrs)
opt := metric.WithAttributes(attrs...)
attempted.Add(ctx, 1, opt)
durationSum.Add(ctx, elapsedSec, opt)

return res, err //nolint:wrapcheck // should not be changed
}

func setupMetrics(ctx context.Context, serviceName string) (*sdkmetric.MeterProvider, error) {
exporter, err := otlpmetrichttp.New(
ctx,
otlpmetrichttp.WithEndpoint("localhost:4318"),
otlpmetrichttp.WithInsecure(),
)
if err != nil {
return nil, err
}

// labels/tags/resources that are common to all metrics.
resource := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
attribute.String("some-attribute", "some-value"),
)

mp := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource),
sdkmetric.WithReader(
// collects and exports metric data every 30 seconds.
sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(30*time.Second)),
),
)
GetMeter(mp)

otel.SetMeterProvider(mp)

return mp, nil
}
33 changes: 28 additions & 5 deletions cmd/celestia-da/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"encoding/hex"
"errors"
"net"
"net/http"

rpc "github.com/celestiaorg/celestia-node/api/rpc/client"
"github.com/celestiaorg/celestia-node/share"
"github.com/filecoin-project/go-jsonrpc"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"github.com/rollkit/celestia-da/celestia"

Expand All @@ -17,10 +20,30 @@ import (
"github.com/rollkit/go-da/proxy"
)

func serve(ctx context.Context, rpcAddress, rpcToken, listenAddress, listenNetwork, nsString string, gasPrice float64) {
client, err := rpc.NewClient(ctx, rpcAddress, rpcToken)
if err != nil {
log.Fatalln("failed to create celestia-node RPC client:", err)
var serviceName = "celestia-da"

func serve(ctx context.Context, rpcAddress, rpcToken, listenAddress, listenNetwork, nsString string, gasPrice float64, metrics bool) {
var client *rpc.Client
var err error
var m *sdkmetric.MeterProvider
if metrics {
m, err = setupMetrics(ctx, serviceName)
if err != nil {
log.Fatalln("failed to setup metrics:", err)
}
httpTransport := NewMetricTransport(nil, m.Meter("rpc"), "rpc", "celestia-node json-rpc client", nil)
httpClient := &http.Client{
Transport: httpTransport,
}
client, err = rpc.NewClient(ctx, rpcAddress, rpcToken, jsonrpc.WithHTTPClient(httpClient))
if err != nil {
log.Fatalln("failed to create celestia-node RPC client:", err)
}
} else {
client, err = rpc.NewClient(ctx, rpcAddress, rpcToken)
if err != nil {
log.Fatalln("failed to create celestia-node RPC client:", err)
}
}
nsBytes := make([]byte, len(nsString)/2)
_, err = hex.Decode(nsBytes, []byte(nsString))
Expand All @@ -32,7 +55,7 @@ func serve(ctx context.Context, rpcAddress, rpcToken, listenAddress, listenNetwo
log.Fatalln("invalid namespace:", err)
}

da := celestia.NewCelestiaDA(client, namespace, gasPrice, ctx)
da := celestia.NewCelestiaDA(client, namespace, gasPrice, ctx, m)
// TODO(tzdybal): add configuration options for encryption
srv := proxy.NewServer(da, grpc.Creds(insecure.NewCredentials()))

Expand Down
Loading
Loading