Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
New peer connected but protocol not started, so state sync is not init.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 20, 2025
1 parent 4af6927 commit 9262290
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 55 deletions.
20 changes: 10 additions & 10 deletions config/protocol.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ ProtocolConfiguration:
- 03184b018d6b2bc093e535519732b3fd3f7551c8cffaf4621dd5a0b89482ca66c9
ValidatorsCount: 7
SeedList:
- seed1t5.neo.org:20333
- seed2t5.neo.org:20333
- seed3t5.neo.org:20333
- seed4t5.neo.org:20333
- seed5t5.neo.org:20333
- localhost:20332
VerifyTransactions: false
P2PSigExtensions: false
StateRootInHeader: false
P2PStateExchangeExtensions: true
StateSyncInterval: 10
Hardforks:
Aspidochelone: 210000
Basilisk: 2680000
Cockatrice: 3967000
Domovoi: 4144000

ApplicationConfiguration:
RemoveUntraceableBlocks: true
SkipBlockVerification: false
# LogPath could be set up in case you need stdout logs to some proper file.
# LogPath: "./log/neogo.log"
Expand All @@ -57,14 +57,14 @@ ApplicationConfiguration:
# FilePath: "./chains/testnet.bolt"
P2P:
Addresses:
- ":20333" # in form of "[host]:[port][:announcedPort]"
- ":20335" # in form of "[host]:[port][:announcedPort]"
DialTimeout: 3s
ProtoTickInterval: 2s
PingInterval: 30s
PingTimeout: 90s
MaxPeers: 100
MaxPeers: 1
AttemptConnPeers: 20
MinPeers: 10
MinPeers: 1
Relay: true
Consensus:
Enabled: false
Expand All @@ -83,7 +83,7 @@ ApplicationConfiguration:
RPC:
Enabled: true
Addresses:
- ":20332"
- ":20338"
MaxGasInvoke: 15
EnableCORSWorkaround: false
TLSConfig:
Expand All @@ -93,7 +93,7 @@ ApplicationConfiguration:
CertFile: serv.crt
KeyFile: serv.key
Prometheus:
Enabled: true
Enabled: false
Addresses:
- ":2112"
Pprof:
Expand Down
4 changes: 4 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,7 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
BlocksOnly bool `yaml:"BlocksOnly"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
Expand Down
5 changes: 1 addition & 4 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
}
Expand Down Expand Up @@ -1605,7 +1602,7 @@ func (bc *Blockchain) GetStateModule() StateRoot {

// GetStateSyncModule returns new state sync service instance.
func (bc *Blockchain) GetStateSyncModule() *statesync.Module {
return statesync.NewModule(bc, bc.stateRoot, bc.log, bc.dao, bc.jumpToState)
return statesync.NewModule(bc, bc.log, bc.dao, bc.jumpToState)
}

// storeBlock performs chain update using the block given, it executes all
Expand Down
31 changes: 17 additions & 14 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/dao"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/stateroot"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand Down Expand Up @@ -85,30 +84,28 @@ type Module struct {
// blockHeight is the index of the latest stored block.
blockHeight uint32

dao *dao.Simple
bc Ledger
stateMod *stateroot.Module
mptpool *Pool
dao *dao.Simple
bc Ledger
mptpool *Pool

billet *mpt.Billet

jumpCallback func(p uint32) error
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
func NewModule(bc Ledger, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
return &Module{
dao: s,
bc: bc,
stateMod: stateMod,
syncStage: inactive,
}
}
fmt.Println("statesync.NewModule")
return &Module{
dao: s,
bc: bc,
stateMod: stateMod,
log: log,
syncInterval: uint32(bc.GetConfig().StateSyncInterval),
mptpool: NewPool(),
Expand All @@ -120,6 +117,7 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
fmt.Println("statesync.Module.Init")
s.lock.Lock()
defer s.lock.Unlock()

Expand Down Expand Up @@ -160,10 +158,10 @@ func (s *Module) Init(currChainHeight uint32) error {
// current chain's state until new state is completely fetched, outdated state-related data
// will be removed from storage during (*Blockchain).jumpToState(...) execution.
// All we need to do right now is to remove genesis-related MPT nodes.
err = s.stateMod.CleanStorage()
if err != nil {
return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err)
}
//err = s.stateMod.CleanStorage()
//if err != nil {
// return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err)
//}
}

s.syncPoint = p
Expand Down Expand Up @@ -212,7 +210,7 @@ func (s *Module) defineSyncStage() error {
if s.blockHeight > s.syncPoint {
s.syncStage |= mptSynced
s.log.Info("MPT is in sync",
zap.Uint32("stateroot height", s.stateMod.CurrentLocalHeight()))
zap.Uint32("stateroot height", s.syncPoint))
} else if s.syncStage&headersSynced != 0 {
header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1))
if err != nil {
Expand Down Expand Up @@ -289,7 +287,7 @@ func (s *Module) getLatestSavedBlock(p uint32) uint32 {
func (s *Module) AddHeaders(hdrs ...*block.Header) error {
s.lock.Lock()
defer s.lock.Unlock()

fmt.Println("s.syncStage", s.syncStage)
if s.syncStage != initialized {
return errors.New("headers were not requested")
}
Expand Down Expand Up @@ -516,3 +514,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {

return s.mptpool.GetBatch(limit)
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()
}
76 changes: 51 additions & 25 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
headerFetcher *blockfetcher.Service
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
Expand Down Expand Up @@ -227,13 +228,18 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
s.headerFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
s.NeoFSBlockFetcherCfg.BlocksOnly = true
s.blockFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, nil)
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}

if s.MinPeers < 0 {
s.log.Info("bad MinPeers configured, using the default value",
Expand Down Expand Up @@ -311,12 +317,6 @@ func (s *Server) Start() {
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -333,7 +333,7 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.bFetcherQueue.Discard()
s.blockFetcher.Shutdown()
s.headerFetcher.Shutdown()
}
for _, tr := range s.transports {
tr.Close()
Expand Down Expand Up @@ -571,13 +571,19 @@ func (s *Server) run() {
s.discovery.RegisterGood(p)

s.tryInitStateSync()
s.tryStartServices()
case <-s.blockFetcherFin:
if s.started.Load() {
s.tryInitStateSync()
s.tryStartServices()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.headerFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
s.blockFetcherFin = nil
s.tryStartServices()
//case <-s.blockFetcherFin:
// if s.started.Load() {
// s.tryInitStateSync()
// s.tryStartServices()
// }
// s.blockFetcherFin = nil
}
}
}
Expand Down Expand Up @@ -732,7 +738,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
if s.stateSync.IsActive() || s.headerFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -792,7 +798,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
Expand All @@ -815,15 +821,25 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
fmt.Println("requestBlocksOrHeaders")
if s.stateSync.NeedHeaders() {
fmt.Println(".stateSync.NeedHeaders()")
if s.chain.HeaderHeight() < p.LastBlockIndex() {
fmt.Println("s.chain.HeaderHeight() < p.LastBlockIndex()")
return s.requestHeaders(p)
}
return nil
}
if s.headerFetcher.IsActive() {
s.headerFetcher.Shutdown()
}
//stop bf and start new+ mpt
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
var (
bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool
Expand All @@ -847,6 +863,10 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {

// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func (s *Server) requestHeaders(p Peer) error {
if s.headerFetcher.IsActive() {
//add by blockfetcher
return nil
}
pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
}
Expand Down Expand Up @@ -1136,7 +1156,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error

// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
Expand Down Expand Up @@ -1335,6 +1355,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
Expand Down Expand Up @@ -1468,18 +1491,16 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
}

fmt.Println("tryInitStateSync 1")
if s.stateSync.IsInitialized() {
return
}

fmt.Println("tryInitStateSync 2")
s.lock.RLock()
heights := make([]uint32, 0, len(s.peers))
for p := range s.peers {
Expand All @@ -1489,7 +1510,11 @@ func (s *Server) tryInitStateSync() {
}
s.lock.RUnlock()
slices.Sort(heights)
fmt.Println("tryInitStateSync 3")
fmt.Println("heights", heights)
fmt.Println("MinPeers", s.MinPeers)
if len(heights) >= s.MinPeers && len(heights) > 0 {
fmt.Println("tryInitStateSync 4")
// choose the height of the median peer as the current chain's height
h := heights[len(heights)/2]
err := s.stateSync.Init(h)
Expand All @@ -1506,6 +1531,7 @@ func (s *Server) tryInitStateSync() {
s.bSyncQueue.Discard()
}
}
fmt.Println("tryInitStateSync end")
}

// BroadcastExtensible add a locally-generated Extensible payload to the pool
Expand Down
2 changes: 2 additions & 0 deletions pkg/network/state_sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand All @@ -17,4 +18,5 @@ type StateSync interface {
NeedHeaders() bool
NeedMPTNodes() bool
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
GetConfig() config.Blockchain
}
Loading

0 comments on commit 9262290

Please sign in to comment.