From 5de62cc6909de563f4f06318964fad680b2e9c86 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Mon, 23 Sep 2024 08:45:20 -0700 Subject: [PATCH 1/8] feat(da): add metrics --- README.md | 2 + da/da.go | 3 ++ go.mod | 4 +- main.go | 21 +++++++- sequencing/metrics.go | 95 ++++++++++++++++++++++++++++++++++++ sequencing/sequencer.go | 37 ++++++++++++-- sequencing/sequencer_test.go | 4 +- 7 files changed, 157 insertions(+), 9 deletions(-) create mode 100644 sequencing/metrics.go diff --git a/README.md b/README.md index 2c10ade..44a5ff3 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,8 @@ Run centralized-sequencer by specifying DA network details: | `host` | centralized sequencer host | localhost | | `port` | centralized sequencer port | 50051 | | `listen-all` |listen on all network interfaces (0.0.0.0) instead of just localhost|disabled| +| `metrics` |enable prometheus metrics|disabled| +| `metrics-address` |address to expose prometheus metrics|`":8080"`| See `./build/centralized-sequencer --help` for details. diff --git a/da/da.go b/da/da.go index abf5ed1..10f2a42 100644 --- a/da/da.go +++ b/da/da.go @@ -80,6 +80,8 @@ type BaseResult struct { Message string // DAHeight informs about a height on Data Availability Layer for given result. DAHeight uint64 + // BlobSize is the size of the blob submitted. + BlobSize uint64 // SubmittedCount is the number of successfully submitted blocks. SubmittedCount uint64 } @@ -192,6 +194,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch, BaseResult: BaseResult{ Code: StatusSuccess, DAHeight: binary.LittleEndian.Uint64(ids[0]), + BlobSize: blobSize, SubmittedCount: uint64(len(ids)), }, } diff --git a/go.mod b/go.mod index 3e483aa..d4842d8 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,10 @@ toolchain go1.22.3 require ( github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/go-kit/kit v0.13.0 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log/v2 v2.5.1 + github.com/prometheus/client_golang v1.19.1 github.com/rollkit/go-da v0.8.0 github.com/rollkit/go-sequencing v0.3.0 github.com/rollkit/rollkit v0.13.7 @@ -30,7 +32,6 @@ require ( github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect - github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/golang/glog v1.2.2 // indirect @@ -68,7 +69,6 @@ require ( github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/main.go b/main.go index 852c730..3ecb0d7 100644 --- a/main.go +++ b/main.go @@ -6,11 +6,14 @@ import ( "fmt" "log" "net" + "net/http" "os" "os/signal" "syscall" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rollkit/centralized-sequencer/sequencing" sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc" ) @@ -32,6 +35,8 @@ func main() { da_namespace string da_auth_token string db_path string + metricsEnabled bool + metricsAddress string ) flag.StringVar(&host, "host", defaultHost, "centralized sequencer host") flag.StringVar(&port, "port", defaultPort, "centralized sequencer port") @@ -41,6 +46,8 @@ func main() { flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions") flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA") flag.StringVar(&db_path, "db_path", "", "path to the database") + flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics") + flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics") flag.Parse() @@ -60,7 +67,19 @@ func main() { log.Fatalf("Error decoding namespace: %v", err) } - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, db_path) + if metricsEnabled { + go func() { + log.Printf("Starting metrics server on %v...\n", metricsAddress) + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(metricsAddress, nil) // #nosec G114 + if err != nil { + log.Fatalf("Failed to serve metrics: %v", err) + } + }() + } + + metrics := sequencing.DefaultMetricsProvider(metricsEnabled) + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics(""), db_path) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } diff --git a/sequencing/metrics.go b/sequencing/metrics.go new file mode 100644 index 0000000..7bdcc28 --- /dev/null +++ b/sequencing/metrics.go @@ -0,0 +1,95 @@ +package sequencing + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "sequencer" +) + +// MetricsProvider returns sequencing Metrics. +type MetricsProvider func(chainID string) *Metrics + +// DefaultMetricsProvider returns Metrics build using Prometheus client library +// if Prometheus is enabled. Otherwise, it returns no-op Metrics. +func DefaultMetricsProvider(enabled bool) MetricsProvider { + return func(chainID string) *Metrics { + if enabled { + return PrometheusMetrics("chain_id", chainID) + } + return NopMetrics() + } +} + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // GasPrice + GasPrice metrics.Gauge + // Last submitted blob size + LastBlobSize metrics.Gauge + // TODO(tuxcanfly): needs gas used, wallet balance from go-da + // cost / byte + // CostPerByte metrics.Gauge + // Wallet Balance + // WalletBalance metrics.Gauge + // Transaction Status + TransactionStatus metrics.Gauge + // Number of pending blocks. + NumPendingBlocks metrics.Gauge + // Last included block height + IncludedBlockHeight metrics.Gauge +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "gas_price", + Help: "The gas price of DA.", + }, labels).With(labelsAndValues...), + LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "last_blob_size", + Help: "The size in bytes of the last DA blob.", + }, labels).With(labelsAndValues...), + TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "transaction_status", + Help: "The transaction status of the last DA submission.", + }, labels).With(labelsAndValues...), + NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "num_pending_blocks", + Help: "The number of pending blocks for DA submission.", + }, labels).With(labelsAndValues...), + IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "included_block_height", + Help: "The last DA included block height.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + GasPrice: discard.NewGauge(), + LastBlobSize: discard.NewGauge(), + TransactionStatus: discard.NewGauge(), + NumPendingBlocks: discard.NewGauge(), + IncludedBlockHeight: discard.NewGauge(), + } +} diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 8325a0d..b905711 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -281,10 +281,13 @@ type Sequencer struct { db *badger.DB // BadgerDB instance for persistence dbMux sync.Mutex // Mutex for safe concurrent DB access + + metrics *Metrics + metricsProvider MetricsProvider } // NewSequencer ... -func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) { +func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) { ctx := context.Background() dac, err := proxyda.NewClient(daAddress, daAuthToken) if err != nil { @@ -303,12 +306,22 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t } else { opts = badger.DefaultOptions(dbPath) } + s := &Sequencer{ + dalc: dalc, + batchTime: batchTime, + ctx: ctx, + maxSize: maxBlobSize, + tq: NewTransactionQueue(), + bq: NewBatchQueue(), + seenBatches: make(map[string]struct{}), + metrics: metrics, + } opts = opts.WithLogger(nil) db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("failed to open BadgerDB: %w", err) } - s := &Sequencer{ + s = &Sequencer{ dalc: dalc, batchTime: batchTime, ctx: ctx, @@ -481,6 +494,16 @@ func (c *Sequencer) publishBatch() error { return nil } +func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, status da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) { + if c.metrics != nil { + c.metrics.GasPrice.Set(float64(gasPrice)) + c.metrics.LastBlobSize.Set(float64(blobSize)) + c.metrics.TransactionStatus.Set(float64(status)) + c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) + c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) + } +} + func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error { batchesToSubmit := []*sequencing.Batch{&batch} submittedAllBlocks := false @@ -542,6 +565,7 @@ daSubmitRetryLoop: backoff = c.exponentialBackoff(backoff) } + c.recordMetrics(gasPrice, res.BlobSize, res.Code, len(batchesToSubmit), res.DAHeight) attempt += 1 } @@ -578,8 +602,13 @@ func getRemainingSleep(start time.Time, blockTime time.Duration, sleep time.Dura // SubmitRollupTransaction implements sequencing.Sequencer. func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, req sequencing.SubmitRollupTransactionRequest) (*sequencing.SubmitRollupTransactionResponse, error) { - if !c.isValid(req.RollupId) { - return nil, ErrInvalidRollupId + if c.rollupId == nil { + c.rollupId = req.RollupId + c.metrics = c.metricsProvider(hex.EncodeToString(req.RollupId)) + } else { + if !bytes.Equal(c.rollupId, req.RollupId) { + return nil, ErrInvalidRollupId + } } err := c.tq.AddTransaction(req.Tx, c.db) if err != nil { diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index 4581c58..dafdbc4 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -51,7 +51,7 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv func TestNewSequencer(t *testing.T) { // Create a new sequencer with mock DA client - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, NopMetrics(), "") require.NoError(t, err) defer func() { err := seq.Close() @@ -67,7 +67,7 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, NopMetrics(), "") require.NoError(t, err) defer func() { err := seq.Close() From 5941721d49eaecff96e6d9fd97f879e1f6783160 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Fri, 25 Oct 2024 07:55:12 -0700 Subject: [PATCH 2/8] feat(da): review --- main.go | 2 +- sequencing/metrics.go | 12 ++++++++---- sequencing/sequencer.go | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 3ecb0d7..1064981 100644 --- a/main.go +++ b/main.go @@ -70,11 +70,11 @@ func main() { if metricsEnabled { go func() { log.Printf("Starting metrics server on %v...\n", metricsAddress) - http.Handle("/metrics", promhttp.Handler()) err := http.ListenAndServe(metricsAddress, nil) // #nosec G114 if err != nil { log.Fatalf("Failed to serve metrics: %v", err) } + http.Handle("/metrics", promhttp.Handler()) }() } diff --git a/sequencing/metrics.go b/sequencing/metrics.go index 7bdcc28..3402056 100644 --- a/sequencing/metrics.go +++ b/sequencing/metrics.go @@ -39,7 +39,7 @@ type Metrics struct { // Wallet Balance // WalletBalance metrics.Gauge // Transaction Status - TransactionStatus metrics.Gauge + TransactionStatus metrics.Histogram // Number of pending blocks. NumPendingBlocks metrics.Gauge // Last included block height @@ -50,6 +50,9 @@ type Metrics struct { // Optionally, labels can be provided along with their values ("foo", // "fooValue"). func PrometheusMetrics(labelsAndValues ...string) *Metrics { + if len(labelsAndValues)%2 != 0 { + panic("uneven number of labels and values; labels and values should be provided in pairs") + } labels := []string{} for i := 0; i < len(labelsAndValues); i += 2 { labels = append(labels, labelsAndValues[i]) @@ -65,10 +68,11 @@ func PrometheusMetrics(labelsAndValues ...string) *Metrics { Name: "last_blob_size", Help: "The size in bytes of the last DA blob.", }, labels).With(labelsAndValues...), - TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + TransactionStatus: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Subsystem: MetricsSubsystem, Name: "transaction_status", - Help: "The transaction status of the last DA submission.", + Help: "Distribution of transaction statuses for DA submissions.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7}, }, labels).With(labelsAndValues...), NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Subsystem: MetricsSubsystem, @@ -88,7 +92,7 @@ func NopMetrics() *Metrics { return &Metrics{ GasPrice: discard.NewGauge(), LastBlobSize: discard.NewGauge(), - TransactionStatus: discard.NewGauge(), + TransactionStatus: discard.NewHistogram(), NumPendingBlocks: discard.NewGauge(), IncludedBlockHeight: discard.NewGauge(), } diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index b905711..2c14062 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -498,7 +498,7 @@ func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, status da.S if c.metrics != nil { c.metrics.GasPrice.Set(float64(gasPrice)) c.metrics.LastBlobSize.Set(float64(blobSize)) - c.metrics.TransactionStatus.Set(float64(status)) + c.metrics.TransactionStatus.Observe(float64(status)) c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) } From 7c0a87bf811992f43363157fbe4085475cad8a40 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Fri, 25 Oct 2024 08:00:29 -0700 Subject: [PATCH 3/8] feat(da): lint --- sequencing/sequencer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 2c14062..91adcd8 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -282,7 +282,7 @@ type Sequencer struct { db *badger.DB // BadgerDB instance for persistence dbMux sync.Mutex // Mutex for safe concurrent DB access - metrics *Metrics + metrics *Metrics metricsProvider MetricsProvider } @@ -496,11 +496,11 @@ func (c *Sequencer) publishBatch() error { func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, status da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) { if c.metrics != nil { - c.metrics.GasPrice.Set(float64(gasPrice)) - c.metrics.LastBlobSize.Set(float64(blobSize)) + c.metrics.GasPrice.Set(float64(gasPrice)) + c.metrics.LastBlobSize.Set(float64(blobSize)) c.metrics.TransactionStatus.Observe(float64(status)) - c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) - c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) + c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) + c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) } } From 9619f8880428fc69fd4d014b1a8d391f52d48b37 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Fri, 25 Oct 2024 08:05:12 -0700 Subject: [PATCH 4/8] feat(da): fix init --- main.go | 18 +++++++++--------- sequencing/metrics.go | 1 - sequencing/sequencer.go | 25 +++++-------------------- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/main.go b/main.go index 1064981..8424367 100644 --- a/main.go +++ b/main.go @@ -27,14 +27,14 @@ const ( func main() { var ( - host string - port string - listenAll bool - batchTime time.Duration - da_address string - da_namespace string - da_auth_token string - db_path string + host string + port string + listenAll bool + batchTime time.Duration + da_address string + da_namespace string + da_auth_token string + db_path string metricsEnabled bool metricsAddress string ) @@ -79,7 +79,7 @@ func main() { } metrics := sequencing.DefaultMetricsProvider(metricsEnabled) - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics(""), db_path) + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics(da_namespace), db_path) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } diff --git a/sequencing/metrics.go b/sequencing/metrics.go index 3402056..f51a04b 100644 --- a/sequencing/metrics.go +++ b/sequencing/metrics.go @@ -33,7 +33,6 @@ type Metrics struct { GasPrice metrics.Gauge // Last submitted blob size LastBlobSize metrics.Gauge - // TODO(tuxcanfly): needs gas used, wallet balance from go-da // cost / byte // CostPerByte metrics.Gauge // Wallet Balance diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 91adcd8..f91e146 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -282,8 +282,7 @@ type Sequencer struct { db *badger.DB // BadgerDB instance for persistence dbMux sync.Mutex // Mutex for safe concurrent DB access - metrics *Metrics - metricsProvider MetricsProvider + metrics *Metrics } // NewSequencer ... @@ -306,22 +305,12 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t } else { opts = badger.DefaultOptions(dbPath) } - s := &Sequencer{ - dalc: dalc, - batchTime: batchTime, - ctx: ctx, - maxSize: maxBlobSize, - tq: NewTransactionQueue(), - bq: NewBatchQueue(), - seenBatches: make(map[string]struct{}), - metrics: metrics, - } opts = opts.WithLogger(nil) db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("failed to open BadgerDB: %w", err) } - s = &Sequencer{ + s := &Sequencer{ dalc: dalc, batchTime: batchTime, ctx: ctx, @@ -331,6 +320,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), db: db, + metrics: metrics, } // Load last batch hash from DB to recover from crash @@ -602,13 +592,8 @@ func getRemainingSleep(start time.Time, blockTime time.Duration, sleep time.Dura // SubmitRollupTransaction implements sequencing.Sequencer. func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, req sequencing.SubmitRollupTransactionRequest) (*sequencing.SubmitRollupTransactionResponse, error) { - if c.rollupId == nil { - c.rollupId = req.RollupId - c.metrics = c.metricsProvider(hex.EncodeToString(req.RollupId)) - } else { - if !bytes.Equal(c.rollupId, req.RollupId) { - return nil, ErrInvalidRollupId - } + if !c.isValid(req.RollupId) { + return nil, ErrInvalidRollupId } err := c.tq.AddTransaction(req.Tx, c.db) if err != nil { From 5b4db360b7b8ed627df959274e2cefcf6ee60fb9 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Fri, 25 Oct 2024 08:13:49 -0700 Subject: [PATCH 5/8] feat(da): status gauge --- sequencing/metrics.go | 9 ++++----- sequencing/sequencer.go | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sequencing/metrics.go b/sequencing/metrics.go index f51a04b..1fa2800 100644 --- a/sequencing/metrics.go +++ b/sequencing/metrics.go @@ -38,7 +38,7 @@ type Metrics struct { // Wallet Balance // WalletBalance metrics.Gauge // Transaction Status - TransactionStatus metrics.Histogram + TransactionStatus metrics.Gauge // Number of pending blocks. NumPendingBlocks metrics.Gauge // Last included block height @@ -67,11 +67,10 @@ func PrometheusMetrics(labelsAndValues ...string) *Metrics { Name: "last_blob_size", Help: "The size in bytes of the last DA blob.", }, labels).With(labelsAndValues...), - TransactionStatus: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Subsystem: MetricsSubsystem, Name: "transaction_status", - Help: "Distribution of transaction statuses for DA submissions.", - Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7}, + Help: "Count of transaction statuses for DA submissions", }, labels).With(labelsAndValues...), NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Subsystem: MetricsSubsystem, @@ -91,7 +90,7 @@ func NopMetrics() *Metrics { return &Metrics{ GasPrice: discard.NewGauge(), LastBlobSize: discard.NewGauge(), - TransactionStatus: discard.NewHistogram(), + TransactionStatus: discard.NewGauge(), NumPendingBlocks: discard.NewGauge(), IncludedBlockHeight: discard.NewGauge(), } diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index f91e146..5796e6b 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -484,11 +484,11 @@ func (c *Sequencer) publishBatch() error { return nil } -func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, status da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) { +func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, statusCode da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) { if c.metrics != nil { c.metrics.GasPrice.Set(float64(gasPrice)) c.metrics.LastBlobSize.Set(float64(blobSize)) - c.metrics.TransactionStatus.Observe(float64(status)) + c.metrics.TransactionStatus.With("status", fmt.Sprintf("%d", statusCode)).Add(1) c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks)) c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight)) } From da8dae0c8335e31eaf562e1fe32f8757bebe6278 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Fri, 25 Oct 2024 08:16:23 -0700 Subject: [PATCH 6/8] feat(da): error --- main.go | 7 ++-- sequencing/metrics.go | 66 +++++++++++++++++++----------------- sequencing/sequencer_test.go | 6 ++-- 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/main.go b/main.go index 8424367..01078cf 100644 --- a/main.go +++ b/main.go @@ -78,8 +78,11 @@ func main() { }() } - metrics := sequencing.DefaultMetricsProvider(metricsEnabled) - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics(da_namespace), db_path) + metrics, err := sequencing.DefaultMetricsProvider(metricsEnabled)(da_namespace) + if err != nil { + log.Fatalf("Failed to create metrics: %v", err) + } + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics, db_path) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } diff --git a/sequencing/metrics.go b/sequencing/metrics.go index 1fa2800..3707705 100644 --- a/sequencing/metrics.go +++ b/sequencing/metrics.go @@ -1,6 +1,8 @@ package sequencing import ( + "errors" + "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" "github.com/go-kit/kit/metrics/prometheus" @@ -14,12 +16,12 @@ const ( ) // MetricsProvider returns sequencing Metrics. -type MetricsProvider func(chainID string) *Metrics +type MetricsProvider func(chainID string) (*Metrics, error) // DefaultMetricsProvider returns Metrics build using Prometheus client library // if Prometheus is enabled. Otherwise, it returns no-op Metrics. func DefaultMetricsProvider(enabled bool) MetricsProvider { - return func(chainID string) *Metrics { + return func(chainID string) (*Metrics, error) { if enabled { return PrometheusMetrics("chain_id", chainID) } @@ -48,50 +50,50 @@ type Metrics struct { // PrometheusMetrics returns Metrics build using Prometheus client library. // Optionally, labels can be provided along with their values ("foo", // "fooValue"). -func PrometheusMetrics(labelsAndValues ...string) *Metrics { +func PrometheusMetrics(labelsAndValues ...string) (*Metrics, error) { if len(labelsAndValues)%2 != 0 { - panic("uneven number of labels and values; labels and values should be provided in pairs") + return nil, errors.New("uneven number of labels and values; labels and values should be provided in pairs") } labels := []string{} for i := 0; i < len(labelsAndValues); i += 2 { labels = append(labels, labelsAndValues[i]) } return &Metrics{ - GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "gas_price", - Help: "The gas price of DA.", - }, labels).With(labelsAndValues...), - LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "last_blob_size", - Help: "The size in bytes of the last DA blob.", - }, labels).With(labelsAndValues...), - TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "transaction_status", - Help: "Count of transaction statuses for DA submissions", - }, labels).With(labelsAndValues...), - NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "num_pending_blocks", - Help: "The number of pending blocks for DA submission.", - }, labels).With(labelsAndValues...), - IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "included_block_height", - Help: "The last DA included block height.", - }, labels).With(labelsAndValues...), - } + GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "gas_price", + Help: "The gas price of DA.", + }, labels).With(labelsAndValues...), + LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "last_blob_size", + Help: "The size in bytes of the last DA blob.", + }, labels).With(labelsAndValues...), + TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "transaction_status", + Help: "Count of transaction statuses for DA submissions", + }, labels).With(labelsAndValues...), + NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "num_pending_blocks", + Help: "The number of pending blocks for DA submission.", + }, labels).With(labelsAndValues...), + IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "included_block_height", + Help: "The last DA included block height.", + }, labels).With(labelsAndValues...), +}, nil } // NopMetrics returns no-op Metrics. -func NopMetrics() *Metrics { +func NopMetrics() (*Metrics, error) { return &Metrics{ GasPrice: discard.NewGauge(), LastBlobSize: discard.NewGauge(), TransactionStatus: discard.NewGauge(), NumPendingBlocks: discard.NewGauge(), IncludedBlockHeight: discard.NewGauge(), - } + }, nil } diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index dafdbc4..ee42c57 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -51,7 +51,8 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv func TestNewSequencer(t *testing.T) { // Create a new sequencer with mock DA client - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, NopMetrics(), "") + metrics, _ := NopMetrics() + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, metrics, "") require.NoError(t, err) defer func() { err := seq.Close() @@ -67,7 +68,8 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, NopMetrics(), "") + metrics, _ := NopMetrics() + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, metrics, "") require.NoError(t, err) defer func() { err := seq.Close() From 734ccf93b875cc376fb396e0c3082f5a8e20634e Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Mon, 28 Oct 2024 12:59:29 -0700 Subject: [PATCH 7/8] feat(da): lint --- sequencing/metrics.go | 52 +++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/sequencing/metrics.go b/sequencing/metrics.go index 3707705..27212f5 100644 --- a/sequencing/metrics.go +++ b/sequencing/metrics.go @@ -59,32 +59,32 @@ func PrometheusMetrics(labelsAndValues ...string) (*Metrics, error) { labels = append(labels, labelsAndValues[i]) } return &Metrics{ - GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "gas_price", - Help: "The gas price of DA.", - }, labels).With(labelsAndValues...), - LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "last_blob_size", - Help: "The size in bytes of the last DA blob.", - }, labels).With(labelsAndValues...), - TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "transaction_status", - Help: "Count of transaction statuses for DA submissions", - }, labels).With(labelsAndValues...), - NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "num_pending_blocks", - Help: "The number of pending blocks for DA submission.", - }, labels).With(labelsAndValues...), - IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: MetricsSubsystem, - Name: "included_block_height", - Help: "The last DA included block height.", - }, labels).With(labelsAndValues...), -}, nil + GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "gas_price", + Help: "The gas price of DA.", + }, labels).With(labelsAndValues...), + LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "last_blob_size", + Help: "The size in bytes of the last DA blob.", + }, labels).With(labelsAndValues...), + TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "transaction_status", + Help: "Count of transaction statuses for DA submissions", + }, labels).With(labelsAndValues...), + NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "num_pending_blocks", + Help: "The number of pending blocks for DA submission.", + }, labels).With(labelsAndValues...), + IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: MetricsSubsystem, + Name: "included_block_height", + Help: "The last DA included block height.", + }, labels).With(labelsAndValues...), + }, nil } // NopMetrics returns no-op Metrics. From 78f2fe25a327c874039767282d3123cfd843c446 Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Tue, 29 Oct 2024 07:52:47 -0700 Subject: [PATCH 8/8] feat(da): graceful shutdown --- main.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 01078cf..7245713 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/hex" "flag" "fmt" @@ -67,11 +68,17 @@ func main() { log.Fatalf("Error decoding namespace: %v", err) } + var metricsServer *http.Server if metricsEnabled { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer = &http.Server{ + Addr: metricsAddress, + Handler: mux, + } go func() { log.Printf("Starting metrics server on %v...\n", metricsAddress) - err := http.ListenAndServe(metricsAddress, nil) // #nosec G114 - if err != nil { + if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Failed to serve metrics: %v", err) } http.Handle("/metrics", promhttp.Handler()) @@ -96,6 +103,11 @@ func main() { interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGINT) <-interrupt + if metricsServer != nil { + if err := metricsServer.Shutdown(context.Background()); err != nil { + log.Printf("Error shutting down metrics server: %v", err) + } + } fmt.Println("\nCtrl+C pressed. Exiting...") os.Exit(0) }