Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 21, 2025
1 parent 4af6927 commit 3b2b9a6
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 32 deletions.
5 changes: 5 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,8 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
BlocksOnly bool `yaml:"BlocksOnly"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
Expand Down
3 changes: 0 additions & 3 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {

return s.mptpool.GetBatch(limit)
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()
}
53 changes: 35 additions & 18 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
headerFetcher *blockfetcher.Service
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
Expand Down Expand Up @@ -227,10 +228,17 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
s.headerFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
func() {
s.log.Info("NeoFS BlockFetcher finished headers")
})
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
s.NeoFSBlockFetcherCfg.BlocksOnly = true
s.blockFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
Expand Down Expand Up @@ -311,12 +319,6 @@ func (s *Server) Start() {
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -333,6 +335,7 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.bFetcherQueue.Discard()
s.headerFetcher.Shutdown()
s.blockFetcher.Shutdown()
}
for _, tr := range s.transports {
Expand Down Expand Up @@ -732,7 +735,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -792,7 +795,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
Expand All @@ -815,15 +818,20 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
}
return nil
}
s.headerFetcher.Shutdown()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}

var (
bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool
Expand All @@ -847,6 +855,15 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {

// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func (s *Server) requestHeaders(p Peer) error {
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.headerFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
if s.headerFetcher.IsActive() {
return nil
}
pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
}
Expand Down Expand Up @@ -1136,7 +1153,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error

// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
Expand Down Expand Up @@ -1335,6 +1352,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
Expand Down Expand Up @@ -1468,9 +1488,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/network/state_sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand All @@ -17,4 +18,5 @@ type StateSync interface {
NeedHeaders() bool
NeedMPTNodes() bool
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
GetConfig() config.Blockchain
}
22 changes: 16 additions & 6 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
type Ledger interface {
GetConfig() config.Blockchain
BlockHeight() uint32
HeaderHeight() uint32
}

// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
Expand All @@ -57,10 +58,11 @@ type Service struct {
cfg config.NeoFSBlockFetcher
stateRootInHeader bool

chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
account *wallet.Account
chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
enqueueHeader func(*block.Header) error
account *wallet.Account

oidsCh chan oid.ID
// wg is a wait group for block downloaders.
Expand All @@ -81,7 +83,7 @@ type Service struct {
}

// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
err error
Expand Down Expand Up @@ -143,6 +145,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
cfg: cfg,

enqueueBlock: putBlock,
enqueueHeader: putHeader,
account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
shutdownCallback: shutdownCallback,
Expand Down Expand Up @@ -262,7 +265,11 @@ func (bfs *Service) blockDownloader() {
case <-bfs.ctx.Done():
return
default:
err = bfs.enqueueBlock(b)
if !bfs.cfg.BlocksOnly {
err = bfs.enqueueHeader(&b.Header)
} else {
err = bfs.enqueueBlock(b)
}
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err))
bfs.stopService(true)
Expand All @@ -275,6 +282,9 @@ func (bfs *Service) blockDownloader() {
// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
func (bfs *Service) fetchOIDsFromIndexFiles() error {
h := bfs.chain.BlockHeight()
if !bfs.cfg.BlocksOnly {
h = bfs.chain.HeaderHeight()
}
startIndex := h / bfs.cfg.IndexFileSize
skip := h % bfs.cfg.IndexFileSize

Expand Down
19 changes: 14 additions & 5 deletions pkg/services/blockfetcher/blockfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type mockLedger struct {
height uint32
}

func (m *mockLedger) HeaderHeight() uint32 {
return m.height
}

func (m *mockLedger) GetConfig() config.Blockchain {
return config.Blockchain{}
}
Expand All @@ -31,6 +35,11 @@ func (m *mockPutBlockFunc) putBlock(b *block.Block) error {
return nil
}

func (m *mockPutBlockFunc) putHead(h *block.Header) error {
m.putCalled = true
return nil
}

func TestServiceConstructor(t *testing.T) {
logger := zap.NewNop()
ledger := &mockLedger{height: 10}
Expand All @@ -46,7 +55,7 @@ func TestServiceConstructor(t *testing.T) {
OIDBatchSize: 0,
DownloaderWorkersCount: 0,
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback)
require.Error(t, err)
})

Expand All @@ -57,7 +66,7 @@ func TestServiceConstructor(t *testing.T) {
},
Addresses: []string{},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback)
require.Error(t, err)
})

Expand All @@ -69,7 +78,7 @@ func TestServiceConstructor(t *testing.T) {
Addresses: []string{"localhost:8080"},
BQueueSize: DefaultQueueCacheSize,
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback)
require.NoError(t, err)
require.NotNil(t, service)

Expand All @@ -87,7 +96,7 @@ func TestServiceConstructor(t *testing.T) {
},
Addresses: []string{"localhost:1"},
}
service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback)
require.NoError(t, err)
err = service.Start()
require.Error(t, err)
Expand All @@ -106,7 +115,7 @@ func TestServiceConstructor(t *testing.T) {
},
},
}
_, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback)
_, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback)
require.Error(t, err)
require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:")
})
Expand Down

0 comments on commit 3b2b9a6

Please sign in to comment.