Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(da): metrics #11

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Run centralized-sequencer by specifying DA network details:
| `host` | centralized sequencer host | localhost |
| `port` | centralized sequencer port | 50051 |
| `listen-all` |listen on all network interfaces (0.0.0.0) instead of just localhost|disabled|
| `metrics` |enable prometheus metrics|disabled|
| `metrics-address` |address to expose prometheus metrics|`":8080"`|
<!-- markdownlint-enable MD013 -->

See `./build/centralized-sequencer --help` for details.
Expand Down
3 changes: 3 additions & 0 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type BaseResult struct {
Message string
// DAHeight informs about a height on Data Availability Layer for given result.
DAHeight uint64
// BlobSize is the size of the blob submitted.
BlobSize uint64
// SubmittedCount is the number of successfully submitted blocks.
SubmittedCount uint64
}
Expand Down Expand Up @@ -192,6 +194,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch,
BaseResult: BaseResult{
Code: StatusSuccess,
DAHeight: binary.LittleEndian.Uint64(ids[0]),
BlobSize: blobSize,
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
SubmittedCount: uint64(len(ids)),
},
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ toolchain go1.22.3

require (
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/go-kit/kit v0.13.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log/v2 v2.5.1
github.com/prometheus/client_golang v1.19.1
github.com/rollkit/go-da v0.8.0
github.com/rollkit/go-sequencing v0.3.0
github.com/rollkit/rollkit v0.13.7
Expand All @@ -30,7 +32,6 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/golang/glog v1.2.2 // indirect
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
21 changes: 20 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/rollkit/centralized-sequencer/sequencing"
sequencingGRPC "github.com/rollkit/go-sequencing/proxy/grpc"
)
Expand All @@ -32,6 +35,8 @@ func main() {
da_namespace string
da_auth_token string
db_path string
metricsEnabled bool
metricsAddress string
)
flag.StringVar(&host, "host", defaultHost, "centralized sequencer host")
flag.StringVar(&port, "port", defaultPort, "centralized sequencer port")
Expand All @@ -41,6 +46,8 @@ func main() {
flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions")
flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA")
flag.StringVar(&db_path, "db_path", "", "path to the database")
flag.BoolVar(&metricsEnabled, "metrics", false, "Enable Prometheus metrics")
flag.StringVar(&metricsAddress, "metrics-address", ":8080", "Address to expose Prometheus metrics")

flag.Parse()

Expand All @@ -60,7 +67,19 @@ func main() {
log.Fatalf("Error decoding namespace: %v", err)
}

centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, db_path)
if metricsEnabled {
go func() {
log.Printf("Starting metrics server on %v...\n", metricsAddress)
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(metricsAddress, nil) // #nosec G114
if err != nil {
log.Fatalf("Failed to serve metrics: %v", err)
}
}()
}

metrics := sequencing.DefaultMetricsProvider(metricsEnabled)
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, metrics(""), db_path)
if err != nil {
log.Fatalf("Failed to create centralized sequencer: %v", err)
}
Expand Down
95 changes: 95 additions & 0 deletions sequencing/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package sequencing

import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "sequencer"
)

// MetricsProvider returns sequencing Metrics.
type MetricsProvider func(chainID string) *Metrics

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(enabled bool) MetricsProvider {
return func(chainID string) *Metrics {
if enabled {
return PrometheusMetrics("chain_id", chainID)
}
return NopMetrics()
}
}

// Metrics contains metrics exposed by this package.
type Metrics struct {
// GasPrice
GasPrice metrics.Gauge
// Last submitted blob size
LastBlobSize metrics.Gauge
// TODO(tuxcanfly): needs gas used, wallet balance from go-da
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
// cost / byte
// CostPerByte metrics.Gauge
// Wallet Balance
// WalletBalance metrics.Gauge
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
// Transaction Status
TransactionStatus metrics.Gauge
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
// Number of pending blocks.
NumPendingBlocks metrics.Gauge
// Last included block height
IncludedBlockHeight metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
return &Metrics{
GasPrice: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "gas_price",
Help: "The gas price of DA.",
}, labels).With(labelsAndValues...),
LastBlobSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "last_blob_size",
Help: "The size in bytes of the last DA blob.",
}, labels).With(labelsAndValues...),
TransactionStatus: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "transaction_status",
Help: "The transaction status of the last DA submission.",
}, labels).With(labelsAndValues...),
NumPendingBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "num_pending_blocks",
Help: "The number of pending blocks for DA submission.",
}, labels).With(labelsAndValues...),
IncludedBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: MetricsSubsystem,
Name: "included_block_height",
Help: "The last DA included block height.",
}, labels).With(labelsAndValues...),
}
}

// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
GasPrice: discard.NewGauge(),
LastBlobSize: discard.NewGauge(),
TransactionStatus: discard.NewGauge(),
NumPendingBlocks: discard.NewGauge(),
IncludedBlockHeight: discard.NewGauge(),
}
}
37 changes: 33 additions & 4 deletions sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,13 @@ type Sequencer struct {

db *badger.DB // BadgerDB instance for persistence
dbMux sync.Mutex // Mutex for safe concurrent DB access

metrics *Metrics
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
metricsProvider MetricsProvider
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSequencer ...
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) {
func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) {
ctx := context.Background()
dac, err := proxyda.NewClient(daAddress, daAuthToken)
if err != nil {
Expand All @@ -303,12 +306,22 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t
} else {
opts = badger.DefaultOptions(dbPath)
}
s := &Sequencer{
dalc: dalc,
batchTime: batchTime,
ctx: ctx,
maxSize: maxBlobSize,
tq: NewTransactionQueue(),
bq: NewBatchQueue(),
seenBatches: make(map[string]struct{}),
metrics: metrics,
}
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
opts = opts.WithLogger(nil)
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open BadgerDB: %w", err)
}
s := &Sequencer{
s = &Sequencer{
dalc: dalc,
batchTime: batchTime,
ctx: ctx,
Expand Down Expand Up @@ -481,6 +494,16 @@ func (c *Sequencer) publishBatch() error {
return nil
}

func (c *Sequencer) recordMetrics(gasPrice float64, blobSize uint64, status da.StatusCode, numPendingBlocks int, includedBlockHeight uint64) {
if c.metrics != nil {
c.metrics.GasPrice.Set(float64(gasPrice))
c.metrics.LastBlobSize.Set(float64(blobSize))
c.metrics.TransactionStatus.Set(float64(status))
c.metrics.NumPendingBlocks.Set(float64(numPendingBlocks))
c.metrics.IncludedBlockHeight.Set(float64(includedBlockHeight))
}
}

func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error {
batchesToSubmit := []*sequencing.Batch{&batch}
submittedAllBlocks := false
Expand Down Expand Up @@ -542,6 +565,7 @@ daSubmitRetryLoop:
backoff = c.exponentialBackoff(backoff)
}

c.recordMetrics(gasPrice, res.BlobSize, res.Code, len(batchesToSubmit), res.DAHeight)
attempt += 1
}

Expand Down Expand Up @@ -578,8 +602,13 @@ func getRemainingSleep(start time.Time, blockTime time.Duration, sleep time.Dura

// SubmitRollupTransaction implements sequencing.Sequencer.
func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, req sequencing.SubmitRollupTransactionRequest) (*sequencing.SubmitRollupTransactionResponse, error) {
if !c.isValid(req.RollupId) {
return nil, ErrInvalidRollupId
if c.rollupId == nil {
c.rollupId = req.RollupId
c.metrics = c.metricsProvider(hex.EncodeToString(req.RollupId))
} else {
if !bytes.Equal(c.rollupId, req.RollupId) {
return nil, ErrInvalidRollupId
}
tuxcanfly marked this conversation as resolved.
Show resolved Hide resolved
}
err := c.tq.AddTransaction(req.Tx, c.db)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions sequencing/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv

func TestNewSequencer(t *testing.T) {
// Create a new sequencer with mock DA client
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "")
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second, NopMetrics(), "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand All @@ -67,7 +67,7 @@ func TestNewSequencer(t *testing.T) {

func TestSequencer_SubmitRollupTransaction(t *testing.T) {
// Initialize a new sequencer
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "")
seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 10*time.Second, NopMetrics(), "")
require.NoError(t, err)
defer func() {
err := seq.Close()
Expand Down
Loading