# Use channel instead of sync cond for RetrieveLoop and blockStoreRetri… (rollkit#1152)

## Overview
Resolves: rollkit#1132, resolves: rollkit#1153

## Checklist


- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or documentation updates
- [x] Linked issues closed with keywords
Manav-Aggarwal authored Aug 31, 2023
1 parent 344d382 commit 63486ad
Showing 3 changed files with 77 additions and 68 deletions.
100 changes: 33 additions & 67 deletions block/manager.go
```diff
@@ -79,15 +79,11 @@ type Manager struct {

 	blockCache *BlockCache

-	// blockStoreMtx is used by blockStoreCond
-	blockStoreMtx *sync.Mutex
-	// blockStoreCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve blocks from blockStore
-	blockStoreCond *sync.Cond
+	// blockStoreCh is used to notify sync goroutine (SyncLoop) that it needs to retrieve blocks from blockStore
+	blockStoreCh chan struct{}

-	// retrieveMtx is used by retrieveCond
-	retrieveMtx *sync.Mutex
-	// retrieveCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve data
-	retrieveCond *sync.Cond
+	// retrieveCh is used to notify sync goroutine (SyncLoop) that it needs to retrieve data
+	retrieveCh chan struct{}

 	logger log.Logger

@@ -96,9 +92,7 @@ type Manager struct {
 	txsAvailable      <-chan struct{}
 	doneBuildingBlock chan struct{}

-	// Maintains blocks that need to be published to DA layer
-	pendingBlocks    []*types.Block
-	pendingBlocksMtx *sync.RWMutex
+	pendingBlocks *PendingBlocks
 }

 // getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
```
```diff
@@ -181,21 +175,17 @@ func NewManager(
 		HeaderCh:          make(chan *types.SignedHeader, channelLength),
 		BlockCh:           make(chan *types.Block, channelLength),
 		blockInCh:         make(chan newBlockEvent, blockInChLength),
-		blockStoreMtx:     new(sync.Mutex),
+		blockStoreCh:      make(chan struct{}, 1),
 		blockStore:        blockStore,
-		retrieveMtx:       new(sync.Mutex),
 		lastStateMtx:      new(sync.RWMutex),
 		blockCache:        NewBlockCache(),
+		retrieveCh:        make(chan struct{}, 1),
 		logger:            logger,
 		txsAvailable:      txsAvailableCh,
 		doneBuildingBlock: doneBuildingCh,
 		buildingBlock:     false,
-		pendingBlocks:     make([]*types.Block, 0),
-		pendingBlocksMtx:  new(sync.RWMutex),
+		pendingBlocks:     NewPendingBlocks(),
 	}
-	agg.retrieveCond = sync.NewCond(agg.retrieveMtx)
-	agg.blockStoreCond = sync.NewCond(agg.blockStoreMtx)

 	return agg, nil
 }
```
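Both `blockStoreCh` and `retrieveCh` are created with capacity 1, so one wakeup can be parked while the receiving loop is mid-iteration; any further wakeups are dropped by the non-blocking sends introduced below, which is safe because a single pending signal is enough to make the loop re-check for new work.
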
```diff
@@ -291,6 +281,9 @@ func (m *Manager) BlockSubmissionLoop(ctx context.Context) {
 			return
 		case <-timer.C:
 		}
+		if m.pendingBlocks.isEmpty() {
+			continue
+		}
 		err := m.submitBlocksToDA(ctx)
 		if err != nil {
 			m.logger.Error("error while submitting block to DA", "error", err)
```
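
With the new `isEmpty` guard, `BlockSubmissionLoop` skips `submitBlocksToDA` entirely on timer ticks where nothing is pending, instead of attempting to submit an empty batch.
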
```diff
@@ -308,9 +301,9 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
 	for {
 		select {
 		case <-daTicker.C:
-			m.retrieveCond.Signal()
+			m.sendNonBlockingSignalToRetrieveCh()
 		case <-blockTicker.C:
-			m.blockStoreCond.Signal()
+			m.sendNonBlockingSignalToBlockStoreCh()
 		case blockEvent := <-m.blockInCh:
 			block := blockEvent.block
 			daHeight := blockEvent.daHeight

@@ -327,8 +320,8 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
 			}
 			m.blockCache.setBlock(blockHeight, block)

-			m.blockStoreCond.Signal()
-			m.retrieveCond.Signal()
+			m.sendNonBlockingSignalToBlockStoreCh()
+			m.sendNonBlockingSignalToRetrieveCh()

 			err := m.trySyncNextBlock(ctx, daHeight)
 			if err != nil {
```
```diff
@@ -342,6 +335,20 @@ func (m *Manager) SyncLoop(ctx context.Context, cancel context.CancelFunc) {
 	}
 }

+func (m *Manager) sendNonBlockingSignalToBlockStoreCh() {
+	select {
+	case m.blockStoreCh <- struct{}{}:
+	default:
+	}
+}
+
+func (m *Manager) sendNonBlockingSignalToRetrieveCh() {
+	select {
+	case m.retrieveCh <- struct{}{}:
+	default:
+	}
+}
+
 // trySyncNextBlock tries to progress one step (one block) in sync process.
 //
 // To be able to apply block and height h, we need to have its Commit. It is contained in block at height h+1.
```
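
These two helpers are the heart of the change: a send into a full capacity-1 channel falls through to the `default` case, so signals coalesce instead of blocking the sender, and unlike `sync.Cond.Signal` a signal sent while nobody is waiting is retained in the buffer. Below is a minimal, self-contained sketch of the pattern; the names `signal` and `worker` are illustrative and not taken from the codebase:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// signal performs a non-blocking send: if a wakeup is already
// pending in the buffer, the new one coalesces with it.
func signal(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// worker re-checks for work once per received signal, mirroring
// how RetrieveLoop and BlockStoreRetrieveLoop drain their channels.
func worker(ctx context.Context, ch <-chan struct{}) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-ch:
			fmt.Println("woke up: checking for new work")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	ch := make(chan struct{}, 1) // capacity 1, like blockStoreCh and retrieveCh
	go worker(ctx, ch)

	// A burst of signals never blocks the sender; the worker
	// typically observes fewer wakeups than there were signals.
	for i := 0; i < 5; i++ {
		signal(ch)
	}
	<-ctx.Done()
}
```

The same coalescing is why `SyncLoop` can signal both channels on every incoming block without ever stalling.
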
```diff
@@ -405,30 +412,12 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {

 // BlockStoreRetrieveLoop is responsible for retrieving blocks from the Block Store.
 func (m *Manager) BlockStoreRetrieveLoop(ctx context.Context) {
-	// waitCh is used to signal the block store retrieve loop, that it should check block store for new blocks
-	// blockStoreCond can be signalled in completely async manner, and goroutine below
-	// works as some kind of "buffer" for those signals
-	waitCh := make(chan interface{})
 	lastBlockStoreHeight := uint64(0)
-	go func() {
-		for {
-			// This infinite loop is expected to be stopped once the context is
-			// cancelled or throws an error and cleaned up by the GC. This is OK
-			// because it waits using a conditional which is only signaled periodically.
-			m.blockStoreMtx.Lock()
-			m.blockStoreCond.Wait()
-			waitCh <- nil
-			m.blockStoreMtx.Unlock()
-			if ctx.Err() != nil {
-				return
-			}
-		}
-	}()
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-waitCh:
+		case <-m.blockStoreCh:
 		}
 		blockStoreHeight := m.blockStore.Height()
 		if blockStoreHeight > lastBlockStoreHeight {
```
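
Dropping the forwarding goroutine addresses two weaknesses of the cond-based design: a `Cond.Signal` with no waiter is simply lost, and the helper goroutine could stay parked in `Cond.Wait` after the context was cancelled (contrary to the old comment, a blocked goroutine is not cleaned up by the GC). The buffered channel holds one pending wakeup and plugs directly into the loop's `select`.
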
```diff
@@ -467,30 +456,11 @@ func (m *Manager) getBlocksFromBlockStore(ctx context.Context, startHeight, endH

 // RetrieveLoop is responsible for interacting with DA layer.
 func (m *Manager) RetrieveLoop(ctx context.Context) {
-	// waitCh is used to signal the retrieve loop, that it should process next blocks
-	// retrieveCond can be signalled in completely async manner, and goroutine below
-	// works as some kind of "buffer" for those signals
-	waitCh := make(chan interface{})
-	go func() {
-		for {
-			// This infinite loop is expected to be stopped once the context is
-			// cancelled or throws an error and cleaned up by the GC. This is OK
-			// because it waits using a conditional which is only signaled periodically.
-			m.retrieveMtx.Lock()
-			m.retrieveCond.Wait()
-			waitCh <- nil
-			m.retrieveMtx.Unlock()
-			if ctx.Err() != nil {
-				return
-			}
-		}
-	}()
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-waitCh:
+		case <-m.retrieveCh:
 		}
 		daHeight := atomic.LoadUint64(&m.daHeight)
 		err := m.processNextDABlock(ctx)
```
```diff
@@ -677,9 +647,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
 	}

 	// Submit block to be published to the DA layer
-	m.pendingBlocksMtx.Lock()
-	m.pendingBlocks = append(m.pendingBlocks, block)
-	m.pendingBlocksMtx.Unlock()
+	m.pendingBlocks.addPendingBlock(block)

 	// Commit the new state and block which writes to disk on the proxy app
 	_, _, err = m.executor.Commit(ctx, newState, block, responses)
```
```diff
@@ -719,12 +687,10 @@ func (m *Manager) publishBlock(ctx context.Context) error {
 }

 func (m *Manager) submitBlocksToDA(ctx context.Context) error {
-	m.pendingBlocksMtx.Lock()
-	defer m.pendingBlocksMtx.Unlock()
 	submitted := false
 	backoff := initialBackoff
 	for attempt := 1; ctx.Err() == nil && !submitted && attempt <= maxSubmitAttempts; attempt++ {
-		res := m.dalc.SubmitBlocks(ctx, m.pendingBlocks)
+		res := m.dalc.SubmitBlocks(ctx, m.pendingBlocks.getPendingBlocks())
 		if res.Code == da.StatusSuccess {
 			m.logger.Info("successfully submitted Rollkit block to DA layer", "daHeight", res.DAHeight)
 			submitted = true

@@ -738,7 +704,7 @@ func (m *Manager) submitBlocksToDA(ctx context.Context) error {
 	if !submitted {
 		return fmt.Errorf("failed to submit block to DA layer after %d attempts", maxSubmitAttempts)
 	}
-	m.pendingBlocks = make([]*types.Block, 0)
+	m.pendingBlocks.resetPendingBlocks()
 	return nil
 }
```
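A design note: the old version held `pendingBlocksMtx` across the entire retry loop, including every `SubmitBlocks` network call, whereas locking is now confined to the short `PendingBlocks` accessors. Reading the batch and resetting it are separate critical sections now, which appears to assume that no block is appended between a successful submission and the reset.
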
43 changes: 43 additions & 0 deletions block/pending_blocks.go
```diff
@@ -0,0 +1,43 @@
+package block
+
+import (
+	"sync"
+
+	"github.com/rollkit/rollkit/types"
+)
+
+// Maintains blocks that need to be published to DA layer
+type PendingBlocks struct {
+	pendingBlocks []*types.Block
+	mtx           *sync.RWMutex
+}
+
+func NewPendingBlocks() *PendingBlocks {
+	return &PendingBlocks{
+		pendingBlocks: make([]*types.Block, 0),
+		mtx:           new(sync.RWMutex),
+	}
+}
+
+func (pb *PendingBlocks) getPendingBlocks() []*types.Block {
+	pb.mtx.RLock()
+	defer pb.mtx.RUnlock()
+	return pb.pendingBlocks
+}
+
+func (pb *PendingBlocks) isEmpty() bool {
+	pendingBlocks := pb.getPendingBlocks()
+	return len(pendingBlocks) == 0
+}
+
+func (pb *PendingBlocks) addPendingBlock(block *types.Block) {
+	pb.mtx.Lock()
+	defer pb.mtx.Unlock()
+	pb.pendingBlocks = append(pb.pendingBlocks, block)
+}
+
+func (pb *PendingBlocks) resetPendingBlocks() {
+	pb.mtx.Lock()
+	defer pb.mtx.Unlock()
+	pb.pendingBlocks = make([]*types.Block, 0)
+}
```
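
A hypothetical package-internal test (not part of this commit) sketching the intended lifecycle of the new type: `publishBlock` appends, `BlockSubmissionLoop` consults `isEmpty`, and `submitBlocksToDA` reads the batch and resets it. Note that `getPendingBlocks` returns the underlying slice rather than a copy, so callers should treat the result as read-only:

```go
package block

import (
	"testing"

	"github.com/rollkit/rollkit/types"
)

// TestPendingBlocksLifecycle sketches the add -> read -> reset cycle.
func TestPendingBlocksLifecycle(t *testing.T) {
	pb := NewPendingBlocks()
	if !pb.isEmpty() {
		t.Fatal("new PendingBlocks should start empty")
	}

	pb.addPendingBlock(&types.Block{})
	pb.addPendingBlock(&types.Block{})

	// getPendingBlocks returns the slice itself, not a copy:
	// treat it as read-only while other goroutines may append.
	if got := len(pb.getPendingBlocks()); got != 2 {
		t.Fatalf("expected 2 pending blocks, got %d", got)
	}

	pb.resetPendingBlocks()
	if !pb.isEmpty() {
		t.Fatal("reset should clear all pending blocks")
	}
}
```
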
2 changes: 1 addition & 1 deletion node/full_client_test.go
```diff
@@ -970,7 +970,7 @@ func TestMempool2Nodes(t *testing.T) {
 		require.NoError(node2.Stop())
 	}()

-	time.Sleep(3 * time.Second)
+	time.Sleep(4 * time.Second)
 	timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer timeoutCancel()
```
