From fcd5e8f0f7e343ad9371e6cd68de4ce09c9c03e8 Mon Sep 17 00:00:00 2001 From: weiihann Date: Tue, 15 Oct 2024 17:39:43 +0800 Subject: [PATCH] Implement starknet_subscriptionReorg --- mocks/mock_synchronizer.go | 14 ++++++++ rpc/events.go | 49 +++++++++++++++++++++++-- rpc/events_test.go | 73 +++++++++++++++++++++++++++++++++----- rpc/handlers.go | 8 ++++- sync/sync.go | 49 +++++++++++++++++++++++++ sync/sync_test.go | 12 +++++++ 6 files changed, 193 insertions(+), 12 deletions(-) diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go index dc55ebce7f..e4c26aa90a 100644 --- a/mocks/mock_synchronizer.go +++ b/mocks/mock_synchronizer.go @@ -82,3 +82,17 @@ func (mr *MockSyncReaderMockRecorder) SubscribeNewHeads() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeNewHeads", reflect.TypeOf((*MockSyncReader)(nil).SubscribeNewHeads)) } + +// SubscribeReorg mocks base method. +func (m *MockSyncReader) SubscribeReorg() sync.ReorgSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeReorg") + ret0, _ := ret[0].(sync.ReorgSubscription) + return ret0 +} + +// SubscribeReorg indicates an expected call of SubscribeReorg. +func (mr *MockSyncReaderMockRecorder) SubscribeReorg() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeReorg", reflect.TypeOf((*MockSyncReader)(nil).SubscribeReorg)) +} diff --git a/rpc/events.go b/rpc/events.go index bddb879045..7a83ea944c 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -9,6 +9,8 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/sync" + "github.com/sourcegraph/conc" ) const ( @@ -80,15 +82,18 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub h.mu.Unlock() headerSub := h.newHeads.Subscribe() + reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the new heads subscription sub.wg.Go(func() { defer func() { h.unsubscribe(sub, id) headerSub.Unsubscribe() + reorgSub.Unsubscribe() }() - newHeadersChan := make(chan *core.Header, MaxBlocksBack) + var wg conc.WaitGroup - sub.wg.Go(func() { + newHeadersChan := make(chan *core.Header, MaxBlocksBack) + wg.Go(func() { h.bufferNewHeaders(subscriptionCtx, headerSub, newHeadersChan) }) @@ -97,7 +102,15 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub return } - h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id) + wg.Go(func() { + h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id) + }) + + wg.Go(func() { + h.processReorgs(subscriptionCtx, reorgSub, w, id) + }) + + wg.Wait() }) return &SubscriptionID{ID: id}, nil @@ -204,6 +217,36 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err return err } +func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgData], w jsonrpc.Conn, id uint64) { + for { + select { + case <-ctx.Done(): + return + case reorg := <-reorgSub.Recv(): + if err := h.sendReorg(w, reorg, id); err != nil { + h.log.Warnw("Error sending reorg", "err", err) + return + } + } + } +} + +func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgData, id uint64) error { + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionReorg", + Params: map[string]any{ + "subscription_id": id, + "result": reorg, + }, + }) + if err != nil { + return err + } + _, err = w.Write(resp) + return err +} + func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { w, ok := jsonrpc.ConnFromContext(ctx) if !ok { diff --git a/rpc/events_test.go b/rpc/events_test.go index a0e12d8418..648c1fe5b7 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -28,7 +28,7 @@ import ( var emptyCommitments = core.BlockCommitments{} const ( - testResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` ) func TestEvents(t *testing.T) { @@ -238,12 +238,24 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { type fakeSyncer struct { newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgData] +} + +func newFakeSyncer() *fakeSyncer { + return &fakeSyncer{ + newHeads: feed.New[*core.Header](), + reorgs: feed.New[*sync.ReorgData](), + } } func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription { return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()} } +func (fs *fakeSyncer) SubscribeReorg() sync.ReorgSubscription { + return sync.ReorgSubscription{Subscription: fs.reorgs.Subscribe()} +} + func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) { return 0, nil } @@ -256,7 +268,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { t.Parallel() chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()} + syncer := newFakeSyncer() handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) ctx, cancel := context.WithCancel(context.Background()) @@ -289,7 +301,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { syncer.newHeads.Send(testHeader(t)) // Receive a block header. - want := fmt.Sprintf(testResponse, id.ID) + want := fmt.Sprintf(newHeadsResponse, id.ID) got := make([]byte, len(want)) _, err := clientConn.Read(got) require.NoError(t, err) @@ -323,7 +335,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { log := utils.NewNopZapLogger() chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()} + syncer := newFakeSyncer() handler := rpc.New(chain, syncer, nil, "", log) ctx, cancel := context.WithCancel(context.Background()) @@ -377,11 +389,11 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { syncer.newHeads.Send(testHeader(t)) // Receive a block header. - firstWant = fmt.Sprintf(testResponse, firstID) + firstWant = fmt.Sprintf(newHeadsResponse, firstID) _, firstGot, err = conn1.Read(ctx) require.NoError(t, err) require.Equal(t, firstWant, string(firstGot)) - secondWant = fmt.Sprintf(testResponse, secondID) + secondWant = fmt.Sprintf(newHeadsResponse, secondID) _, secondGot, err = conn2.Read(ctx) require.NoError(t, err) require.Equal(t, secondWant, string(secondGot)) @@ -407,7 +419,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) chain = blockchain.New(testDB, &utils.Mainnet) - syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()} + syncer := newFakeSyncer() handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) ctx, cancel := context.WithCancel(context.Background()) @@ -450,7 +462,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { syncer.newHeads.Send(testHeader(t)) // Check new block content - want = fmt.Sprintf(testResponse, id.ID) + want = fmt.Sprintf(newHeadsResponse, id.ID) got = make([]byte, len(want)) _, err = clientConn.Read(got) require.NoError(t, err) @@ -478,3 +490,48 @@ func testHeader(t *testing.T) *core.Header { } return header } + +func TestSubscriptionReorg(t *testing.T) { + t.Parallel() + + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + syncer := newFakeSyncer() + handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + // Subscribe to new heads which will send a + id, rpcErr := handler.SubscribeNewHeads(subCtx, nil) + require.Nil(t, rpcErr) + require.NotZero(t, id) + + // Simulate a reorg + syncer.reorgs.Send(&sync.ReorgData{ + StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + StartBlockNum: 0, + EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), + EndBlockNum: 2, + }) + + // Receive reorg event + want := `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}` + want = fmt.Sprintf(want, id.ID) + got := make([]byte, len(want)) + _, err := clientConn.Read(got) + require.NoError(t, err) + require.Equal(t, want, string(got)) +} diff --git a/rpc/handlers.go b/rpc/handlers.go index e19d7892f6..783778273d 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -84,6 +84,7 @@ type Handler struct { version string newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgData] idgen func() uint64 mu stdsync.Mutex // protects subscriptions. @@ -117,6 +118,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V }, version: version, newHeads: feed.New[*core.Header](), + reorgs: feed.New[*sync.ReorgData](), subscriptions: make(map[uint64]*subscription), blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize), @@ -152,8 +154,12 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler { func (h *Handler) Run(ctx context.Context) error { newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription + reorgsSub := h.syncReader.SubscribeReorg().Subscription defer newHeadsSub.Unsubscribe() - feed.Tee[*core.Header](newHeadsSub, h.newHeads) + defer reorgsSub.Unsubscribe() + feed.Tee(newHeadsSub, h.newHeads) + feed.Tee(reorgsSub, h.reorgs) + <-ctx.Done() for _, sub := range h.subscriptions { sub.wg.Wait() diff --git a/sync/sync.go b/sync/sync.go index a2e4ac0bf0..7b729bbe17 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -34,6 +34,10 @@ type HeaderSubscription struct { *feed.Subscription[*core.Header] } +type ReorgSubscription struct { + *feed.Subscription[*ReorgData] +} + // Todo: Since this is also going to be implemented by p2p package we should move this interface to node package // //go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader @@ -41,6 +45,7 @@ type Reader interface { StartingBlockNumber() (uint64, error) HighestBlockHeader() *core.Header SubscribeNewHeads() HeaderSubscription + SubscribeReorg() ReorgSubscription } // This is temporary and will be removed once the p2p synchronizer implements this interface. @@ -58,6 +63,22 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription { return HeaderSubscription{feed.New[*core.Header]().Subscribe()} } +func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription { + return ReorgSubscription{feed.New[*ReorgData]().Subscribe()} +} + +// ReorgData represents data about reorganised blocks, starting and ending block number and hash +type ReorgData struct { + // StartBlockHash is the hash of the first known block of the orphaned chain + StartBlockHash *felt.Felt `json:"starting_block_hash"` + // StartBlockNum is the number of the first known block of the orphaned chain + StartBlockNum uint64 `json:"starting_block_number"` + // The last known block of the orphaned chain + EndBlockHash *felt.Felt `json:"ending_block_hash"` + // Number of the last known block of the orphaned chain + EndBlockNum uint64 `json:"ending_block_number"` +} + // Synchronizer manages a list of StarknetData to fetch the latest blockchain updates type Synchronizer struct { blockchain *blockchain.Blockchain @@ -66,12 +87,15 @@ type Synchronizer struct { startingBlockNumber *uint64 highestBlockHeader atomic.Pointer[core.Header] newHeads *feed.Feed[*core.Header] + reorgFeed *feed.Feed[*ReorgData] log utils.SimpleLogger listener EventListener pendingPollInterval time.Duration catchUpMode bool + + currReorg *ReorgData // If nil, no reorg is happening } func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, @@ -82,6 +106,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, starknetData: starkNetData, log: log, newHeads: feed.New[*core.Header](), + reorgFeed: feed.New[*ReorgData](), pendingPollInterval: pendingPollInterval, listener: &SelectiveListener{}, readOnlyBlockchain: readOnlyBlockchain, @@ -228,6 +253,11 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) } + if s.currReorg != nil { + s.reorgFeed.Send(s.currReorg) + s.currReorg = nil // reset the reorg data + } + s.newHeads.Send(block.Header) s.log.Infow("Stored Block", "number", block.Number, "hash", block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString()) @@ -324,6 +354,19 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) { } else { s.log.Infow("Reverted HEAD", "reverted", localHead) } + + if s.currReorg == nil { // first block of the reorg + s.currReorg = &ReorgData{ + StartBlockHash: localHead, + StartBlockNum: head.Number, + EndBlockHash: localHead, + EndBlockNum: head.Number, + } + } else { // not the first block of the reorg, adjust the starting block + s.currReorg.StartBlockHash = localHead + s.currReorg.StartBlockNum = head.Number + } + s.listener.OnReorg(head.Number) } @@ -439,3 +482,9 @@ func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription { Subscription: s.newHeads.Subscribe(), } } + +func (s *Synchronizer) SubscribeReorg() ReorgSubscription { + return ReorgSubscription{ + Subscription: s.reorgFeed.Subscribe(), + } +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 4f1d8a096a..6a694d34e4 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -160,8 +160,12 @@ func TestReorg(t *testing.T) { head, err := bc.HeadsHeader() require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), head.Hash) + integEnd := head + integStart, err := bc.BlockHeaderByNumber(0) + require.NoError(t, err) synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false) + sub := synchronizer.SubscribeReorg() ctx, cancel = context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) cancel() @@ -170,6 +174,14 @@ func TestReorg(t *testing.T) { head, err = bc.HeadsHeader() require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), head.Hash) + + // Validate reorg event + got, ok := <-sub.Recv() + require.True(t, ok) + assert.Equal(t, integEnd.Hash, got.EndBlockHash) + assert.Equal(t, integEnd.Number, got.EndBlockNum) + assert.Equal(t, integStart.Hash, got.StartBlockHash) + assert.Equal(t, integStart.Number, got.StartBlockNum) }) }