Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 21, 2025
1 parent 4af6927 commit f4d006f
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 36 deletions.
5 changes: 5 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,8 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/blockfetcher_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type NeoFSBlockFetcher struct {
BQueueSize int `yaml:"BQueueSize"`
SkipIndexFilesSearch bool `yaml:"SkipIndexFilesSearch"`
IndexFileSize uint32 `yaml:"IndexFileSize"`
BlocksOnly bool `yaml:"BlocksOnly"`
}

// Validate checks NeoFSBlockFetcher for internal consistency and ensures
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 1 addition & 1 deletion pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
if !cfg.StateRootInHeader && !cfg.NeoFSBlockFetcher.Enabled {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {

return s.mptpool.GetBatch(limit)
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()

Check warning on line 522 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L521-L522

Added lines #L521 - L522 were not covered by tests
}
2 changes: 2 additions & 0 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"

"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"go.uber.org/zap"
)
Expand All @@ -15,6 +16,7 @@ type Blockqueuer interface {
AddHeaders(...*block.Header) error
BlockHeight() uint32
HeaderHeight() uint32
GetConfig() config.Blockchain
}

// OperationMode is the mode of operation for the block queue.
Expand Down
73 changes: 52 additions & 21 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
notaryFeer NotaryFeer
headerFetcher *blockfetcher.Service
blockFetcher *blockfetcher.Service

serviceLock sync.RWMutex
Expand Down Expand Up @@ -227,10 +228,23 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
var (
err error
bq bqueue.Blockqueuer = s.chain
)
if s.config.P2PStateExchangeExtensions {
bq = s.stateSync
}
s.bFetcherQueue = bqueue.New(bq, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.headerFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader,
func() {
s.log.Info("NeoFS BlockFetcher finished headers downloading")
})

Check warning on line 242 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L241-L242

Added lines #L241 - L242 were not covered by tests
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}

Check warning on line 245 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L244-L245

Added lines #L244 - L245 were not covered by tests
s.NeoFSBlockFetcherCfg.BlocksOnly = true
s.blockFetcher, err = blockfetcher.New(bq, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, s.bFetcherQueue.PutHeader, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
Expand Down Expand Up @@ -311,12 +325,6 @@ func (s *Server) Start() {
go s.bQueue.Run()
go s.bSyncQueue.Run()
go s.bFetcherQueue.Run()
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
err := s.blockFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}
}
for _, tr := range s.transports {
go tr.Accept()
}
Expand All @@ -333,7 +341,12 @@ func (s *Server) Shutdown() {
s.log.Info("shutting down server", zap.Int("peers", s.PeerCount()))
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
s.bFetcherQueue.Discard()
s.blockFetcher.Shutdown()
if s.headerFetcher.IsActive() {
s.headerFetcher.Shutdown()
}
if s.blockFetcher.IsActive() {
s.blockFetcher.Shutdown()
}

Check warning on line 349 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L344-L349

Added lines #L344 - L349 were not covered by tests
}
for _, tr := range s.transports {
tr.Close()
Expand Down Expand Up @@ -732,7 +745,7 @@ func (s *Server) IsInSync() bool {
var peersNumber int
var notHigher int

if s.stateSync.IsActive() || s.blockFetcher.IsActive() {
if s.stateSync.IsActive() || s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
return false
}

Expand Down Expand Up @@ -792,7 +805,7 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {

// handleBlockCmd processes the block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() || s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.IsActive() {
Expand All @@ -815,15 +828,23 @@ func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
}

func (s *Server) requestBlocksOrHeaders(p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}
if s.stateSync.NeedHeaders() {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
if s.chain.HeaderHeight() < p.LastBlockIndex() || s.NeoFSBlockFetcherCfg.Enabled {
return s.requestHeaders(p)
}
return nil
}
if s.headerFetcher.IsActive() {
s.headerFetcher.Shutdown()
}

Check warning on line 839 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L838-L839

Added lines #L838 - L839 were not covered by tests
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
if ((s.stateSync.IsActive() && s.headerFetcher.IsShutdown()) || !s.stateSync.IsActive()) && !s.blockFetcher.IsShutdown() {
if err := s.blockFetcher.Start(); err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
}

Check warning on line 844 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L841-L844

Added lines #L841 - L844 were not covered by tests
}
}

var (
bq bqueue.Blockqueuer = s.chain
requestMPTNodes bool
Expand All @@ -847,6 +868,16 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {

// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func (s *Server) requestHeaders(p Peer) error {
if s.ServerConfig.NeoFSBlockFetcherCfg.Enabled {
if !s.headerFetcher.IsShutdown() {
err := s.headerFetcher.Start()
if err != nil {
s.log.Error("skipping NeoFS BlockFetcher", zap.Error(err))
} else {
return err
}

Check warning on line 878 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L872-L878

Added lines #L872 - L878 were not covered by tests
}
}
pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
}
Expand Down Expand Up @@ -1136,7 +1167,7 @@ func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error

// handleHeadersCmd processes headers payload.
func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
if s.headerFetcher.IsActive() {

Check warning on line 1170 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L1170

Added line #L1170 was not covered by tests
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
Expand Down Expand Up @@ -1335,6 +1366,9 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
if s.blockFetcher.IsActive() {
return nil
}

Check warning on line 1371 in pkg/network/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/server.go#L1370-L1371

Added lines #L1370 - L1371 were not covered by tests
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
Expand Down Expand Up @@ -1468,9 +1502,6 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
}

func (s *Server) tryInitStateSync() {
if s.blockFetcher.IsActive() {
return
}
if !s.stateSync.IsActive() {
s.bSyncQueue.Discard()
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/network/state_sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand All @@ -17,4 +18,5 @@ type StateSync interface {
NeedHeaders() bool
NeedMPTNodes() bool
Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
GetConfig() config.Blockchain
}
42 changes: 33 additions & 9 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
type Ledger interface {
GetConfig() config.Blockchain
BlockHeight() uint32
HeaderHeight() uint32
}

// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
Expand All @@ -52,15 +53,18 @@ func (p poolWrapper) Close() error {
// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
isActive atomic.Bool
isActive atomic.Bool
// isShutdown denotes whether the service is completely shutdown after running.
isShutdown atomic.Bool
log *zap.Logger
cfg config.NeoFSBlockFetcher
stateRootInHeader bool

chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
account *wallet.Account
chain Ledger
pool poolWrapper
enqueueBlock func(*block.Block) error
enqueueHeader func(*block.Header) error
account *wallet.Account

oidsCh chan oid.ID
// wg is a wait group for block downloaders.
Expand All @@ -81,7 +85,7 @@ type Service struct {
}

// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) {
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, putHeader func(*block.Header) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
err error
Expand Down Expand Up @@ -143,6 +147,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc
cfg: cfg,

enqueueBlock: putBlock,
enqueueHeader: putHeader,
account: account,
stateRootInHeader: chain.GetConfig().StateRootInHeader,
shutdownCallback: shutdownCallback,
Expand All @@ -162,7 +167,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc

// Start runs the NeoFS BlockFetcher service.
func (bfs *Service) Start() error {
if !bfs.isActive.CompareAndSwap(false, true) {
if !bfs.isActive.CompareAndSwap(false, true) || bfs.IsShutdown() {
return nil
}
bfs.log.Info("starting NeoFS BlockFetcher service")
Expand Down Expand Up @@ -262,7 +267,11 @@ func (bfs *Service) blockDownloader() {
case <-bfs.ctx.Done():
return
default:
err = bfs.enqueueBlock(b)
if !bfs.cfg.BlocksOnly {
err = bfs.enqueueHeader(&b.Header)
} else {
err = bfs.enqueueBlock(b)
}

Check warning on line 274 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L270-L274

Added lines #L270 - L274 were not covered by tests
if err != nil {
bfs.log.Error("failed to enqueue block", zap.Uint32("index", b.Index), zap.Error(err))
bfs.stopService(true)
Expand All @@ -275,6 +284,11 @@ func (bfs *Service) blockDownloader() {
// fetchOIDsFromIndexFiles fetches block OIDs from NeoFS by searching index files first.
func (bfs *Service) fetchOIDsFromIndexFiles() error {
h := bfs.chain.BlockHeight()
endIndex := uint32(0)
if !bfs.cfg.BlocksOnly {
h = bfs.chain.HeaderHeight()
endIndex = bfs.chain.BlockHeight() / bfs.cfg.IndexFileSize
}

Check warning on line 291 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L287-L291

Added lines #L287 - L291 were not covered by tests
startIndex := h / bfs.cfg.IndexFileSize
skip := h % bfs.cfg.IndexFileSize

Expand All @@ -283,6 +297,9 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
case <-bfs.exiterToOIDDownloader:
return nil
default:
if !bfs.cfg.BlocksOnly && startIndex > endIndex {
return nil
}

Check warning on line 302 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L300-L302

Added lines #L300 - L302 were not covered by tests
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
Expand Down Expand Up @@ -425,7 +442,7 @@ func (bfs *Service) readBlock(rc io.ReadCloser) (*block.Block, error) {
// block OIDs search, cancels all in-progress downloading operations and waits
// until all service routines finish their work.
func (bfs *Service) Shutdown() {
if !bfs.IsActive() {
if !bfs.IsActive() || bfs.IsShutdown() {

Check warning on line 445 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L445

Added line #L445 was not covered by tests
return
}
bfs.stopService(true)
Expand Down Expand Up @@ -475,13 +492,20 @@ func (bfs *Service) exiter() {

// Notify Shutdown routine in case if it's user-triggered shutdown.
close(bfs.exiterToShutdown)
bfs.isShutdown.CompareAndSwap(false, true)

Check warning on line 495 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L495

Added line #L495 was not covered by tests
}

// IsActive returns true if the NeoFS BlockFetcher service is running.
func (bfs *Service) IsActive() bool {
return bfs.isActive.Load()
}

// IsShutdown returns true if the NeoFS BlockFetcher service is completely shutdown.
// The service can not be started again.
func (bfs *Service) IsShutdown() bool {
return bfs.isShutdown.Load()
}

// retry function with exponential backoff.
func (bfs *Service) retry(action func() error) error {
var (
Expand Down
Loading

0 comments on commit f4d006f

Please sign in to comment.