From 926229049c3c5020fe9628b881a441aa7dcd97d4 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 20 Jan 2025 11:44:31 +0300 Subject: [PATCH] network: integrate state sync module with blockfetcher New peer connected but protocol not started, so state sync is not init. Signed-off-by: Ekaterina Pavlova --- config/protocol.testnet.yml | 20 +++--- internal/fakechain/fakechain.go | 4 ++ pkg/config/blockfetcher_config.go | 1 + pkg/core/blockchain.go | 5 +- pkg/core/statesync/module.go | 31 ++++----- pkg/network/server.go | 76 +++++++++++++++-------- pkg/network/state_sync.go | 2 + pkg/services/blockfetcher/blockfetcher.go | 27 +++++++- 8 files changed, 111 insertions(+), 55 deletions(-) diff --git a/config/protocol.testnet.yml b/config/protocol.testnet.yml index 4c3c54a1f3..53be733bb8 100644 --- a/config/protocol.testnet.yml +++ b/config/protocol.testnet.yml @@ -31,13 +31,12 @@ ProtocolConfiguration: - 03184b018d6b2bc093e535519732b3fd3f7551c8cffaf4621dd5a0b89482ca66c9 ValidatorsCount: 7 SeedList: - - seed1t5.neo.org:20333 - - seed2t5.neo.org:20333 - - seed3t5.neo.org:20333 - - seed4t5.neo.org:20333 - - seed5t5.neo.org:20333 + - localhost:20332 VerifyTransactions: false P2PSigExtensions: false + StateRootInHeader: false + P2PStateExchangeExtensions: true + StateSyncInterval: 10 Hardforks: Aspidochelone: 210000 Basilisk: 2680000 @@ -45,6 +44,7 @@ ProtocolConfiguration: Domovoi: 4144000 ApplicationConfiguration: + RemoveUntraceableBlocks: true SkipBlockVerification: false # LogPath could be set up in case you need stdout logs to some proper file. # LogPath: "./log/neogo.log" @@ -57,14 +57,14 @@ ApplicationConfiguration: # FilePath: "./chains/testnet.bolt" P2P: Addresses: - - ":20333" # in form of "[host]:[port][:announcedPort]" + - ":20335" # in form of "[host]:[port][:announcedPort]" DialTimeout: 3s ProtoTickInterval: 2s PingInterval: 30s PingTimeout: 90s - MaxPeers: 100 + MaxPeers: 1 AttemptConnPeers: 20 - MinPeers: 10 + MinPeers: 1 Relay: true Consensus: Enabled: false @@ -83,7 +83,7 @@ ApplicationConfiguration: RPC: Enabled: true Addresses: - - ":20332" + - ":20338" MaxGasInvoke: 15 EnableCORSWorkaround: false TLSConfig: @@ -93,7 +93,7 @@ ApplicationConfiguration: CertFile: serv.crt KeyFile: serv.key Prometheus: - Enabled: true + Enabled: false Addresses: - ":2112" Pprof: diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index aa75a2a07c..fa06a14fad 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -469,3 +469,7 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node, func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { panic("TODO") } + +func (s *FakeStateSync) GetConfig() config.Blockchain { + panic("TODO") +} diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 65abc62953..c32fe666be 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct { BQueueSize int `yaml:"BQueueSize"` SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"` IndexFileSize uint32 `yaml:"IndexFileSize"` + BlocksOnly bool `yaml:"BlocksOnly"` } // Validate checks NeoFSBlockFetcher for internal consistency and ensures diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index a223f01bd6..393f4158a5 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -285,9 +285,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement)) } if cfg.P2PStateExchangeExtensions { - if !cfg.StateRootInHeader { - return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") - } if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks { return nil, errors.New("P2PStateExchangeExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)") } @@ -1605,7 +1602,7 @@ func (bc *Blockchain) GetStateModule() StateRoot { // GetStateSyncModule returns new state sync service instance. func (bc *Blockchain) GetStateSyncModule() *statesync.Module { - return statesync.NewModule(bc, bc.stateRoot, bc.log, bc.dao, bc.jumpToState) + return statesync.NewModule(bc, bc.log, bc.dao, bc.jumpToState) } // storeBlock performs chain update using the block given, it executes all diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index ef6e61806c..e3764a28ed 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -27,7 +27,6 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/core/mpt" - "github.com/nspcc-dev/neo-go/pkg/core/stateroot" "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" @@ -85,10 +84,9 @@ type Module struct { // blockHeight is the index of the latest stored block. blockHeight uint32 - dao *dao.Simple - bc Ledger - stateMod *stateroot.Module - mptpool *Pool + dao *dao.Simple + bc Ledger + mptpool *Pool billet *mpt.Billet @@ -96,19 +94,18 @@ type Module struct { } // NewModule returns new instance of statesync module. -func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module { +func NewModule(bc Ledger, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module { if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) { return &Module{ dao: s, bc: bc, - stateMod: stateMod, syncStage: inactive, } } + fmt.Println("statesync.NewModule") return &Module{ dao: s, bc: bc, - stateMod: stateMod, log: log, syncInterval: uint32(bc.GetConfig().StateSyncInterval), mptpool: NewPool(), @@ -120,6 +117,7 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si // Init initializes state sync module for the current chain's height with given // callback for MPT nodes requests. func (s *Module) Init(currChainHeight uint32) error { + fmt.Println("statesync.Module.Init") s.lock.Lock() defer s.lock.Unlock() @@ -160,10 +158,10 @@ func (s *Module) Init(currChainHeight uint32) error { // current chain's state until new state is completely fetched, outdated state-related data // will be removed from storage during (*Blockchain).jumpToState(...) execution. // All we need to do right now is to remove genesis-related MPT nodes. - err = s.stateMod.CleanStorage() - if err != nil { - return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err) - } + //err = s.stateMod.CleanStorage() + //if err != nil { + // return fmt.Errorf("failed to remove outdated MPT data from storage: %w", err) + //} } s.syncPoint = p @@ -212,7 +210,7 @@ func (s *Module) defineSyncStage() error { if s.blockHeight > s.syncPoint { s.syncStage |= mptSynced s.log.Info("MPT is in sync", - zap.Uint32("stateroot height", s.stateMod.CurrentLocalHeight())) + zap.Uint32("stateroot height", s.syncPoint)) } else if s.syncStage&headersSynced != 0 { header, err := s.bc.GetHeader(s.bc.GetHeaderHash(s.syncPoint + 1)) if err != nil { @@ -289,7 +287,7 @@ func (s *Module) getLatestSavedBlock(p uint32) uint32 { func (s *Module) AddHeaders(hdrs ...*block.Header) error { s.lock.Lock() defer s.lock.Unlock() - + fmt.Println("s.syncStage", s.syncStage) if s.syncStage != initialized { return errors.New("headers were not requested") } @@ -516,3 +514,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } + +// GetConfig returns current blockchain configuration. +func (s *Module) GetConfig() config.Blockchain { + return s.bc.GetConfig() +} diff --git a/pkg/network/server.go b/pkg/network/server.go index b8998700c0..d4156d8314 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -109,6 +109,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + headerFetcher *blockfetcher.Service blockFetcher *blockfetcher.Service serviceLock sync.RWMutex @@ -227,13 +228,18 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize } - s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.bFetcherQueue = bqueue.New(s.stateSync, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, + s.headerFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) })) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } + s.NeoFSBlockFetcherCfg.BlocksOnly = true + s.blockFetcher, err = blockfetcher.New(s.stateSync, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, nil) + if err != nil { + return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) + } if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", @@ -311,12 +317,6 @@ func (s *Server) Start() { go s.bQueue.Run() go s.bSyncQueue.Run() go s.bFetcherQueue.Run() - if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { - err := s.blockFetcher.Start() - if err != nil { - s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) - } - } for _, tr := range s.transports { go tr.Accept() } @@ -333,7 +333,7 @@ func (s *Server) Shutdown() { s.log.Info("shutting down server", zap.Int("peers", s.PeerCount())) if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { s.bFetcherQueue.Discard() - s.blockFetcher.Shutdown() + s.headerFetcher.Shutdown() } for _, tr := range s.transports { tr.Close() @@ -571,13 +571,19 @@ func (s *Server) run() { s.discovery.RegisterGood(p) s.tryInitStateSync() - s.tryStartServices() - case <-s.blockFetcherFin: - if s.started.Load() { - s.tryInitStateSync() - s.tryStartServices() + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.headerFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } } - s.blockFetcherFin = nil + s.tryStartServices() + //case <-s.blockFetcherFin: + // if s.started.Load() { + // s.tryInitStateSync() + // s.tryStartServices() + // } + // s.blockFetcherFin = nil } } } @@ -732,7 +738,7 @@ func (s *Server) IsInSync() bool { var peersNumber int var notHigher int - if s.stateSync.IsActive() || s.blockFetcher.IsActive() { + if s.stateSync.IsActive() || s.headerFetcher.IsActive() { return false } @@ -792,7 +798,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error { // handleBlockCmd processes the block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() { return nil } if s.stateSync.IsActive() { @@ -815,15 +821,25 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error { } func (s *Server) requestBlocksOrHeaders(p Peer) error { - if s.blockFetcher.IsActive() { - return nil - } + fmt.Println("requestBlocksOrHeaders") if s.stateSync.NeedHeaders() { + fmt.Println(".stateSync.NeedHeaders()") if s.chain.HeaderHeight() < p.LastBlockIndex() { + fmt.Println("s.chain.HeaderHeight() < p.LastBlockIndex()") return s.requestHeaders(p) } return nil } + if s.headerFetcher.IsActive() { + s.headerFetcher.Shutdown() + } + //stop bf and start new+ mpt + if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled { + err := s.blockFetcher.Start() + if err != nil { + s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err)) + } + } var ( bq bqueue.Blockqueuer = s.chain requestMPTNodes bool @@ -847,6 +863,10 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error { // requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers. func (s *Server) requestHeaders(p Peer) error { + if s.headerFetcher.IsActive() { + //add by blockfetcher + return nil + } pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader) return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl)) } @@ -1136,7 +1156,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error // handleHeadersCmd processes headers payload. func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error { - if s.blockFetcher.IsActive() { + if s.headerFetcher.IsActive() { return nil } return s.stateSync.AddHeaders(h.Hdrs...) @@ -1335,6 +1355,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests have been sent, request random height. func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error { + if s.blockFetcher.IsActive() { + return nil + } pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) lq, capLeft := s.bQueue.LastQueued() if capLeft == 0 { @@ -1468,18 +1491,16 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } func (s *Server) tryInitStateSync() { - if s.blockFetcher.IsActive() { - return - } if !s.stateSync.IsActive() { s.bSyncQueue.Discard() return } + fmt.Println("tryInitStateSync 1") if s.stateSync.IsInitialized() { return } - + fmt.Println("tryInitStateSync 2") s.lock.RLock() heights := make([]uint32, 0, len(s.peers)) for p := range s.peers { @@ -1489,7 +1510,11 @@ func (s *Server) tryInitStateSync() { } s.lock.RUnlock() slices.Sort(heights) + fmt.Println("tryInitStateSync 3") + fmt.Println("heights", heights) + fmt.Println("MinPeers", s.MinPeers) if len(heights) >= s.MinPeers && len(heights) > 0 { + fmt.Println("tryInitStateSync 4") // choose the height of the median peer as the current chain's height h := heights[len(heights)/2] err := s.stateSync.Init(h) @@ -1506,6 +1531,7 @@ func (s *Server) tryInitStateSync() { s.bSyncQueue.Discard() } } + fmt.Println("tryInitStateSync end") } // BroadcastExtensible add a locally-generated Extensible payload to the pool diff --git a/pkg/network/state_sync.go b/pkg/network/state_sync.go index b7e5a3d3be..a3beee81b7 100644 --- a/pkg/network/state_sync.go +++ b/pkg/network/state_sync.go @@ -1,6 +1,7 @@ package network import ( + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/network/bqueue" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,4 +18,5 @@ type StateSync interface { NeedHeaders() bool NeedMPTNodes() bool Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error + GetConfig() config.Blockchain } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 854827c6eb..eac9078888 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -60,6 +60,7 @@ type Service struct { chain Ledger pool poolWrapper enqueueBlock func(*block.Block) error + addHeader func(*block.Header) error account *wallet.Account oidsCh chan oid.ID @@ -81,7 +82,7 @@ type Service struct { } // New creates a new BlockFetcher Service. -func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) { var ( account *wallet.Account err error @@ -143,6 +144,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc cfg: cfg, enqueueBlock: putBlock, + addHeader: putHeader, account: account, stateRootInHeader: chain.GetConfig().StateRootInHeader, shutdownCallback: shutdownCallback, @@ -262,7 +264,12 @@ func (bfs *Service) blockDownloader() { case <-bfs.ctx.Done(): return default: - err = bfs.enqueueBlock(b) + if !bfs.cfg.BlocksOnly { + err = bfs.addHeader(&b.Header) + //bfs.log.Info("head added to the chain", zap.Uint32("index", b.Index)) + } else { + err = bfs.enqueueBlock(b) + } if err != nil { bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err)) bfs.stopService(true) @@ -533,6 +540,22 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e return rc, err } +func (bfs *Service) objectGetRange(ctx context.Context, oid string, offset, length uint64) (io.ReadCloser, error) { + rangeParam := fmt.Sprintf("%d|%d", 0, length) + + u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", rangeParam)) + if err != nil { + return nil, err + } + + var rc io.ReadCloser + err = bfs.retry(func() error { + rc, err = neofs.GetWithClient(ctx, bfs.pool, bfs.account.PrivateKey(), u, false) + return err + }) + return rc, err +} + func (bfs *Service) objectSearch(ctx context.Context, prm client.PrmObjectSearch) ([]oid.ID, error) { var ( oids []oid.ID