Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

metrics: initial libp2p metrics #58

Merged
merged 3 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pkg/p2p/libp2p/conngater.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ type connectionGater struct {
register register.Register
selfType p2p.PeerType
minimumStake *big.Int
metrics *metrics
}

// newConnectionGater creates a new instance of ConnectionGater
func newConnectionGater(register register.Register, selfType p2p.PeerType, minimumStake *big.Int) ConnectionGater {
func newConnectionGater(register register.Register, selfType p2p.PeerType, minimumStake *big.Int, metrics *metrics) ConnectionGater {
return &connectionGater{
register: register,
selfType: selfType,
minimumStake: minimumStake,
metrics: metrics,
}
}

Expand Down Expand Up @@ -163,6 +165,7 @@ func (cg *connectionGater) InterceptAccept(connMultiaddrs network.ConnMultiaddrs
func (cg *connectionGater) InterceptSecured(dir network.Direction, p peer.ID, connMultiaddrs network.ConnMultiaddrs) bool {
allowance := cg.checkAllowedPeer(p)
if allowance.isDeny() {
cg.metrics.RejectedConnectionCount.Inc()
return false
}

Expand All @@ -189,9 +192,11 @@ func (cg *connectionGater) InterceptUpgraded(conn network.Conn) (bool, control.D
// the peer's stake is greater than minimal stake, the connection is allowed
// otherwise, the connection is rejected
func (cg *connectionGater) validateInboundConnection(p peer.ID, connMultiaddrs network.ConnMultiaddrs) bool {
//cg.metrics.IncomingConnectionCount.Inc()

allowance := cg.checkPeerStake(p)
if allowance.isDeny() {
cg.metrics.RejectedConnectionCount.Inc()
}

return !allowance.isDeny()
}

Expand All @@ -200,8 +205,10 @@ func (cg *connectionGater) validateInboundConnection(p peer.ID, connMultiaddrs n
// and the peer's stake is greater than minimal stake, the connection is
// allowed otherwise, the connection is rejected
func (cg *connectionGater) validateOutboundConnection(p peer.ID, connMultiaddrs network.ConnMultiaddrs) bool {
//cg.metrics.OutgoingConnectionCount.Inc()

allowance := cg.checkPeerStake(p)
if allowance.isDeny() {
cg.metrics.RejectedConnectionCount.Inc()
}

return !allowance.isDeny()
}
16 changes: 14 additions & 2 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/primevprotocol/mev-commit/pkg/util"
"log/slog"
"math/big"
"time"

"github.com/primevprotocol/mev-commit/pkg/util"

"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p"
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -37,6 +38,7 @@ type Service struct {
logger *slog.Logger
notifier p2p.Notifier
hsSvc *handshake.Service
metrics *metrics
}

type Options struct {
Expand Down Expand Up @@ -72,8 +74,10 @@ func New(opts *Options) (*Service, error) {
return nil, err
}

var metrics = new(metrics)
if opts.MetricsReg != nil {
rcmgr.MustRegisterWith(opts.MetricsReg)
metrics = newMetrics(opts.MetricsReg, "primev")
}

str, err := rcmgr.NewStatsTraceReporter()
Expand All @@ -88,7 +92,12 @@ func New(opts *Options) (*Service, error) {
return nil, err
}

conngtr := newConnectionGater(opts.Register, opts.PeerType, opts.MinimumStake)
conngtr := newConnectionGater(
opts.Register,
opts.PeerType,
opts.MinimumStake,
metrics,
)

host, err := libp2p.New(
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", opts.ListenPort)),
Expand Down Expand Up @@ -140,6 +149,7 @@ func New(opts *Options) (*Service, error) {
peers: newPeerRegistry(),
hsSvc: hsSvc,
logger: opts.Logger,
metrics: metrics,
}
s.peers.setDisconnector(s)

Expand Down Expand Up @@ -170,6 +180,7 @@ func (s *Service) handleConnectReq(streamlibp2p network.Stream) {
if err != nil {
s.logger.Error("error handling handshake", "err", err)
_ = streamlibp2p.Reset()
s.metrics.FailedIncomingHandshakeCount.Inc()
return
}

Expand Down Expand Up @@ -280,6 +291,7 @@ func (s *Service) Connect(ctx context.Context, info []byte) (p2p.Peer, error) {
p, err := s.hsSvc.Handshake(ctx, addrInfo.ID, stream)
if err != nil {
_ = s.host.Network().ClosePeer(addrInfo.ID)
s.metrics.FailedOutgoingHandshakeCount.Inc()
return p2p.Peer{}, err
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/p2p/libp2p/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package libp2p

import "github.com/prometheus/client_golang/prometheus"

type metrics struct {
BlockedPeerCount prometheus.Counter
RejectedConnectionCount prometheus.Counter
FailedIncomingHandshakeCount prometheus.Counter
FailedOutgoingHandshakeCount prometheus.Counter
}

func newMetrics(registry prometheus.Registerer, namespace string) *metrics {
subsystem := "libp2p"

m := &metrics{
BlockedPeerCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "blocked_peer_count",
Help: "Number of blocked peers.",
}),
RejectedConnectionCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rejected_connection_count",
Help: "Number of rejected connection count.",
}),
FailedIncomingHandshakeCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_incoming_handshake_count",
Help: "Number of failed incoming handshake count.",
}),
FailedOutgoingHandshakeCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_outgoing_handshake_count",
Help: "Number of failed outgoing handshake count.",
}),
}

registry.MustRegister(
m.BlockedPeerCount,
m.RejectedConnectionCount,
m.FailedIncomingHandshakeCount,
m.FailedOutgoingHandshakeCount,
)

return m
}
Loading