From 3b2b9a632c4afee69043387af1c4154abf4b157c Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Tue, 21 Jan 2025 10:50:41 +0300 Subject: [PATCH] network: integrate state sync module with blockfetcher Close #3574 Signed-off-by: Ekaterina Pavlova --- internal/fakechain/fakechain.go | 5 ++ pkg/config/blockfetcher_config.go | 1 + pkg/core/blockchain.go | 3 -- pkg/core/statesync/module.go | 5 ++ pkg/network/server.go | 53 ++++++++++++------- pkg/network/state_sync.go | 2 + pkg/services/blockfetcher/blockfetcher.go | 22 +++++--- .../blockfetcher/blockfetcher_test.go | 19 +++++-- 8 files changed, 78 insertions(+), 32 deletions(-) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index aa75a2a07c..8bff9fa2b0 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -469,3 +469,8 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node, func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { panic("TODO") } + +// GetConfig implements the StateSync interface. +func (s *FakeStateSync) GetConfig() config.Blockchain { + panic("TODO") +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 65abc62953..c32fe666be 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct { BQueueSize int `yaml:"BQueueSize"` SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` IndexFileSize uint32 `yaml:"IndexFileSize"` + BlocksOnly bool `yaml:"BlocksOnly"` } // Validate checks NeoFSBlockFetcher for internal consistency and ensures diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a223f01bd6..cfbc29cb9e 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement)) } if cfg.P2PStateExchangeExtensions { - if !cfg.StateRootInHeader { - return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") - } if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks { return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)") } diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index ef6e61806c..ee650eae0d 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -516,3 +516,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } + +// GetConfig returns current blockchain configuration. +func (s *Module) GetConfig() config.Blockchain { + return s.bc.GetConfig() +} diff --git a/pkg/network/server.go b/pkg/network/server.go index b8998700c0..83cba8dc9c 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -109,6 +109,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + headerFetcher *blockfetcher.Service blockFetcher *blockfetcher.Service serviceLock sync.RWMutex @@ -227,10 +228,17 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize } - s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, - sync.OnceFunc(func() { close(s.blockFetcherFin) })) + s.headerFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, + func() { + s.log.Info("NeoFS BlockFetcher finished headers") + }) + if err != nil { + return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) + } + s.NeoFSBlockFetcherCfg.BlocksOnly = true + s.blockFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) })) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } @@ -311,12 +319,6 @@ func (s *Server) Start() { go s.bQueue.Run() go s.bSyncQueue.Run() go s.bFetcherQueue.Run() - if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { - err := s.blockFetcher.Start() - if err != nil { - s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) - } - } for _, tr := range s.transports { go tr.Accept() } @@ -333,6 +335,7 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { s.bFetcherQueue.Discard() + s.headerFetcher.Shutdown() s.blockFetcher.Shutdown() } for _, tr := range s.transports { @@ -732,7 +735,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() || s.blockFetcher.IsActive() { + if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() { return false } @@ -792,7 +795,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() || s.blockFetcher.IsActive() { return nil } if s.stateSync.IsActive() { @@ -815,15 +818,20 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { - if s.blockFetcher.IsActive() { - return nil - } if s.stateSync.NeedHeaders() { if s.chain.HeaderHeight() < p.LastBlockIndex() { return s.requestHeaders(p) } return nil } + s.headerFetcher.Shutdown() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.blockFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } + var ( bq bqueue.Blockqueuer = s.chain requestMPTNodes bool @@ -847,6 +855,15 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { // requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers. func (s *Server) requestHeaders(p Peer) error { + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.headerFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } + if s.headerFetcher.IsActive() { + return nil + } pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader) return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl)) } @@ -1136,7 +1153,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() { return nil } return s.stateSync.AddHeaders(h.Hdrs...) @@ -1335,6 +1352,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { @@ -1468,9 +1488,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { - if s.blockFetcher.IsActive() { - return - } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index b7e5a3d3be..a3beee81b7 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,6 +1,7 @@ package network import ( + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,4 +18,5 @@ type StateSync interface { NeedHeaders() bool NeedMPTNodes() bool Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error + GetConfig() config.Blockchain } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 854827c6eb..497699141d 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -36,6 +36,7 @@ const ( type Ledger interface { GetConfig() config.Blockchain BlockHeight() uint32 + HeaderHeight() uint32 } // poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. @@ -57,10 +58,11 @@ type Service struct { cfg config.NeoFSBlockFetcher stateRootInHeader bool - chain Ledger - pool poolWrapper - enqueueBlock func(*block.Block) error - account *wallet.Account + chain Ledger + pool poolWrapper + enqueueBlock func(*block.Block) error + enqueueHeader func(*block.Header) error + account *wallet.Account oidsCh chan oid.ID // wg is a wait group for block downloaders. @@ -81,7 +83,7 @@ type Service struct { } // New creates a new BlockFetcher Service. -func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) { var ( account *wallet.Account err error @@ -143,6 +145,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc cfg: cfg, enqueueBlock: putBlock, + enqueueHeader: putHeader, account: account, stateRootInHeader: chain.GetConfig().StateRootInHeader, shutdownCallback: shutdownCallback, @@ -262,7 +265,11 @@ func (bfs *Service) blockDownloader() { case <-bfs.ctx.Done(): return default: - err = bfs.enqueueBlock(b) + if !bfs.cfg.BlocksOnly { + err = bfs.enqueueHeader(&b.Header) + } else { + err = bfs.enqueueBlock(b) + } if err != nil { bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err)) bfs.stopService(true) @@ -275,6 +282,9 @@ func (bfs *Service) blockDownloader() { // fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. func (bfs *Service) fetchOIDsFromIndexFiles() error { h := bfs.chain.BlockHeight() + if !bfs.cfg.BlocksOnly { + h = bfs.chain.HeaderHeight() + } startIndex := h / bfs.cfg.IndexFileSize skip := h % bfs.cfg.IndexFileSize diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 970a5d456e..fc5c506dc6 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -14,6 +14,10 @@ type mockLedger struct { height uint32 } +func (m *mockLedger) HeaderHeight() uint32 { + return m.height +} + func (m *mockLedger) GetConfig() config.Blockchain { return config.Blockchain{} } @@ -31,6 +35,11 @@ func (m *mockPutBlockFunc) putBlock(b *block.Block) error { return nil } +func (m *mockPutBlockFunc) putHead(h *block.Header) error { + m.putCalled = true + return nil +} + func TestServiceConstructor(t *testing.T) { logger := zap.NewNop() ledger := &mockLedger{height: 10} @@ -46,7 +55,7 @@ func TestServiceConstructor(t *testing.T) { OIDBatchSize: 0, DownloaderWorkersCount: 0, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) }) @@ -57,7 +66,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{}, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) }) @@ -69,7 +78,7 @@ func TestServiceConstructor(t *testing.T) { Addresses: []string{"localhost:8080"}, BQueueSize: DefaultQueueCacheSize, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.NoError(t, err) require.NotNil(t, service) @@ -87,7 +96,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{"localhost:1"}, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.NoError(t, err) err = service.Start() require.Error(t, err) @@ -106,7 +115,7 @@ func TestServiceConstructor(t *testing.T) { }, }, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:") })