Skip to content

Commit

Permalink
Make Synchronizer fields private (#1240)
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak authored Sep 15, 2023
1 parent b20cea1 commit 935eb23
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 43 deletions.
64 changes: 64 additions & 0 deletions mocks/mock_synchronizer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (

type Handler struct {
bcReader blockchain.Reader
synchronizer *sync.Synchronizer
syncReader sync.Reader
network utils.Network
gatewayClient Gateway
feederClient *feeder.Client
Expand All @@ -69,12 +69,12 @@ type Handler struct {
version string
}

func New(bcReader blockchain.Reader, synchronizer *sync.Synchronizer, n utils.Network,
func New(bcReader blockchain.Reader, syncReader sync.Reader, n utils.Network,
gatewayClient Gateway, feederClient *feeder.Client, virtualMachine vm.VM, version string, logger utils.Logger,
) *Handler {
return &Handler{
bcReader: bcReader,
synchronizer: synchronizer,
syncReader: syncReader,
network: n,
log: logger,
feederClient: feederClient,
Expand Down Expand Up @@ -578,19 +578,19 @@ func (h *Handler) StateUpdate(id BlockID) (*StateUpdate, *jsonrpc.Error) {
func (h *Handler) Syncing() (*Sync, *jsonrpc.Error) {
defaultSyncState := &Sync{Syncing: new(bool)}

startingBlockNumber := h.synchronizer.StartingBlockNumber
if startingBlockNumber == nil {
startingBlockNumber, err := h.syncReader.StartingBlockNumber()
if err != nil {
return defaultSyncState, nil
}
startingBlockHeader, err := h.bcReader.BlockHeaderByNumber(*startingBlockNumber)
startingBlockHeader, err := h.bcReader.BlockHeaderByNumber(startingBlockNumber)
if err != nil {
return defaultSyncState, nil
}
head, err := h.bcReader.HeadsHeader()
if err != nil {
return defaultSyncState, nil
}
highestBlockHeader := h.synchronizer.HighestBlockHeader.Load()
highestBlockHeader := h.syncReader.HighestBlockHeader()
if highestBlockHeader == nil {
return defaultSyncState, nil
}
Expand Down
17 changes: 7 additions & 10 deletions rpc/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"math/rand"
"testing"
"time"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/clients/feeder"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/NethermindEth/juno/mocks"
"github.com/NethermindEth/juno/rpc"
adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder"
"github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -1119,30 +1117,29 @@ func TestSyncing(t *testing.T) {
mockCtrl := gomock.NewController(t)
t.Cleanup(mockCtrl.Finish)

client := feeder.NewTestClient(t, utils.MAINNET)
gw := adaptfeeder.New(client)
log := utils.NewNopZapLogger()
synchronizer := sync.New(nil, gw, log, time.Duration(0))

synchronizer := mocks.NewMockSyncReader(mockCtrl)
mockReader := mocks.NewMockReader(mockCtrl)
handler := rpc.New(mockReader, synchronizer, utils.MAINNET, nil, nil, nil, "", nil)
defaultSyncState := false

startingBlock := uint64(0)
synchronizer.EXPECT().StartingBlockNumber().Return(startingBlock, errors.New("nope"))
t.Run("undefined starting block", func(t *testing.T) {
syncing, err := handler.Syncing()
assert.Nil(t, err)
assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing)
})

startingBlock := uint64(0)
synchronizer.StartingBlockNumber = &startingBlock
synchronizer.EXPECT().StartingBlockNumber().Return(startingBlock, nil).AnyTimes()
t.Run("empty blockchain", func(t *testing.T) {
mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(nil, errors.New("empty blockchain"))

syncing, err := handler.Syncing()
assert.Nil(t, err)
assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing)
})

synchronizer.EXPECT().HighestBlockHeader().Return(nil).Times(2)
t.Run("undefined highest block", func(t *testing.T) {
mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(&core.Header{}, nil)
mockReader.EXPECT().HeadsHeader().Return(&core.Header{}, nil)
Expand All @@ -1160,7 +1157,7 @@ func TestSyncing(t *testing.T) {
assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing)
})

synchronizer.HighestBlockHeader.Store(&core.Header{Number: 2, Hash: new(felt.Felt).SetUint64(2)})
synchronizer.EXPECT().HighestBlockHeader().Return(&core.Header{Number: 2, Hash: new(felt.Felt).SetUint64(2)}).Times(2)
t.Run("block height is equal to highest block", func(t *testing.T) {
mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(&core.Header{}, nil)
mockReader.EXPECT().HeadsHeader().Return(&core.Header{Number: 2}, nil)
Expand Down
72 changes: 46 additions & 26 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,29 @@ import (
"github.com/sourcegraph/conc/stream"
)

var _ service.Service = (*Synchronizer)(nil)
var (
_ service.Service = (*Synchronizer)(nil)
_ Reader = (*Synchronizer)(nil)
)

const (
opVerifyLabel = "verify"
opStoreLabel = "store"
opFetchLabel = "fetch"
)

//go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader
type Reader interface {
StartingBlockNumber() (uint64, error)
HighestBlockHeader() *core.Header
}

// Synchronizer manages a list of StarknetData to fetch the latest blockchain updates
type Synchronizer struct {
Blockchain *blockchain.Blockchain
StarknetData starknetdata.StarknetData
StartingBlockNumber *uint64
HighestBlockHeader atomic.Pointer[core.Header]
blockchain *blockchain.Blockchain
starknetData starknetdata.StarknetData
startingBlockNumber *uint64
highestBlockHeader atomic.Pointer[core.Header]

log utils.SimpleLogger

Expand All @@ -53,8 +62,8 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
log utils.SimpleLogger, pendingPollInterval time.Duration,
) *Synchronizer {
s := &Synchronizer{
Blockchain: bc,
StarknetData: starkNetData,
blockchain: bc,
starknetData: starkNetData,
log: log,
pendingPollInterval: pendingPollInterval,

Expand Down Expand Up @@ -108,7 +117,7 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers
case <-ctx.Done():
return func() {}
default:
stateUpdate, block, err := s.StarknetData.StateUpdateWithBlock(ctx, height)
stateUpdate, block, err := s.starknetData.StateUpdateWithBlock(ctx, height)
if err != nil {
continue
}
Expand All @@ -128,7 +137,7 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers
}

func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *core.StateUpdate) (map[felt.Felt]core.Class, error) {
state, closer, err := s.Blockchain.HeadState()
state, closer, err := s.blockchain.HeadState()
if err != nil {
// if err is db.ErrKeyNotFound we are on an empty DB
if !errors.Is(err, db.ErrKeyNotFound) {
Expand All @@ -151,7 +160,7 @@ func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *cor
}

if errors.Is(stateErr, db.ErrKeyNotFound) {
class, fetchErr := s.StarknetData.Class(ctx, classHash)
class, fetchErr := s.starknetData.Class(ctx, classHash)
if fetchErr == nil {
newClasses[*classHash] = class
}
Expand Down Expand Up @@ -183,7 +192,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc,
) stream.Callback {
timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opVerifyLabel))
commitments, err := s.Blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses)
commitments, err := s.blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses)
timer.ObserveDuration()
return func() {
select {
Expand All @@ -196,7 +205,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
return
}
timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opStoreLabel))
err = s.Blockchain.Store(block, commitments, stateUpdate, newClasses)
err = s.blockchain.Store(block, commitments, stateUpdate, newClasses)
timer.ObserveDuration()

if err != nil {
Expand All @@ -212,13 +221,13 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
resetStreams()
return
}
highestBlockHeader := s.HighestBlockHeader.Load()
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil || highestBlockHeader.Number <= block.Number {
highestBlock, err := s.StarknetData.BlockLatest(ctx)
highestBlock, err := s.starknetData.BlockLatest(ctx)
if err != nil {
s.log.Warnw("Failed fetching latest block", "err", err)
} else {
s.HighestBlockHeader.Store(highestBlock.Header)
s.highestBlockHeader.Store(highestBlock.Header)
isBehind := highestBlock.Number > block.Number+uint64(maxWorkers())
if s.catchUpMode != isBehind {
resetStreams()
Expand All @@ -236,24 +245,24 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat

func (s *Synchronizer) nextHeight() uint64 {
nextHeight := uint64(0)
if h, err := s.Blockchain.Height(); err == nil {
if h, err := s.blockchain.Height(); err == nil {
nextHeight = h + 1
}
return nextHeight
}

func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
defer func() {
s.StartingBlockNumber = nil
s.HighestBlockHeader.Store(nil)
s.startingBlockNumber = nil
s.highestBlockHeader.Store(nil)
}()

fetchers, verifiers := s.setupWorkers()
streamCtx, streamCancel := context.WithCancel(syncCtx)

nextHeight := s.nextHeight()
startingHeight := nextHeight
s.StartingBlockNumber = &startingHeight
s.startingBlockNumber = &startingHeight

pendingSem := make(chan struct{}, 1)
go s.pollPending(syncCtx, pendingSem)
Expand Down Expand Up @@ -306,14 +315,14 @@ func (s *Synchronizer) setupWorkers() (*stream.Stream, *stream.Stream) {

func (s *Synchronizer) revertHead(forkBlock *core.Block) {
var localHead *felt.Felt
head, err := s.Blockchain.HeadsHeader()
head, err := s.blockchain.HeadsHeader()
if err == nil {
localHead = head.Hash
}

s.log.Infow("Reorg detected", "localHead", localHead, "forkHead", forkBlock.Hash)

err = s.Blockchain.RevertHead()
err = s.blockchain.RevertHead()
if err != nil {
s.log.Warnw("Failed reverting HEAD", "reverted", localHead, "err", err)
} else {
Expand Down Expand Up @@ -350,12 +359,12 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) {
}

func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
highestBlockHeader := s.HighestBlockHeader.Load()
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil {
return nil
}

head, err := s.Blockchain.HeadsHeader()
head, err := s.blockchain.HeadsHeader()
if err != nil {
return err
}
Expand All @@ -365,7 +374,7 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
return nil
}

pendingStateUpdate, pendingBlock, err := s.StarknetData.StateUpdatePendingWithBlock(ctx)
pendingStateUpdate, pendingBlock, err := s.starknetData.StateUpdatePendingWithBlock(ctx)
if err != nil {
return err
}
Expand All @@ -376,7 +385,7 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
}

s.log.Debugw("Found pending block", "txns", pendingBlock.TransactionCount)
return s.Blockchain.StorePending(&blockchain.Pending{
return s.blockchain.StorePending(&blockchain.Pending{
Block: pendingBlock,
StateUpdate: pendingStateUpdate,
NewClasses: newClasses,
Expand All @@ -389,7 +398,7 @@ func (s *Synchronizer) updateStats(block *core.Block) {
currentHeight = block.Number
highestKnownHeight uint64 = 0
)
highestBlockHeader := s.HighestBlockHeader.Load()
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader != nil {
highestKnownHeight = highestBlockHeader.Number
}
Expand All @@ -399,3 +408,14 @@ func (s *Synchronizer) updateStats(block *core.Block) {
s.bestBlockGauge.Set(float64(highestKnownHeight))
s.transactionCount.Add(float64(transactions))
}

func (s *Synchronizer) StartingBlockNumber() (uint64, error) {
if s.startingBlockNumber == nil {
return 0, errors.New("not running")
}
return *s.startingBlockNumber, nil
}

func (s *Synchronizer) HighestBlockHeader() *core.Header {
return s.highestBlockHeader.Load()
}

0 comments on commit 935eb23

Please sign in to comment.