Skip to content

Commit

Permalink
bqueue: add headers to the queue
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Dec 27, 2024
1 parent 64c4de4 commit 4af6927
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 17 deletions.
5 changes: 5 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 {
return 0
}

// HeaderHeight implements the StateSync interface.
func (s *FakeStateSync) HeaderHeight() uint32 {
return 0
}

// IsActive implements the StateSync interface.
func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() }

Expand Down
8 changes: 8 additions & 0 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ func (s *Module) BlockHeight() uint32 {
return s.blockHeight
}

// HeaderHeight returns index of the last stored header.
func (s *Module) HeaderHeight() uint32 {
s.lock.RLock()
defer s.lock.RUnlock()

return s.bc.HeaderHeight()
}

// IsActive tells whether state sync module is on and still gathering state
// synchronisation data (headers, blocks or MPT nodes).
func (s *Module) IsActive() bool {
Expand Down
170 changes: 153 additions & 17 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ type Blockqueuer interface {
AddBlock(block *block.Block) error
AddHeaders(...*block.Header) error
BlockHeight() uint32
HeaderHeight() uint32
}

// OperationMode is the mode of operation for the block queue.
// Could be either Blocking or NonBlocking.
type OperationMode byte

const (
// NonBlocking means that PutBlock will return immediately if the queue is full.
// NonBlocking means that PutBlock (or PutHeader) will return immediately if the queue is full.
NonBlocking OperationMode = 0
// Blocking means that PutBlock will wait until there is enough space in the queue.
// Blocking means that PutBlock (or PutHeader) will wait until there is enough space in the queue.
Blocking OperationMode = 1
)

Expand All @@ -41,17 +42,23 @@ type Queue struct {
lenUpdateF func(int)
cacheSize int
mode OperationMode

headersLock sync.RWMutex
headersQueue []*block.Header
lastHeaderQ uint32
checkHeaders chan struct{}
headersLen int
}

// DefaultCacheSize is the default amount of blocks above the current height
// DefaultCacheSize is the default amount of blocks (or headers) above the current height
// which are stored in the queue.
const DefaultCacheSize = 2000

func (bq *Queue) indexToPosition(i uint32) int {
return int(i) % bq.cacheSize
}

// New creates an instance of BlockQueue.
// New creates an instance of Queue that handles both blocks and headers.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue {
if log == nil {
return nil
Expand All @@ -61,24 +68,31 @@ func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize
}

return &Queue{
log: log,
queue: make([]*block.Block, cacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
log: log,
chain: bc,
relayF: relayer,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
queue: make([]*block.Block, cacheSize),
checkBlocks: make(chan struct{}, 1),
headersQueue: make([]*block.Header, cacheSize),
checkHeaders: make(chan struct{}, 1),
}
}

// Run runs the BlockQueue queueing loop. It must be called in a separate routine.
// Run must be called in a separate goroutine; it processes both blocks and headers.
func (bq *Queue) Run() {
go bq.runBlocks()
go bq.runHeaders()
}

func (bq *Queue) runBlocks() {
var lastHeight = bq.chain.BlockHeight()
for {
_, ok := <-bq.checkBlocks
if !ok {
break
return
}
for {
h := bq.chain.BlockHeight()
Expand Down Expand Up @@ -125,14 +139,14 @@ func (bq *Queue) Run() {
}
}

// PutBlock enqueues block to be added to the chain.
// PutBlock enqueues a block to be added to the chain.
func (bq *Queue) PutBlock(block *block.Block) error {
h := bq.chain.BlockHeight()
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
if bq.discarded.Load() {
return nil
}
h := bq.chain.BlockHeight()
// Can easily happen when fetching the same blocks from
// different peers, thus not considered as error.
if block.Index <= h {
Expand Down Expand Up @@ -182,6 +196,121 @@ func (bq *Queue) PutBlock(block *block.Block) error {
return nil
}

func (bq *Queue) runHeaders() {
var lastHHeight = bq.chain.HeaderHeight()

for {
_, ok := <-bq.checkHeaders
if !ok {
return
}
for {
hH := bq.chain.HeaderHeight()

bq.headersLock.Lock()
pos := bq.indexToPosition(hH + 1)
hdr := bq.headersQueue[pos]

for i := lastHHeight; i < hH; i++ {
old := bq.indexToPosition(i + 1)
if bq.headersQueue[old] != nil && bq.headersQueue[old].Index == i {
bq.headersLen--
bq.headersQueue[old] = nil
}
}
bq.headersLock.Unlock()
lastHHeight = hH

if hdr == nil {
break
}
if hdr.Index != (hH + 1) {
break
}

err := bq.chain.AddHeaders(hdr)
if err != nil {
if bq.chain.HeaderHeight() < hdr.Index {
bq.log.Warn("headerQueue: failed adding header",
zap.String("error", err.Error()),
zap.Uint32("headerHeight", bq.chain.HeaderHeight()),
zap.Uint32("nextIndex", hdr.Index))
}
}

bq.headersLock.Lock()
bq.headersLen--
if bq.headersQueue[pos] == hdr {
bq.headersQueue[pos] = nil
}
bq.headersLock.Unlock()
}
}
}

// PutHeader enqueues a single header.
func (bq *Queue) PutHeader(hdr *block.Header) error {
if bq.discarded.Load() {
return nil
}

chainH := bq.chain.HeaderHeight()

bq.headersLock.Lock()
defer bq.headersLock.Unlock()

if hdr.Index <= chainH {
return nil
}
if chainH+uint32(bq.cacheSize) < hdr.Index {
switch bq.mode {
case NonBlocking:
return nil
case Blocking:
bq.headersLock.Unlock()
t := time.NewTicker(time.Second)
defer t.Stop()

for range t.C {
if bq.discarded.Load() {
bq.headersLock.Lock()
return nil
}
chainH = bq.chain.HeaderHeight()
if chainH+uint32(bq.cacheSize) >= hdr.Index {
bq.headersLock.Lock()
break
}
}
}
}

pos := bq.indexToPosition(hdr.Index)
if bq.headersQueue[pos] == nil || bq.headersQueue[pos].Index < hdr.Index {
bq.headersQueue[pos] = hdr
bq.headersLen++
}

for bq.lastHeaderQ+1 == hdr.Index {
bq.lastHeaderQ++
}

select {
case bq.checkHeaders <- struct{}{}:
default:
}
return nil
}

func (bq *Queue) AddHeaders(hdrs ...*block.Header) error {
for _, hdr := range hdrs {
if err := bq.PutHeader(hdr); err != nil {
return err
}
}
return nil
}

// LastQueued returns the index of the last queued block and the queue's capacity
// left.
func (bq *Queue) LastQueued() (uint32, int) {
Expand All @@ -190,7 +319,7 @@ func (bq *Queue) LastQueued() (uint32, int) {
return bq.lastQ, bq.cacheSize - bq.len
}

// Discard stops the queue and prevents it from accepting more blocks to enqueue.
// Discard stops the queue from accepting more blocks/headers and cleans up.
func (bq *Queue) Discard() {
if bq.discarded.CompareAndSwap(false, true) {
bq.queueLock.Lock()
Expand All @@ -200,5 +329,12 @@ func (bq *Queue) Discard() {
clear(bq.queue)
bq.len = 0
bq.queueLock.Unlock()

// Stop header queue
bq.headersLock.Lock()
close(bq.checkHeaders)
clear(bq.headersQueue)
bq.headersLen = 0
bq.headersLock.Unlock()
}
}

0 comments on commit 4af6927

Please sign in to comment.