From 4af69273ebc09a8125a5b81303c1bc37046c30ac Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Fri, 27 Dec 2024 14:31:02 +0100 Subject: [PATCH] bqueue: add headers to the queue Signed-off-by: Ekaterina Pavlova --- internal/fakechain/fakechain.go | 5 + pkg/core/statesync/module.go | 8 ++ pkg/network/bqueue/queue.go | 170 ++++++++++++++++++++++++++++---- 3 files changed, 166 insertions(+), 17 deletions(-) diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..aa75a2a07c 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -428,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 { return 0 } +// HeaderHeight implements the StateSync interface. +func (s *FakeStateSync) HeaderHeight() uint32 { + return 0 +} + // IsActive implements the StateSync interface. func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() } diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 4915a279fc..ef6e61806c 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -450,6 +450,14 @@ func (s *Module) BlockHeight() uint32 { return s.blockHeight } +// HeaderHeight returns index of the last stored header. +func (s *Module) HeaderHeight() uint32 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.bc.HeaderHeight() +} + // IsActive tells whether state sync module is on and still gathering state // synchronisation data (headers, blocks or MPT nodes). func (s *Module) IsActive() bool { diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 2899b1ac51..762d58c35b 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -14,6 +14,7 @@ type Blockqueuer interface { AddBlock(block *block.Block) error AddHeaders(...*block.Header) error BlockHeight() uint32 + HeaderHeight() uint32 } // OperationMode is the mode of operation for the block queue. @@ -21,9 +22,9 @@ type Blockqueuer interface { type OperationMode byte const ( - // NonBlocking means that PutBlock will return immediately if the queue is full. + // NonBlocking means that PutBlock (or PutHeader) will return immediately if the queue is full. NonBlocking OperationMode = 0 - // Blocking means that PutBlock will wait until there is enough space in the queue. + // Blocking means that PutBlock (or PutHeader) will wait until there is enough space in the queue. Blocking OperationMode = 1 ) @@ -41,9 +42,15 @@ type Queue struct { lenUpdateF func(int) cacheSize int mode OperationMode + + headersLock sync.RWMutex + headersQueue []*block.Header + lastHeaderQ uint32 + checkHeaders chan struct{} + headersLen int } -// DefaultCacheSize is the default amount of blocks above the current height +// DefaultCacheSize is the default amount of blocks (or headers) above the current height // which are stored in the queue. const DefaultCacheSize = 2000 @@ -51,7 +58,7 @@ func (bq *Queue) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } -// New creates an instance of BlockQueue. +// New creates an instance of Queue that handles both blocks and headers. func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { if log == nil { return nil @@ -61,24 +68,31 @@ func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize } return &Queue{ - log: log, - queue: make([]*block.Block, cacheSize), - checkBlocks: make(chan struct{}, 1), - chain: bc, - relayF: relayer, - lenUpdateF: lenMetricsUpdater, - cacheSize: cacheSize, - mode: mode, + log: log, + chain: bc, + relayF: relayer, + lenUpdateF: lenMetricsUpdater, + cacheSize: cacheSize, + mode: mode, + queue: make([]*block.Block, cacheSize), + checkBlocks: make(chan struct{}, 1), + headersQueue: make([]*block.Header, cacheSize), + checkHeaders: make(chan struct{}, 1), } } -// Run runs the BlockQueue queueing loop. It must be called in a separate routine. +// Run must be called in a separate goroutine; it processes both blocks and headers. func (bq *Queue) Run() { + go bq.runBlocks() + go bq.runHeaders() +} + +func (bq *Queue) runBlocks() { var lastHeight = bq.chain.BlockHeight() for { _, ok := <-bq.checkBlocks if !ok { - break + return } for { h := bq.chain.BlockHeight() @@ -125,14 +139,14 @@ func (bq *Queue) Run() { } } -// PutBlock enqueues block to be added to the chain. +// PutBlock enqueues a block to be added to the chain. func (bq *Queue) PutBlock(block *block.Block) error { - h := bq.chain.BlockHeight() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { return nil } + h := bq.chain.BlockHeight() // Can easily happen when fetching the same blocks from // different peers, thus not considered as error. if block.Index <= h { @@ -182,6 +196,121 @@ func (bq *Queue) PutBlock(block *block.Block) error { return nil } +func (bq *Queue) runHeaders() { + var lastHHeight = bq.chain.HeaderHeight() + + for { + _, ok := <-bq.checkHeaders + if !ok { + return + } + for { + hH := bq.chain.HeaderHeight() + + bq.headersLock.Lock() + pos := bq.indexToPosition(hH + 1) + hdr := bq.headersQueue[pos] + + for i := lastHHeight; i < hH; i++ { + old := bq.indexToPosition(i + 1) + if bq.headersQueue[old] != nil && bq.headersQueue[old].Index == i { + bq.headersLen-- + bq.headersQueue[old] = nil + } + } + bq.headersLock.Unlock() + lastHHeight = hH + + if hdr == nil { + break + } + if hdr.Index != (hH + 1) { + break + } + + err := bq.chain.AddHeaders(hdr) + if err != nil { + if bq.chain.HeaderHeight() < hdr.Index { + bq.log.Warn("headerQueue: failed adding header", + zap.String("error", err.Error()), + zap.Uint32("headerHeight", bq.chain.HeaderHeight()), + zap.Uint32("nextIndex", hdr.Index)) + } + } + + bq.headersLock.Lock() + bq.headersLen-- + if bq.headersQueue[pos] == hdr { + bq.headersQueue[pos] = nil + } + bq.headersLock.Unlock() + } + } +} + +// PutHeader enqueues a single header. +func (bq *Queue) PutHeader(hdr *block.Header) error { + if bq.discarded.Load() { + return nil + } + + chainH := bq.chain.HeaderHeight() + + bq.headersLock.Lock() + defer bq.headersLock.Unlock() + + if hdr.Index <= chainH { + return nil + } + if chainH+uint32(bq.cacheSize) < hdr.Index { + switch bq.mode { + case NonBlocking: + return nil + case Blocking: + bq.headersLock.Unlock() + t := time.NewTicker(time.Second) + defer t.Stop() + + for range t.C { + if bq.discarded.Load() { + bq.headersLock.Lock() + return nil + } + chainH = bq.chain.HeaderHeight() + if chainH+uint32(bq.cacheSize) >= hdr.Index { + bq.headersLock.Lock() + break + } + } + } + } + + pos := bq.indexToPosition(hdr.Index) + if bq.headersQueue[pos] == nil || bq.headersQueue[pos].Index < hdr.Index { + bq.headersQueue[pos] = hdr + bq.headersLen++ + } + + for bq.lastHeaderQ+1 == hdr.Index { + bq.lastHeaderQ++ + } + + select { + case bq.checkHeaders <- struct{}{}: + default: + } + return nil +} + +func (bq *Queue) AddHeaders(hdrs ...*block.Header) error { + for _, hdr := range hdrs { + if err := bq.PutHeader(hdr); err != nil { + return err + } + } + return nil +} + // LastQueued returns the index of the last queued block and the queue's capacity // left. func (bq *Queue) LastQueued() (uint32, int) { @@ -190,7 +319,7 @@ func (bq *Queue) LastQueued() (uint32, int) { return bq.lastQ, bq.cacheSize - bq.len } -// Discard stops the queue and prevents it from accepting more blocks to enqueue. +// Discard stops the queue from accepting more blocks/headers and cleans up. func (bq *Queue) Discard() { if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() @@ -200,5 +329,12 @@ func (bq *Queue) Discard() { clear(bq.queue) bq.len = 0 bq.queueLock.Unlock() + + // Stop header queue + bq.headersLock.Lock() + close(bq.checkHeaders) + clear(bq.headersQueue) + bq.headersLen = 0 + bq.headersLock.Unlock() } }