diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index aa75a2a07c..8bff9fa2b0 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -469,3 +469,8 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node, func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { panic("TODO") } + +// GetConfig implements the StateSync interface. +func (s *FakeStateSync) GetConfig() config.Blockchain { + panic("TODO") +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 65abc62953..c32fe666be 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct { BQueueSize int `yaml:"BQueueSize"` SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` IndexFileSize uint32 `yaml:"IndexFileSize"` + BlocksOnly bool `yaml:"BlocksOnly"` } // Validate checks NeoFSBlockFetcher for internal consistency and ensures diff --git a/pkg/config/config.go b/pkg/config/config.go index ac8bc5d7be..302316e2ae 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain { return Blockchain{ ProtocolConfiguration: c.ProtocolConfiguration, Ledger: c.ApplicationConfiguration.Ledger, + NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher, } } diff --git a/pkg/config/ledger_config.go b/pkg/config/ledger_config.go index ff6d59948a..98e6d2f92c 100644 --- a/pkg/config/ledger_config.go +++ b/pkg/config/ledger_config.go @@ -29,4 +29,5 @@ type Ledger struct { type Blockchain struct { ProtocolConfiguration Ledger + NeoFSBlockFetcher } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a223f01bd6..67eb106b05 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -285,7 +285,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement)) } if cfg.P2PStateExchangeExtensions { - if !cfg.StateRootInHeader { + if !cfg.StateRootInHeader && !cfg.NeoFSBlockFetcher.Enabled { return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") } if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks { diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index ef6e61806c..ee650eae0d 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -516,3 +516,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } + +// GetConfig returns current blockchain configuration. +func (s *Module) GetConfig() config.Blockchain { + return s.bc.GetConfig() +} diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 762d58c35b..24a55ce147 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" ) @@ -15,6 +16,7 @@ type Blockqueuer interface { AddHeaders(...*block.Header) error BlockHeight() uint32 HeaderHeight() uint32 + GetConfig() config.Blockchain } // OperationMode is the mode of operation for the block queue. diff --git a/pkg/network/server.go b/pkg/network/server.go index b8998700c0..b96b1e6b30 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -109,6 +109,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + headerFetcher *blockfetcher.Service blockFetcher *blockfetcher.Service serviceLock sync.RWMutex @@ -227,10 +228,23 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize } - s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) - var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, - sync.OnceFunc(func() { close(s.blockFetcherFin) })) + var ( + err error + bq bqueue.Blockqueuer = s.chain + ) + if s.config.P2PStateExchangeExtensions { + bq = s.stateSync + } + s.bFetcherQueue = bqueue.New(bq, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.headerFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, + func() { + s.log.Info("NeoFS BlockFetcher finished headers downloading") + }) + if err != nil { + return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) + } + s.NeoFSBlockFetcherCfg.BlocksOnly = true + s.blockFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) })) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } @@ -311,12 +325,6 @@ func (s *Server) Start() { go s.bQueue.Run() go s.bSyncQueue.Run() go s.bFetcherQueue.Run() - if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { - err := s.blockFetcher.Start() - if err != nil { - s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) - } - } for _, tr := range s.transports { go tr.Accept() } @@ -333,7 +341,12 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { s.bFetcherQueue.Discard() - s.blockFetcher.Shutdown() + if s.headerFetcher.IsActive() { + s.headerFetcher.Shutdown() + } + if s.blockFetcher.IsActive() { + s.blockFetcher.Shutdown() + } } for _, tr := range s.transports { tr.Close() @@ -732,7 +745,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() || s.blockFetcher.IsActive() { + if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() { return false } @@ -792,7 +805,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() || s.blockFetcher.IsActive() { return nil } if s.stateSync.IsActive() { @@ -815,15 +828,23 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { - if s.blockFetcher.IsActive() { - return nil - } if s.stateSync.NeedHeaders() { - if s.chain.HeaderHeight() < p.LastBlockIndex() { + if s.chain.HeaderHeight() < p.LastBlockIndex() || s.NeoFSBlockFetcherCfg.Enabled { return s.requestHeaders(p) } return nil } + if s.headerFetcher.IsActive() { + s.headerFetcher.Shutdown() + } + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + if ((s.stateSync.IsActive() && s.headerFetcher.IsShutdown()) || !s.stateSync.IsActive()) && !s.blockFetcher.IsShutdown() { + if err := s.blockFetcher.Start(); err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } + } + var ( bq bqueue.Blockqueuer = s.chain requestMPTNodes bool @@ -847,6 +868,16 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { // requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers. func (s *Server) requestHeaders(p Peer) error { + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + if !s.headerFetcher.IsShutdown() { + err := s.headerFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } else { + return err + } + } + } pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader) return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl)) } @@ -1136,7 +1167,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() { return nil } return s.stateSync.AddHeaders(h.Hdrs...) @@ -1335,6 +1366,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { @@ -1468,9 +1502,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { - if s.blockFetcher.IsActive() { - return - } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index b7e5a3d3be..a3beee81b7 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,6 +1,7 @@ package network import ( + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,4 +18,5 @@ type StateSync interface { NeedHeaders() bool NeedMPTNodes() bool Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error + GetConfig() config.Blockchain } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 854827c6eb..a2f6edf970 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -36,6 +36,7 @@ const ( type Ledger interface { GetConfig() config.Blockchain BlockHeight() uint32 + HeaderHeight() uint32 } // poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. @@ -52,15 +53,18 @@ func (p poolWrapper) Close() error { // Service is a service that fetches blocks from NeoFS. type Service struct { // isActive denotes whether the service is working or in the process of shutdown. - isActive atomic.Bool + isActive atomic.Bool + // isShutdown denotes whether the service is shutdown after running. + isShutdown atomic.Bool log *zap.Logger cfg config.NeoFSBlockFetcher stateRootInHeader bool - chain Ledger - pool poolWrapper - enqueueBlock func(*block.Block) error - account *wallet.Account + chain Ledger + pool poolWrapper + enqueueBlock func(*block.Block) error + enqueueHeader func(*block.Header) error + account *wallet.Account oidsCh chan oid.ID // wg is a wait group for block downloaders. @@ -81,7 +85,7 @@ type Service struct { } // New creates a new BlockFetcher Service. -func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) { var ( account *wallet.Account err error @@ -143,6 +147,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc cfg: cfg, enqueueBlock: putBlock, + enqueueHeader: putHeader, account: account, stateRootInHeader: chain.GetConfig().StateRootInHeader, shutdownCallback: shutdownCallback, @@ -162,7 +167,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc // Start runs the NeoFS BlockFetcher service. func (bfs *Service) Start() error { - if !bfs.isActive.CompareAndSwap(false, true) { + if !bfs.isActive.CompareAndSwap(false, true) || bfs.IsShutdown() { return nil } bfs.log.Info("starting NeoFS BlockFetcher service") @@ -262,7 +267,11 @@ func (bfs *Service) blockDownloader() { case <-bfs.ctx.Done(): return default: - err = bfs.enqueueBlock(b) + if !bfs.cfg.BlocksOnly { + err = bfs.enqueueHeader(&b.Header) + } else { + err = bfs.enqueueBlock(b) + } if err != nil { bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err)) bfs.stopService(true) @@ -275,6 +284,9 @@ func (bfs *Service) blockDownloader() { // fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first. func (bfs *Service) fetchOIDsFromIndexFiles() error { h := bfs.chain.BlockHeight() + if !bfs.cfg.BlocksOnly { + h = bfs.chain.HeaderHeight() + } startIndex := h / bfs.cfg.IndexFileSize skip := h % bfs.cfg.IndexFileSize @@ -425,7 +437,7 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) { // block OIDs search, cancels all in-progress downloading operations and waits // until all service routines finish their work. func (bfs *Service) Shutdown() { - if !bfs.IsActive() { + if !bfs.IsActive() || bfs.IsShutdown() { return } bfs.stopService(true) @@ -444,6 +456,9 @@ func (bfs *Service) stopService(force bool) { // exiter is a routine that is listening to a quitting signal and manages graceful // Service shutdown process. func (bfs *Service) exiter() { + if !bfs.isActive.Load() { + return + } // Closing signal may come from anyone, but only once. force := <-bfs.quit bfs.log.Info("shutting down NeoFS BlockFetcher service", @@ -451,6 +466,7 @@ func (bfs *Service) exiter() { ) bfs.isActive.CompareAndSwap(true, false) + bfs.isShutdown.CompareAndSwap(false, true) // Cansel all pending OIDs/blocks downloads in case if shutdown requested by user // or caused by downloading error. if force { @@ -482,6 +498,12 @@ func (bfs *Service) IsActive() bool { return bfs.isActive.Load() } +// IsShutdown returns true if the NeoFS BlockFetcher service is completely shutdown. +// The service can not be started again. +func (bfs *Service) IsShutdown() bool { + return bfs.isShutdown.Load() +} + // retry function with exponential backoff. func (bfs *Service) retry(action func() error) error { var ( diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 970a5d456e..fc5c506dc6 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -14,6 +14,10 @@ type mockLedger struct { height uint32 } +func (m *mockLedger) HeaderHeight() uint32 { + return m.height +} + func (m *mockLedger) GetConfig() config.Blockchain { return config.Blockchain{} } @@ -31,6 +35,11 @@ func (m *mockPutBlockFunc) putBlock(b *block.Block) error { return nil } +func (m *mockPutBlockFunc) putHead(h *block.Header) error { + m.putCalled = true + return nil +} + func TestServiceConstructor(t *testing.T) { logger := zap.NewNop() ledger := &mockLedger{height: 10} @@ -46,7 +55,7 @@ func TestServiceConstructor(t *testing.T) { OIDBatchSize: 0, DownloaderWorkersCount: 0, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) }) @@ -57,7 +66,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{}, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) }) @@ -69,7 +78,7 @@ func TestServiceConstructor(t *testing.T) { Addresses: []string{"localhost:8080"}, BQueueSize: DefaultQueueCacheSize, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.NoError(t, err) require.NotNil(t, service) @@ -87,7 +96,7 @@ func TestServiceConstructor(t *testing.T) { }, Addresses: []string{"localhost:1"}, } - service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + service, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.NoError(t, err) err = service.Start() require.Error(t, err) @@ -106,7 +115,7 @@ func TestServiceConstructor(t *testing.T) { }, }, } - _, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) + _, err := New(ledger, cfg, logger, mockPut.putBlock, mockPut.putHead, shutdownCallback) require.Error(t, err) require.Contains(t, err.Error(), "open wallet: open invalid/path/to/wallet.json:") })