Skip to content

Commit

Permalink
p2p: silkworm sentry (erigontech#8527)
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr authored Nov 2, 2023
1 parent 329d18e commit d92898a
Show file tree
Hide file tree
Showing 25 changed files with 623 additions and 252 deletions.
4 changes: 2 additions & 2 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon/consensus/bor/heimdallgrpc"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client"
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
"github.com/ledgerwatch/log/v3"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
Expand Down Expand Up @@ -1529,7 +1529,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,

maxBlockBroadcastPeers := func(header *types.Header) uint { return 0 }

sentryControlServer, err := sentry.NewMultiClient(
sentryControlServer, err := sentry_multi_client.NewMultiClient(
db,
"",
chainConfig,
Expand Down
2 changes: 1 addition & 1 deletion cmd/sentry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/spf13/cobra"

"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common/paths"
"github.com/ledgerwatch/erigon/p2p/sentry"
"github.com/ledgerwatch/erigon/turbo/debug"
"github.com/ledgerwatch/erigon/turbo/logging"
node2 "github.com/ledgerwatch/erigon/turbo/node"
Expand Down
29 changes: 22 additions & 7 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,9 +838,21 @@ var (

SilkwormPathFlag = cli.StringFlag{
Name: "silkworm.path",
Usage: "Path to the silkworm_api library (enables embedded Silkworm execution)",
Usage: "Path to the Silkworm library",
Value: "",
}
SilkwormExecutionFlag = cli.BoolFlag{
Name: "silkworm.exec",
Usage: "Enable Silkworm block execution",
}
SilkwormRpcDaemonFlag = cli.BoolFlag{
Name: "silkworm.rpcd",
Usage: "Enable embedded Silkworm RPC daemon",
}
SilkwormSentryFlag = cli.BoolFlag{
Name: "silkworm.sentry",
Usage: "Enable embedded Silkworm Sentry service",
}
)

var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag}
Expand Down Expand Up @@ -1031,6 +1043,7 @@ func NewP2PConfig(
return nil, fmt.Errorf("invalid nat option %s: %w", natSetting, err)
}
cfg.NAT = natif
cfg.NATSpec = natSetting
return cfg, nil
}

Expand Down Expand Up @@ -1079,11 +1092,13 @@ func setListenAddress(ctx *cli.Context, cfg *p2p.Config) {
// setNAT creates a port mapper from command line flags.
func setNAT(ctx *cli.Context, cfg *p2p.Config) {
if ctx.IsSet(NATFlag.Name) {
natif, err := nat.Parse(ctx.String(NATFlag.Name))
natSetting := ctx.String(NATFlag.Name)
natif, err := nat.Parse(natSetting)
if err != nil {
Fatalf("Option %s: %v", NATFlag.Name, err)
}
cfg.NAT = natif
cfg.NATSpec = natSetting
}
}

Expand Down Expand Up @@ -1161,7 +1176,6 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config, nodeName, datadir string, l
}

ethPeers := cfg.MaxPeers
cfg.Name = nodeName
logger.Info("Maximum peer count", "ETH", ethPeers, "total", cfg.MaxPeers)

if netrestrict := ctx.String(NetrestrictFlag.Name); netrestrict != "" {
Expand Down Expand Up @@ -1460,10 +1474,12 @@ func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) {
}

func setSilkworm(ctx *cli.Context, cfg *ethconfig.Config) {
cfg.SilkwormEnabled = ctx.IsSet(SilkwormPathFlag.Name)
if cfg.SilkwormEnabled {
cfg.SilkwormPath = ctx.String(SilkwormPathFlag.Name)
cfg.SilkwormPath = ctx.String(SilkwormPathFlag.Name)
if ctx.IsSet(SilkwormExecutionFlag.Name) {
cfg.SilkwormExecution = ctx.Bool(SilkwormExecutionFlag.Name)
}
cfg.SilkwormRpcDaemon = ctx.Bool(SilkwormRpcDaemonFlag.Name)
cfg.SilkwormSentry = ctx.Bool(SilkwormSentryFlag.Name)
}

// CheckExclusive verifies that only a single instance of the provided flags was
Expand Down Expand Up @@ -1576,7 +1592,6 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
setSilkworm(ctx, cfg)

cfg.Ethstats = ctx.String(EthStatsURLFlag.Name)
cfg.P2PEnabled = len(nodeConfig.P2P.SentryAddr) == 0
cfg.HistoryV3 = ctx.Bool(HistoryV3Flag.Name)
if ctx.IsSet(NetworkIdFlag.Name) {
cfg.NetworkID = ctx.Uint64(NetworkIdFlag.Name)
Expand Down
112 changes: 82 additions & 30 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import (

"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/p2p/sentry"
"github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client"
"github.com/ledgerwatch/erigon/turbo/builder"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader"
Expand Down Expand Up @@ -87,7 +89,6 @@ import (

"github.com/ledgerwatch/erigon/cmd/caplin/caplin1"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/common/debug"

rpcsentinel "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
Expand Down Expand Up @@ -165,7 +166,7 @@ type Ethereum struct {
// downloader fields
sentryCtx context.Context
sentryCancel context.CancelFunc
sentriesClient *sentry.MultiClient
sentriesClient *sentry_multi_client.MultiClient
sentryServers []*sentry.GrpcServer

stagedSync *stagedsync.Sync
Expand Down Expand Up @@ -200,7 +201,10 @@ type Ethereum struct {
logger log.Logger

sentinel rpcsentinel.SentinelClient
silkworm *silkworm.Silkworm

silkworm *silkworm.Silkworm
silkwormRPCDaemonService *silkworm.RpcDaemonService
silkwormSentryService *silkworm.SentryService
}

func splitAddrIntoHostAndPort(addr string) (host string, port int, err error) {
Expand Down Expand Up @@ -339,15 +343,56 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)

if config.SilkwormPath != "" {
backend.silkworm, err = silkworm.New(config.SilkwormPath, config.Dirs.DataDir)
if err != nil {
return nil, err
}
}

var sentries []direct.SentryClient
if len(stack.Config().P2P.SentryAddr) > 0 {
for _, addr := range stack.Config().P2P.SentryAddr {
sentryClient, err := sentry.GrpcClient(backend.sentryCtx, addr)
sentryClient, err := sentry_multi_client.GrpcClient(backend.sentryCtx, addr)
if err != nil {
return nil, err
}
sentries = append(sentries, sentryClient)
}
} else if config.SilkwormSentry {
apiPort := 53774
apiAddr := fmt.Sprintf("127.0.0.1:%d", apiPort)
p2pConfig := stack.Config().P2P

collectNodeURLs := func(nodes []*enode.Node) []string {
var urls []string
for _, n := range nodes {
urls = append(urls, n.URLv4())
}
return urls
}

settings := silkworm.SentrySettings{
ClientId: p2pConfig.Name,
ApiPort: apiPort,
Port: p2pConfig.ListenPort(),
Nat: p2pConfig.NATSpec,
NetworkId: config.NetworkID,
NodeKey: crypto.FromECDSA(p2pConfig.PrivateKey),
StaticPeers: collectNodeURLs(p2pConfig.StaticNodes),
Bootnodes: collectNodeURLs(p2pConfig.BootstrapNodes),
NoDiscover: p2pConfig.NoDiscovery,
MaxPeers: p2pConfig.MaxPeers,
}

silkwormSentryService := backend.silkworm.NewSentryService(settings)
backend.silkwormSentryService = &silkwormSentryService

sentryClient, err := sentry_multi_client.GrpcClient(backend.sentryCtx, apiAddr)
if err != nil {
return nil, err
}
sentries = append(sentries, sentryClient)
} else {
var readNodeInfo = func() *eth.NodeInfo {
var res *eth.NodeInfo
Expand Down Expand Up @@ -478,13 +523,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

backend.engine = ethconsensusconfig.CreateConsensusEngine(ctx, stack.Config(), chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify, heimdallClient, config.WithoutHeimdall, blockReader, false /* readonly */, logger)

if config.SilkwormEnabled {
backend.silkworm, err = silkworm.New(config.SilkwormPath)
if err != nil {
return nil, err
}
}

inMemoryExecution := func(batch kv.RwTx, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications) error {
terseLogger := log.New()
Expand Down Expand Up @@ -529,7 +567,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
}

backend.sentriesClient, err = sentry.NewMultiClient(
backend.sentriesClient, err = sentry_multi_client.NewMultiClient(
chainKv,
stack.Config().NodeName(),
chainConfig,
Expand Down Expand Up @@ -862,24 +900,17 @@ func (s *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error {
}

s.apiList = jsonrpc.APIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, blockReader, s.agg, httpRpcCfg, s.engine, s.logger)
go func() {
if config.SilkwormEnabled && httpRpcCfg.Enabled {
go func() {
<-ctx.Done()
s.silkworm.StopRpcDaemon()
}()
err = s.silkworm.StartRpcDaemon(chainKv)
if err != nil {
s.logger.Error(err.Error())
return
}
} else {

if config.SilkwormRpcDaemon && httpRpcCfg.Enabled {
silkwormRPCDaemonService := s.silkworm.NewRpcDaemonService(chainKv)
s.silkwormRPCDaemonService = &silkwormRPCDaemonService
} else {
go func() {
if err := cli.StartRpcServer(ctx, httpRpcCfg, s.apiList, s.logger); err != nil {
s.logger.Error(err.Error())
return
s.logger.Error("cli.StartRpcServer error", "err", err)
}
}
}()
}()
}

go s.engineBackendRPC.Start(httpRpcCfg, s.chainDB, s.blockReader, ff, stateCache, s.agg, s.engine, ethRpcClient, txPoolRpcClient, miningRpcClient)

Expand Down Expand Up @@ -1265,6 +1296,17 @@ func (s *Ethereum) Start() error {
s.engine.(*bor.Bor).Start(s.chainDB)
}

if s.silkwormRPCDaemonService != nil {
if err := s.silkwormRPCDaemonService.Start(); err != nil {
s.logger.Error("silkworm.StartRpcDaemon error", "err", err)
}
}
if s.silkwormSentryService != nil {
if err := s.silkwormSentryService.Start(); err != nil {
s.logger.Error("silkworm.SentryStart error", "err", err)
}
}

return nil
}

Expand Down Expand Up @@ -1310,7 +1352,17 @@ func (s *Ethereum) Stop() error {
}
s.chainDB.Close()

if s.config.SilkwormEnabled {
if s.silkwormRPCDaemonService != nil {
if err := s.silkwormRPCDaemonService.Stop(); err != nil {
s.logger.Error("silkworm.StopRpcDaemon error", "err", err)
}
}
if s.silkwormSentryService != nil {
if err := s.silkwormSentryService.Stop(); err != nil {
s.logger.Error("silkworm.SentryStop error", "err", err)
}
}
if s.silkworm != nil {
s.silkworm.Close()
}

Expand All @@ -1337,7 +1389,7 @@ func (s *Ethereum) SentryCtx() context.Context {
return s.sentryCtx
}

func (s *Ethereum) SentryControlServer() *sentry.MultiClient {
func (s *Ethereum) SentryControlServer() *sentry_multi_client.MultiClient {
return s.sentriesClient
}
func (s *Ethereum) BlockIO() (services.FullBlockReader, *blockio.BlockWriter) {
Expand Down
11 changes: 6 additions & 5 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ var Defaults = Config{
Produce: true,
},

SilkwormEnabled: false,
// applies if SilkwormPath is set
SilkwormExecution: true,
}

func init() {
Expand Down Expand Up @@ -178,8 +179,6 @@ type Config struct {
// for nodes to connect to.
EthDiscoveryURLs []string

P2PEnabled bool

Prune prune.Mode
BatchSize datasize.ByteSize // Batch size for execution stage

Expand Down Expand Up @@ -253,8 +252,10 @@ type Config struct {
ForcePartialCommit bool

// Embedded Silkworm support
SilkwormEnabled bool
SilkwormPath string
SilkwormPath string
SilkwormExecution bool
SilkwormRpcDaemon bool
SilkwormSentry bool
}

type Sync struct {
Expand Down
6 changes: 0 additions & 6 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/node"
"github.com/ledgerwatch/erigon/p2p/sentry"
)

const (
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sentry
package sentry_multi_client

import (
"context"
Expand Down
Loading

0 comments on commit d92898a

Please sign in to comment.