diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go new file mode 100644 index 0000000000..12524511d0 --- /dev/null +++ b/mocks/mock_synchronizer.go @@ -0,0 +1,64 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/NethermindEth/juno/sync (interfaces: Reader) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + core "github.com/NethermindEth/juno/core" + gomock "github.com/golang/mock/gomock" +) + +// MockSyncReader is a mock of Reader interface. +type MockSyncReader struct { + ctrl *gomock.Controller + recorder *MockSyncReaderMockRecorder +} + +// MockSyncReaderMockRecorder is the mock recorder for MockSyncReader. +type MockSyncReaderMockRecorder struct { + mock *MockSyncReader +} + +// NewMockSyncReader creates a new mock instance. +func NewMockSyncReader(ctrl *gomock.Controller) *MockSyncReader { + mock := &MockSyncReader{ctrl: ctrl} + mock.recorder = &MockSyncReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSyncReader) EXPECT() *MockSyncReaderMockRecorder { + return m.recorder +} + +// HighestBlockHeader mocks base method. +func (m *MockSyncReader) HighestBlockHeader() *core.Header { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HighestBlockHeader") + ret0, _ := ret[0].(*core.Header) + return ret0 +} + +// HighestBlockHeader indicates an expected call of HighestBlockHeader. +func (mr *MockSyncReaderMockRecorder) HighestBlockHeader() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HighestBlockHeader", reflect.TypeOf((*MockSyncReader)(nil).HighestBlockHeader)) +} + +// StartingBlockNumber mocks base method. +func (m *MockSyncReader) StartingBlockNumber() (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartingBlockNumber") + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartingBlockNumber indicates an expected call of StartingBlockNumber. +func (mr *MockSyncReaderMockRecorder) StartingBlockNumber() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartingBlockNumber", reflect.TypeOf((*MockSyncReader)(nil).StartingBlockNumber)) +} diff --git a/rpc/handlers.go b/rpc/handlers.go index 64c14d4fe3..1b1a4f17e4 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -60,7 +60,7 @@ const ( type Handler struct { bcReader blockchain.Reader - synchronizer *sync.Synchronizer + syncReader sync.Reader network utils.Network gatewayClient Gateway feederClient *feeder.Client @@ -69,12 +69,12 @@ type Handler struct { version string } -func New(bcReader blockchain.Reader, synchronizer *sync.Synchronizer, n utils.Network, +func New(bcReader blockchain.Reader, syncReader sync.Reader, n utils.Network, gatewayClient Gateway, feederClient *feeder.Client, virtualMachine vm.VM, version string, logger utils.Logger, ) *Handler { return &Handler{ bcReader: bcReader, - synchronizer: synchronizer, + syncReader: syncReader, network: n, log: logger, feederClient: feederClient, @@ -578,11 +578,11 @@ func (h *Handler) StateUpdate(id BlockID) (*StateUpdate, *jsonrpc.Error) { func (h *Handler) Syncing() (*Sync, *jsonrpc.Error) { defaultSyncState := &Sync{Syncing: new(bool)} - startingBlockNumber := h.synchronizer.StartingBlockNumber - if startingBlockNumber == nil { + startingBlockNumber, err := h.syncReader.StartingBlockNumber() + if err != nil { return defaultSyncState, nil } - startingBlockHeader, err := h.bcReader.BlockHeaderByNumber(*startingBlockNumber) + startingBlockHeader, err := h.bcReader.BlockHeaderByNumber(startingBlockNumber) if err != nil { return defaultSyncState, nil } @@ -590,7 +590,7 @@ func (h *Handler) Syncing() (*Sync, *jsonrpc.Error) { if err != nil { return defaultSyncState, nil } - highestBlockHeader := h.synchronizer.HighestBlockHeader.Load() + highestBlockHeader := h.syncReader.HighestBlockHeader() if highestBlockHeader == nil { return defaultSyncState, nil } diff --git a/rpc/handlers_test.go b/rpc/handlers_test.go index 88f5d7ad21..ff87f1e6d0 100644 --- a/rpc/handlers_test.go +++ b/rpc/handlers_test.go @@ -6,7 +6,6 @@ import ( "errors" "math/rand" "testing" - "time" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/clients/feeder" @@ -18,7 +17,6 @@ import ( "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" - "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/ethereum/go-ethereum/common" "github.com/golang/mock/gomock" @@ -1119,23 +1117,20 @@ func TestSyncing(t *testing.T) { mockCtrl := gomock.NewController(t) t.Cleanup(mockCtrl.Finish) - client := feeder.NewTestClient(t, utils.MAINNET) - gw := adaptfeeder.New(client) - log := utils.NewNopZapLogger() - synchronizer := sync.New(nil, gw, log, time.Duration(0)) - + synchronizer := mocks.NewMockSyncReader(mockCtrl) mockReader := mocks.NewMockReader(mockCtrl) handler := rpc.New(mockReader, synchronizer, utils.MAINNET, nil, nil, nil, "", nil) defaultSyncState := false + startingBlock := uint64(0) + synchronizer.EXPECT().StartingBlockNumber().Return(startingBlock, errors.New("nope")) t.Run("undefined starting block", func(t *testing.T) { syncing, err := handler.Syncing() assert.Nil(t, err) assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing) }) - startingBlock := uint64(0) - synchronizer.StartingBlockNumber = &startingBlock + synchronizer.EXPECT().StartingBlockNumber().Return(startingBlock, nil).AnyTimes() t.Run("empty blockchain", func(t *testing.T) { mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(nil, errors.New("empty blockchain")) @@ -1143,6 +1138,8 @@ func TestSyncing(t *testing.T) { assert.Nil(t, err) assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing) }) + + synchronizer.EXPECT().HighestBlockHeader().Return(nil).Times(2) t.Run("undefined highest block", func(t *testing.T) { mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(&core.Header{}, nil) mockReader.EXPECT().HeadsHeader().Return(&core.Header{}, nil) @@ -1160,7 +1157,7 @@ func TestSyncing(t *testing.T) { assert.Equal(t, &rpc.Sync{Syncing: &defaultSyncState}, syncing) }) - synchronizer.HighestBlockHeader.Store(&core.Header{Number: 2, Hash: new(felt.Felt).SetUint64(2)}) + synchronizer.EXPECT().HighestBlockHeader().Return(&core.Header{Number: 2, Hash: new(felt.Felt).SetUint64(2)}).Times(2) t.Run("block height is equal to highest block", func(t *testing.T) { mockReader.EXPECT().BlockHeaderByNumber(startingBlock).Return(&core.Header{}, nil) mockReader.EXPECT().HeadsHeader().Return(&core.Header{Number: 2}, nil) diff --git a/sync/sync.go b/sync/sync.go index d7c1bffa58..8e15f32145 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -19,7 +19,10 @@ import ( "github.com/sourcegraph/conc/stream" ) -var _ service.Service = (*Synchronizer)(nil) +var ( + _ service.Service = (*Synchronizer)(nil) + _ Reader = (*Synchronizer)(nil) +) const ( opVerifyLabel = "verify" @@ -27,12 +30,18 @@ const ( opFetchLabel = "fetch" ) +//go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader +type Reader interface { + StartingBlockNumber() (uint64, error) + HighestBlockHeader() *core.Header +} + // Synchronizer manages a list of StarknetData to fetch the latest blockchain updates type Synchronizer struct { - Blockchain *blockchain.Blockchain - StarknetData starknetdata.StarknetData - StartingBlockNumber *uint64 - HighestBlockHeader atomic.Pointer[core.Header] + blockchain *blockchain.Blockchain + starknetData starknetdata.StarknetData + startingBlockNumber *uint64 + highestBlockHeader atomic.Pointer[core.Header] log utils.SimpleLogger @@ -53,8 +62,8 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log utils.SimpleLogger, pendingPollInterval time.Duration, ) *Synchronizer { s := &Synchronizer{ - Blockchain: bc, - StarknetData: starkNetData, + blockchain: bc, + starknetData: starkNetData, log: log, pendingPollInterval: pendingPollInterval, @@ -108,7 +117,7 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers case <-ctx.Done(): return func() {} default: - stateUpdate, block, err := s.StarknetData.StateUpdateWithBlock(ctx, height) + stateUpdate, block, err := s.starknetData.StateUpdateWithBlock(ctx, height) if err != nil { continue } @@ -128,7 +137,7 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers } func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *core.StateUpdate) (map[felt.Felt]core.Class, error) { - state, closer, err := s.Blockchain.HeadState() + state, closer, err := s.blockchain.HeadState() if err != nil { // if err is db.ErrKeyNotFound we are on an empty DB if !errors.Is(err, db.ErrKeyNotFound) { @@ -151,7 +160,7 @@ func (s *Synchronizer) fetchUnknownClasses(ctx context.Context, stateUpdate *cor } if errors.Is(stateErr, db.ErrKeyNotFound) { - class, fetchErr := s.StarknetData.Class(ctx, classHash) + class, fetchErr := s.starknetData.Class(ctx, classHash) if fetchErr == nil { newClasses[*classHash] = class } @@ -183,7 +192,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc, ) stream.Callback { timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opVerifyLabel)) - commitments, err := s.Blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses) + commitments, err := s.blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses) timer.ObserveDuration() return func() { select { @@ -196,7 +205,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat return } timer := prometheus.NewTimer(s.opTimerHistogram.WithLabelValues(opStoreLabel)) - err = s.Blockchain.Store(block, commitments, stateUpdate, newClasses) + err = s.blockchain.Store(block, commitments, stateUpdate, newClasses) timer.ObserveDuration() if err != nil { @@ -212,13 +221,13 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat resetStreams() return } - highestBlockHeader := s.HighestBlockHeader.Load() + highestBlockHeader := s.highestBlockHeader.Load() if highestBlockHeader == nil || highestBlockHeader.Number <= block.Number { - highestBlock, err := s.StarknetData.BlockLatest(ctx) + highestBlock, err := s.starknetData.BlockLatest(ctx) if err != nil { s.log.Warnw("Failed fetching latest block", "err", err) } else { - s.HighestBlockHeader.Store(highestBlock.Header) + s.highestBlockHeader.Store(highestBlock.Header) isBehind := highestBlock.Number > block.Number+uint64(maxWorkers()) if s.catchUpMode != isBehind { resetStreams() @@ -236,7 +245,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat func (s *Synchronizer) nextHeight() uint64 { nextHeight := uint64(0) - if h, err := s.Blockchain.Height(); err == nil { + if h, err := s.blockchain.Height(); err == nil { nextHeight = h + 1 } return nextHeight @@ -244,8 +253,8 @@ func (s *Synchronizer) nextHeight() uint64 { func (s *Synchronizer) syncBlocks(syncCtx context.Context) { defer func() { - s.StartingBlockNumber = nil - s.HighestBlockHeader.Store(nil) + s.startingBlockNumber = nil + s.highestBlockHeader.Store(nil) }() fetchers, verifiers := s.setupWorkers() @@ -253,7 +262,7 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) { nextHeight := s.nextHeight() startingHeight := nextHeight - s.StartingBlockNumber = &startingHeight + s.startingBlockNumber = &startingHeight pendingSem := make(chan struct{}, 1) go s.pollPending(syncCtx, pendingSem) @@ -306,14 +315,14 @@ func (s *Synchronizer) setupWorkers() (*stream.Stream, *stream.Stream) { func (s *Synchronizer) revertHead(forkBlock *core.Block) { var localHead *felt.Felt - head, err := s.Blockchain.HeadsHeader() + head, err := s.blockchain.HeadsHeader() if err == nil { localHead = head.Hash } s.log.Infow("Reorg detected", "localHead", localHead, "forkHead", forkBlock.Hash) - err = s.Blockchain.RevertHead() + err = s.blockchain.RevertHead() if err != nil { s.log.Warnw("Failed reverting HEAD", "reverted", localHead, "err", err) } else { @@ -350,12 +359,12 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) { } func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { - highestBlockHeader := s.HighestBlockHeader.Load() + highestBlockHeader := s.highestBlockHeader.Load() if highestBlockHeader == nil { return nil } - head, err := s.Blockchain.HeadsHeader() + head, err := s.blockchain.HeadsHeader() if err != nil { return err } @@ -365,7 +374,7 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { return nil } - pendingStateUpdate, pendingBlock, err := s.StarknetData.StateUpdatePendingWithBlock(ctx) + pendingStateUpdate, pendingBlock, err := s.starknetData.StateUpdatePendingWithBlock(ctx) if err != nil { return err } @@ -376,7 +385,7 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { } s.log.Debugw("Found pending block", "txns", pendingBlock.TransactionCount) - return s.Blockchain.StorePending(&blockchain.Pending{ + return s.blockchain.StorePending(&blockchain.Pending{ Block: pendingBlock, StateUpdate: pendingStateUpdate, NewClasses: newClasses, @@ -389,7 +398,7 @@ func (s *Synchronizer) updateStats(block *core.Block) { currentHeight = block.Number highestKnownHeight uint64 = 0 ) - highestBlockHeader := s.HighestBlockHeader.Load() + highestBlockHeader := s.highestBlockHeader.Load() if highestBlockHeader != nil { highestKnownHeight = highestBlockHeader.Number } @@ -399,3 +408,14 @@ func (s *Synchronizer) updateStats(block *core.Block) { s.bestBlockGauge.Set(float64(highestKnownHeight)) s.transactionCount.Add(float64(transactions)) } + +func (s *Synchronizer) StartingBlockNumber() (uint64, error) { + if s.startingBlockNumber == nil { + return 0, errors.New("not running") + } + return *s.startingBlockNumber, nil +} + +func (s *Synchronizer) HighestBlockHeader() *core.Header { + return s.highestBlockHeader.Load() +}