From ba90c0f979e7826cc05057ee3693865c0a314eeb Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 31 Oct 2023 10:40:05 +0100 Subject: [PATCH 1/3] CCIP-1230 Exposing entire LogPollerBlock from LatestBlock in LogPoller (#11105) * Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface * Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface --- core/chains/evm/logpoller/disabled.go | 4 +- core/chains/evm/logpoller/helper_test.go | 2 +- core/chains/evm/logpoller/log_poller.go | 8 ++-- .../evm/logpoller/log_poller_internal_test.go | 2 +- core/chains/evm/logpoller/log_poller_test.go | 39 ++++++++++++------- core/chains/evm/logpoller/mocks/log_poller.go | 10 ++--- core/services/blockhashstore/coordinators.go | 6 +-- core/services/blockhashstore/delegate.go | 2 +- core/services/blockhashstore/delegate_test.go | 3 +- core/services/blockhashstore/feeder_test.go | 6 +-- .../plugins/ocr2keeper/evm20/log_provider.go | 24 ++++++------ .../ocr2/plugins/ocr2keeper/evm20/registry.go | 10 ++--- .../plugins/ocr2keeper/evm20/registry_test.go | 2 +- .../ocr2keeper/evm21/block_subscriber.go | 7 ++-- .../ocr2keeper/evm21/block_subscriber_test.go | 4 +- .../evm21/logprovider/block_time.go | 5 ++- .../evm21/logprovider/block_time_test.go | 2 +- .../evm21/logprovider/integration_test.go | 4 +- .../ocr2keeper/evm21/logprovider/provider.go | 10 ++--- .../evm21/logprovider/provider_life_cycle.go | 2 +- .../logprovider/provider_life_cycle_test.go | 5 ++- .../evm21/logprovider/provider_test.go | 2 +- .../ocr2keeper/evm21/logprovider/recoverer.go | 8 ++-- .../evm21/logprovider/recoverer_test.go | 11 +++--- .../ocr2/plugins/ocr2keeper/evm21/registry.go | 10 ++--- .../plugins/ocr2keeper/evm21/registry_test.go | 2 +- .../evm21/transmit/event_provider.go | 6 +-- .../evm21/transmit/event_provider_test.go | 2 +- .../ocr2vrf/coordinator/coordinator.go | 2 +- .../ocr2vrf/coordinator/coordinator_test.go | 4 +- core/services/relay/evm/config_poller.go | 2 +- .../relay/evm/functions/config_poller.go | 2 +- .../relay/evm/functions/logpoller_wrapper.go | 15 +++---- .../evm/functions/logpoller_wrapper_test.go | 2 +- .../relay/evm/mercury/config_poller.go | 2 +- 35 files changed, 123 insertions(+), 104 deletions(-) diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index 4bcf1c5086..b54d4e6fc8 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E func (disabled) HasFilter(name string) bool { return false } -func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled } +func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) { + return LogPollerBlock{}, ErrDisabled +} func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { return nil, ErrDisabled diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 8415641c40..c61d3d5fad 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx)) - return latest + 1 + return latest.BlockNumber + 1 } func (th *TestHarness) assertDontHave(t *testing.T, start, end int) { diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 54999cbdfb..85f69ca49e 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -37,7 +37,7 @@ type LogPoller interface { RegisterFilter(filter Filter, qopts ...pg.QOpt) error UnregisterFilter(name string, qopts ...pg.QOpt) error HasFilter(name string) bool - LatestBlock(qopts ...pg.QOpt) (int64, error) + LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) // General querying @@ -1018,13 +1018,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common. // LatestBlock returns the latest block the log poller is on. It tracks blocks to be able // to detect reorgs. -func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { +func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) { b, err := lp.orm.SelectLatestBlock(qopts...) if err != nil { - return 0, err + return LogPollerBlock{}, err } - return b.BlockNumber, nil + return *b, nil } func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) { diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index b9474158a6..c0d081582f 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) { lp.PollAndSaveLogs(tctx, 4) latest, err := lp.LatestBlock() require.NoError(t, err) - require.Equal(t, int64(4), latest) + require.Equal(t, int64(4), latest.BlockNumber) t.Run("abort before replayStart received", func(t *testing.T) { // Replay() should abort immediately if caller's context is cancelled before request signal is read diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 1ee8f4dcb7..471c728cdd 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) { body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body) - currentBlock := th.PollAndSaveLogs(ctx, 1) - assert.Equal(t, int64(35), currentBlock) + currentBlockNumber := th.PollAndSaveLogs(ctx, 1) + assert.Equal(t, int64(35), currentBlockNumber) // simulate logs becoming available rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts) @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) { markBlockAsFinalized(t, th, 34) // Run ordinary poller + backup poller at least once - currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1) + currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) + th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1) th.LogPoller.BackupPollAndSaveLogs(ctx, 100) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - require.Equal(t, int64(37), currentBlock+1) + require.Equal(t, int64(37), currentBlock.BlockNumber+1) // logs still shouldn't show up, because we don't want to backfill the last finalized log // to help with reorg detection @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) { markBlockAsFinalized(t, th, 35) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1) th.LogPoller.BackupPollAndSaveLogs(ctx, 100) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - require.Equal(t, int64(38), currentBlock+1) + require.Equal(t, int64(38), currentBlock.BlockNumber+1) // all 3 logs in block 34 should show up now, thanks to backup logger logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1, @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { // 1 -> 2 -> ... th.PollAndSaveLogs(ctx, 1) + // Check that latest block has the same properties as the head + latestBlock, err := th.LogPoller.LatestBlock() + require.NoError(t, err) + assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64()) + assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64()) + assert.Equal(t, latestBlock.BlockHash, header.Hash()) + // Register filter err = th.LogPoller.RegisterFilter(logpoller.Filter{ Name: "Test Emitter", @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { require.Len(t, gethLogs, 2) lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - th.PollAndSaveLogs(context.Background(), lb+1) + th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1) lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1, pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. ec.Commit() } - currentBlock := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlock) - currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) + currentBlockNumber := int64(1) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) matchesGeth := func() bool { // Check every block is identical @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) } @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) { require.NoError(t, err) latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) - assert.Equal(t, latest, fromBlock) + assert.Equal(t, latest.BlockNumber, fromBlock) // Should take min(latest, requested) in this case requested. requested = int64(7) @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000) + // Should return error before the first poll and save + _, err := th.LogPoller.LatestBlock() + require.Error(t, err) + // Mark first block as finalized h := th.Client.Blockchain().CurrentHeader() th.Client.Blockchain().SetFinalized(h) @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { th.PollAndSaveLogs(ctx, 1) - latestBlock, err := th.ORM.SelectLatestBlock() + latestBlock, err := th.LogPoller.LatestBlock() require.NoError(t, err) require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber) require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber) diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index f435734164..01be5f7ba5 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -330,7 +330,7 @@ func (_m *LogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventS } // LatestBlock provides a mock function with given fields: qopts -func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { +func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (logpoller.LogPollerBlock, error) { _va := make([]interface{}, len(qopts)) for _i := range qopts { _va[_i] = qopts[_i] @@ -339,15 +339,15 @@ func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { _ca = append(_ca, _va...) ret := _m.Called(_ca...) - var r0 int64 + var r0 logpoller.LogPollerBlock var r1 error - if rf, ok := ret.Get(0).(func(...pg.QOpt) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(...pg.QOpt) (logpoller.LogPollerBlock, error)); ok { return rf(qopts...) } - if rf, ok := ret.Get(0).(func(...pg.QOpt) int64); ok { + if rf, ok := ret.Get(0).(func(...pg.QOpt) logpoller.LogPollerBlock); ok { r0 = rf(qopts...) } else { - r0 = ret.Get(0).(int64) + r0 = ret.Get(0).(logpoller.LogPollerBlock) } if rf, ok := ret.Get(1).(func(...pg.QOpt) error); ok { diff --git a/core/services/blockhashstore/coordinators.go b/core/services/blockhashstore/coordinators.go index ff5aff1f5e..4cb58bab6f 100644 --- a/core/services/blockhashstore/coordinators.go +++ b/core/services/blockhashstore/coordinators.go @@ -128,7 +128,7 @@ func (v *V1Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v1.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(), }, @@ -219,7 +219,7 @@ func (v *V2Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(), }, @@ -310,7 +310,7 @@ func (v *V2PlusCoordinator) Fulfillments(ctx context.Context, fromBlock uint64) logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v2plus.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic(), }, diff --git a/core/services/blockhashstore/delegate.go b/core/services/blockhashstore/delegate.go index 90819f27a1..e60ffc41cc 100644 --- a/core/services/blockhashstore/delegate.go +++ b/core/services/blockhashstore/delegate.go @@ -172,7 +172,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { if err != nil { return 0, errors.Wrap(err, "getting chain head") } - return uint64(head), nil + return uint64(head.BlockNumber), nil }) return []job.ServiceCtx{&service{ diff --git a/core/services/blockhashstore/delegate_test.go b/core/services/blockhashstore/delegate_test.go index 582492105e..20b727de79 100644 --- a/core/services/blockhashstore/delegate_test.go +++ b/core/services/blockhashstore/delegate_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -58,7 +59,7 @@ func createTestDelegate(t *testing.T) (*blockhashstore.Delegate, *testData) { sendingKey, _ := cltest.MustInsertRandomKey(t, kst) lp := &mocklp.LogPoller{} lp.On("RegisterFilter", mock.Anything).Return(nil) - lp.On("LatestBlock", mock.Anything, mock.Anything).Return(int64(0), nil) + lp.On("LatestBlock", mock.Anything, mock.Anything).Return(logpoller.LogPollerBlock{}, nil) relayExtenders := evmtest.NewChainRelayExtenders( t, diff --git a/core/services/blockhashstore/feeder_test.go b/core/services/blockhashstore/feeder_test.go index 3145a9fd76..8d9ed48c4b 100644 --- a/core/services/blockhashstore/feeder_test.go +++ b/core/services/blockhashstore/feeder_test.go @@ -445,7 +445,7 @@ func (test testCase) testFeederWithLogPollerVRFv1(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, @@ -543,7 +543,7 @@ func (test testCase) testFeederWithLogPollerVRFv2(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, @@ -641,7 +641,7 @@ func (test testCase) testFeederWithLogPollerVRFv2Plus(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go index e32c4a0662..4449137f91 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go @@ -150,8 +150,8 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog // always check the last lookback number of blocks and rebroadcast // this allows the plugin to make decisions based on event confirmations logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryUpkeepPerformed{}.Topic(), }, @@ -174,7 +174,7 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog Key: UpkeepKeyHelper[uint32]{}.MakeUpkeepKey(p.CheckBlockNumber, p.Id), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(p.BlockNumber), TransactionHash: p.TxHash.Hex(), - Confirmations: end - p.BlockNumber, + Confirmations: end.BlockNumber - p.BlockNumber, } vals = append(vals, l) } @@ -193,8 +193,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // ReorgedUpkeepReportLogs logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryReorgedUpkeepReport{}.Topic(), }, @@ -211,8 +211,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // StaleUpkeepReportLogs logs, err = c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryStaleUpkeepReport{}.Topic(), }, @@ -229,8 +229,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // InsufficientFundsUpkeepReportLogs logs, err = c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryInsufficientFundsUpkeepReport{}.Topic(), }, @@ -257,7 +257,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } @@ -272,7 +272,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } @@ -287,7 +287,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go index 7d1de17a5b..096026afc5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go @@ -346,7 +346,7 @@ func (r *EvmRegistry) initialize() error { func (r *EvmRegistry) pollLogs() error { var latest int64 - var end int64 + var end logpoller.LogPollerBlock var err error if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil { @@ -355,11 +355,11 @@ func (r *EvmRegistry) pollLogs() error { r.mu.Lock() latest = r.lastPollBlock - r.lastPollBlock = end + r.lastPollBlock = end.BlockNumber r.mu.Unlock() // if start and end are the same, no polling needs to be done - if latest == 0 || latest == end { + if latest == 0 || latest == end.BlockNumber { return nil } @@ -367,8 +367,8 @@ func (r *EvmRegistry) pollLogs() error { var logs []logpoller.Log if logs, err = r.poller.LogsWithSigs( - end-logEventLookback, - end, + end.BlockNumber-logEventLookback, + end.BlockNumber, upkeepStateEvents, r.addr, pg.WithParentCtx(r.ctx), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go b/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go index 348b5a47c0..8662bfd047 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go @@ -189,7 +189,7 @@ func TestPollLogs(t *testing.T) { if test.LatestBlock != nil { mp.On("LatestBlock", mock.Anything). - Return(test.LatestBlock.OutputBlock, test.LatestBlock.OutputErr) + Return(logpoller.LogPollerBlock{BlockNumber: test.LatestBlock.OutputBlock}, test.LatestBlock.OutputErr) } if test.LogsWithSigs != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 9766d98876..ac0f14b8fd 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -77,12 +77,13 @@ func (bs *BlockSubscriber) getBlockRange(ctx context.Context) ([]uint64, error) if err != nil { return nil, err } - bs.lggr.Infof("latest block from log poller is %d", h) + latestBlockNumber := h.BlockNumber + bs.lggr.Infof("latest block from log poller is %d", latestBlockNumber) var blocks []uint64 for i := bs.blockSize - 1; i >= 0; i-- { - if h-i > 0 { - blocks = append(blocks, uint64(h-i)) + if latestBlockNumber-i > 0 { + blocks = append(blocks, uint64(latestBlockNumber-i)) } } return blocks, nil diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go index 618ea83d4e..004b5fac6c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go @@ -97,7 +97,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) { for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { lp := new(mocks.LogPoller) - lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.LatestBlock}, tc.LatestBlockErr) bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize @@ -278,7 +278,7 @@ func TestBlockSubscriber_Start(t *testing.T) { hb := commonmocks.NewHeadBroadcaster[*evmtypes.Head, common.Hash](t) hb.On("Subscribe", mock.Anything).Return(&evmtypes.Head{Number: 42}, func() {}) lp := new(mocks.LogPoller) - lp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) blocks := []uint64{97, 98, 99, 100} pollerBlocks := []logpoller.LogPollerBlock{ { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go index 9fc35dd84b..814ed29d90 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go @@ -34,10 +34,11 @@ func (r *blockTimeResolver) BlockTime(ctx context.Context, blockSampleSize int64 if err != nil { return 0, fmt.Errorf("failed to get latest block from poller: %w", err) } - if latest <= blockSampleSize { + latestBlockNumber := latest.BlockNumber + if latestBlockNumber <= blockSampleSize { return defaultBlockTime, nil } - start, end := latest-blockSampleSize, latest + start, end := latestBlockNumber-blockSampleSize, latestBlockNumber startTime, endTime, err := r.getSampleTimestamps(ctx, uint64(start), uint64(end)) if err != nil { return 0, err diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go index 0ad9990e18..7009cfaa9b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go @@ -69,7 +69,7 @@ func TestBlockTimeResolver_BlockTime(t *testing.T) { lp := new(lpmocks.LogPoller) resolver := newBlockTimeResolver(lp) - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, tc.latestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) lp.On("GetBlocksRange", mock.Anything, mock.Anything).Return(tc.blocksRange, tc.blocksRangeErr) blockTime, err := resolver.BlockTime(ctx, tc.blockSampleSize) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index 506dcb9ea3..d454e33533 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -317,7 +317,7 @@ func TestIntegration_LogEventProvider_RateLimit(t *testing.T) { var minimumBlockCount int64 = 500 latestBlock, _ := lp.LatestBlock() - assert.GreaterOrEqual(t, latestBlock, minimumBlockCount, "to ensure the integrety of the test, the minimum block count before the test should be %d but got %d", minimumBlockCount, latestBlock) + assert.GreaterOrEqual(t, latestBlock.BlockNumber, minimumBlockCount, "to ensure the integrety of the test, the minimum block count before the test should be %d but got %d", minimumBlockCount, latestBlock) } require.NoError(t, logProvider.ReadLogs(ctx, ids...)) @@ -566,7 +566,7 @@ func waitLogPoller(ctx context.Context, t *testing.T, backend *backends.Simulate for { latestPolled, lberr := lp.LatestBlock(pg.WithParentCtx(ctx)) require.NoError(t, lberr) - if latestPolled >= latestBlock { + if latestPolled.BlockNumber >= latestBlock { break } lp.PollAndSaveLogs(ctx, latestBlock) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index b62fb37084..3a6aee63b6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -159,11 +159,11 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers if err != nil { return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - start := latest - p.opts.LookbackBlocks + start := latest.BlockNumber - p.opts.LookbackBlocks if start <= 0 { start = 1 } - logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep, MaxPayloads) + logs := p.buffer.dequeueRange(start, latest.BlockNumber, AllowedLogsPerUpkeep, MaxPayloads) // p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs)) @@ -197,12 +197,12 @@ func (p *logEventProvider) ReadLogs(pctx context.Context, ids ...*big.Int) error if err != nil { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - if latest == 0 { + if latest.BlockNumber == 0 { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, "latest block is 0") } - filters := p.getFilters(latest, ids...) + filters := p.getFilters(latest.BlockNumber, ids...) - err = p.readLogs(ctx, latest, filters) + err = p.readLogs(ctx, latest.BlockNumber, filters) p.updateFiltersLastPoll(filters) // p.lggr.Debugw("read logs for entries", "latestBlock", latest, "entries", len(entries), "err", err) if err != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go index ab816adb1b..69a4872351 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go @@ -128,7 +128,7 @@ func (p *logEventProvider) register(ctx context.Context, lpFilter logpoller.Filt // already registered in DB before, no need to backfill return nil } - backfillBlock := latest - int64(LogBackfillBuffer) + backfillBlock := latest.BlockNumber - int64(LogBackfillBuffer) if backfillBlock < 1 { // New chain, backfill from start backfillBlock = 1 diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go index 4b1ff06f31..03395cb5b5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core" @@ -109,7 +110,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) { lp := new(mocks.LogPoller) lp.On("RegisterFilter", mock.Anything).Return(nil) lp.On("UnregisterFilter", mock.Anything).Return(nil) - lp.On("LatestBlock", mock.Anything).Return(int64(0), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{}, nil) hasFitlerTimes := 1 if tc.unregister { hasFitlerTimes = 2 @@ -149,7 +150,7 @@ func TestEventLogProvider_RefreshActiveUpkeeps(t *testing.T) { mp.On("RegisterFilter", mock.Anything).Return(nil) mp.On("UnregisterFilter", mock.Anything).Return(nil) mp.On("HasFilter", mock.Anything).Return(false) - mp.On("LatestBlock", mock.Anything).Return(int64(0), nil) + mp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{}, nil) mp.On("ReplayAsync", mock.Anything).Return(nil) p := NewLogProvider(logger.TestLogger(t), mp, &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go index db22886cbb..a8e33ba23b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go @@ -248,7 +248,7 @@ func TestLogEventProvider_ReadLogs(t *testing.T) { mp.On("ReplayAsync", mock.Anything).Return() mp.On("HasFilter", mock.Anything).Return(false) mp.On("UnregisterFilter", mock.Anything, mock.Anything).Return(nil) - mp.On("LatestBlock", mock.Anything).Return(int64(1), nil) + mp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: int64(1)}, nil) mp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{ { BlockNumber: 1, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 3994f1d841..a1a10a0ab3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -206,7 +206,7 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2 return nil, err } - start, offsetBlock := r.getRecoveryWindow(latest) + start, offsetBlock := r.getRecoveryWindow(latest.BlockNumber) if proposal.Trigger.LogTriggerExtension == nil { return nil, errors.New("missing log trigger extension") } @@ -295,7 +295,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. allLogsCounter := 0 logsCount := map[string]int{} - r.sortPending(uint64(latestBlock)) + r.sortPending(uint64(latestBlock.BlockNumber)) var results, pending []ocr2keepers.UpkeepPayload for _, payload := range r.pending { @@ -328,7 +328,7 @@ func (r *logRecoverer) recover(ctx context.Context) error { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - start, offsetBlock := r.getRecoveryWindow(latest) + start, offsetBlock := r.getRecoveryWindow(latest.BlockNumber) if offsetBlock < 0 { // too soon to recover, we don't have enough blocks return nil @@ -609,7 +609,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { return fmt.Errorf("failed to get states: %w", err) } lggr := r.lggr.With("where", "clean") - start, _ := r.getRecoveryWindow(latestBlock) + start, _ := r.getRecoveryWindow(latestBlock.BlockNumber) r.lock.Lock() defer r.lock.Unlock() var removed int diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index 2fdf04f76c..c882a22bc1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -32,7 +32,7 @@ func TestLogRecoverer_GetRecoverables(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() lp := &lpmocks.LogPoller{} - lp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200)) tests := []struct { @@ -182,7 +182,7 @@ func TestLogRecoverer_Clean(t *testing.T) { start, _ := r.getRecoveryWindow(0) block24h := int64(math.Abs(float64(start))) - lp.On("LatestBlock", mock.Anything).Return(block24h+oldLogsOffset, nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: block24h + oldLogsOffset}, nil) statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil) r.lock.Lock() @@ -423,7 +423,7 @@ func TestLogRecoverer_Recover(t *testing.T) { recoverer, filterStore, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) filterStore.AddActiveUpkeeps(tc.active...) - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, tc.latestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, tc.logsErr) statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, tc.statesErr) @@ -1206,8 +1206,9 @@ type mockLogPoller struct { func (p *mockLogPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return p.LogsWithSigsFn(start, end, eventSigs, address, qopts...) } -func (p *mockLogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { - return p.LatestBlockFn(qopts...) +func (p *mockLogPoller) LatestBlock(qopts ...pg.QOpt) (logpoller.LogPollerBlock, error) { + block, err := p.LatestBlockFn(qopts...) + return logpoller.LogPollerBlock{BlockNumber: block}, err } type mockClient struct { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 1cad587e63..0fc4499ef5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -361,7 +361,7 @@ func (r *EvmRegistry) refreshLogTriggerUpkeepsBatch(logTriggerIDs []*big.Int) er func (r *EvmRegistry) pollUpkeepStateLogs() error { var latest int64 - var end int64 + var end logpoller.LogPollerBlock var err error if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil { @@ -370,18 +370,18 @@ func (r *EvmRegistry) pollUpkeepStateLogs() error { r.mu.Lock() latest = r.lastPollBlock - r.lastPollBlock = end + r.lastPollBlock = end.BlockNumber r.mu.Unlock() // if start and end are the same, no polling needs to be done - if latest == 0 || latest == end { + if latest == 0 || latest == end.BlockNumber { return nil } var logs []logpoller.Log if logs, err = r.poller.LogsWithSigs( - end-logEventLookback, - end, + end.BlockNumber-logEventLookback, + end.BlockNumber, upkeepStateEvents, r.addr, pg.WithParentCtx(r.ctx), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go index 0cd5ecd259..4be0ccce4e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go @@ -145,7 +145,7 @@ func TestPollLogs(t *testing.T) { if test.LatestBlock != nil { mp.On("LatestBlock", mock.Anything). - Return(test.LatestBlock.OutputBlock, test.LatestBlock.OutputErr) + Return(logpoller.LogPollerBlock{BlockNumber: test.LatestBlock.OutputBlock}, test.LatestBlock.OutputErr) } if test.LogsWithSigs != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go index b0ae2a7bf6..5682fe877b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go @@ -140,8 +140,8 @@ func (c *EventProvider) GetLatestEvents(ctx context.Context) ([]ocr2keepers.Tran // always check the last lookback number of blocks and rebroadcast // this allows the plugin to make decisions based on event confirmations logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ iregistry21.IKeeperRegistryMasterUpkeepPerformed{}.Topic(), iregistry21.IKeeperRegistryMasterStaleUpkeepReport{}.Topic(), @@ -155,7 +155,7 @@ func (c *EventProvider) GetLatestEvents(ctx context.Context) ([]ocr2keepers.Tran return nil, fmt.Errorf("%w: failed to collect logs from log poller", err) } - return c.processLogs(end, logs...) + return c.processLogs(end.BlockNumber, logs...) } // processLogs will parse the unseen logs and return the corresponding transmit events. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go index 72f3b63088..58e95bc423 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go @@ -89,7 +89,7 @@ func TestTransmitEventProvider_Sanity(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, nil) lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, nil) res, err := provider.GetLatestEvents(ctx) diff --git a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go index e51b68f415..1b58a01732 100644 --- a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go +++ b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go @@ -227,7 +227,7 @@ func (c *coordinator) CurrentChainHeight(ctx context.Context) (uint64, error) { if err != nil { return 0, err } - return uint64(head), nil + return uint64(head.BlockNumber), nil } // ReportIsOnchain returns true iff a report for the given OCR epoch/round is diff --git a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go index 26d0f2996a..dc489b4958 100644 --- a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go +++ b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go @@ -1032,7 +1032,7 @@ func TestCoordinator_ReportBlocks(t *testing.T) { requestedBlocks := []uint64{195, 196} lp := lp_mocks.NewLogPoller(t) lp.On("LatestBlock", mock.Anything). - Return(int64(latestHeadNumber), nil) + Return(logpoller.LogPollerBlock{BlockNumber: int64(latestHeadNumber)}, nil) lp.On("GetBlocksRange", mock.Anything, append(requestedBlocks, uint64(latestHeadNumber-lookbackBlocks+1), uint64(latestHeadNumber)), mock.Anything). Return(nil, errors.New("GetBlocks error")) @@ -1720,7 +1720,7 @@ func getLogPoller( lp := lp_mocks.NewLogPoller(t) if needsLatestBlock { lp.On("LatestBlock", mock.Anything). - Return(int64(latestHeadNumber), nil) + Return(logpoller.LogPollerBlock{BlockNumber: int64(latestHeadNumber)}, nil) } var logPollerBlocks []logpoller.LogPollerBlock diff --git a/core/services/relay/evm/config_poller.go b/core/services/relay/evm/config_poller.go index 504155bf1e..f52cc43084 100644 --- a/core/services/relay/evm/config_poller.go +++ b/core/services/relay/evm/config_poller.go @@ -211,7 +211,7 @@ func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } func (cp *configPoller) isConfigStoreAvailable() bool { diff --git a/core/services/relay/evm/functions/config_poller.go b/core/services/relay/evm/functions/config_poller.go index f068f13cc7..7a59d49989 100644 --- a/core/services/relay/evm/functions/config_poller.go +++ b/core/services/relay/evm/functions/config_poller.go @@ -181,7 +181,7 @@ func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } // called from LogPollerWrapper in a separate goroutine diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index db2c7fd68c..8a2505b58d 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -76,7 +76,7 @@ func (l *logPollerWrapper) Start(context.Context) error { l.lggr.Errorw("LogPollerWrapper: LatestBlock() failed, starting from 0", "error", err) } else { l.lggr.Debugw("LogPollerWrapper: LatestBlock() got starting block", "block", nextBlock) - l.nextBlock = nextBlock - l.blockOffset + l.nextBlock = nextBlock.BlockNumber - l.blockOffset } l.closeWait.Add(1) go l.checkForRouteUpdates() @@ -122,9 +122,10 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.mu.Unlock() return nil, nil, err } - latest -= l.blockOffset - if latest >= nextBlock { - l.nextBlock = latest + 1 + latestBlockNumber := latest.BlockNumber + latestBlockNumber -= l.blockOffset + if latestBlockNumber >= nextBlock { + l.nextBlock = latestBlockNumber + 1 } l.mu.Unlock() @@ -135,18 +136,18 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debug("LatestEvents: no non-zero coordinators to check") return resultsReq, resultsResp, errors.New("no non-zero coordinators to check") } - if latest < nextBlock { + if latestBlockNumber < nextBlock { l.lggr.Debugw("LatestEvents: no new blocks to check", "latest", latest, "nextBlock", nextBlock) return resultsReq, resultsResp, nil } for _, coordinator := range coordinators { - requestLogs, err := l.logPoller.Logs(nextBlock, latest, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) + requestLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) if err != nil { l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) return nil, nil, err } - responseLogs, err := l.logPoller.Logs(nextBlock, latest, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) + responseLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) if err != nil { l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) return nil, nil, err diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index 224aa51a5d..c91c3c49aa 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -60,7 +60,7 @@ func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.L lpWrapper, err := functions.NewLogPollerWrapper(gethcommon.Address{}, config, client, lp, lggr) require.NoError(t, err) - lp.On("LatestBlock").Return(int64(100), nil) + lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) return lp, lpWrapper, client } diff --git a/core/services/relay/evm/mercury/config_poller.go b/core/services/relay/evm/mercury/config_poller.go index 2f16157bfa..8964a28304 100644 --- a/core/services/relay/evm/mercury/config_poller.go +++ b/core/services/relay/evm/mercury/config_poller.go @@ -188,7 +188,7 @@ func (cp *ConfigPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } func (cp *ConfigPoller) startLogSubscription() { From 0f9bf06d9a55683f4bca71d0a01672dfc6122a96 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Mon, 13 Nov 2023 17:43:00 +0100 Subject: [PATCH 2/3] CCIP-1277 LogPoller - Fixing leaky abstraction by removing Q() from the ORM interface (#11200) * Fixing leaky abstraction by removing Q() from the ORM interface. Moving all the TX internals into the ORM implementation * Adding test * Post review fix * Post rebase fixes --- core/chains/evm/logpoller/log_poller.go | 39 +---- core/chains/evm/logpoller/models.go | 9 ++ core/chains/evm/logpoller/observability.go | 22 +-- .../evm/logpoller/observability_test.go | 2 +- core/chains/evm/logpoller/orm.go | 99 ++++++++---- core/chains/evm/logpoller/orm_test.go | 141 +++++++++++++++++- 6 files changed, 232 insertions(+), 80 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 85f69ca49e..b80a6f5f0b 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -676,9 +676,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx)) - }) + err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx)) if err != nil { lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to) return err @@ -747,21 +745,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren // the canonical set per read. Typically, if an application took action on a log // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - // These deletes are bounded by reorg depth, so they are - // fast and should not slow down the log readers. - err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx)) - if err3 != nil { - lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3) - return err3 - } - err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx)) - if err3 != nil { - lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3) - return err3 - } - return nil - }) + err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx)) if err2 != nil { // If we error on db commit, we can't know if the tx went through or not. // We return an error here which will cause us to restart polling from lastBlockSaved + 1 @@ -846,20 +830,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int return } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix()) - err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { - if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil { - return err2 - } - if len(logs) == 0 { - return nil - } - return lp.orm.InsertLogs(convertLogs(logs, - []LogPollerBlock{{BlockNumber: currentBlockNumber, - BlockTimestamp: currentBlock.Timestamp}}, - lp.lggr, - lp.ec.ConfiguredChainID(), - ), pg.WithQueryer(tx)) - }) + block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber) + err = lp.orm.InsertLogsWithBlock( + convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()), + block, + ) if err != nil { lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber) return diff --git a/core/chains/evm/logpoller/models.go b/core/chains/evm/logpoller/models.go index 9c55786777..87ddd079a5 100644 --- a/core/chains/evm/logpoller/models.go +++ b/core/chains/evm/logpoller/models.go @@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log { Index: uint(l.LogIndex), } } + +func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock { + return LogPollerBlock{ + BlockHash: blockHash, + BlockNumber: blockNumber, + BlockTimestamp: timestamp, + FinalizedBlockNumber: finalizedBlockNumber, + } +} diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index d3415bfcae..b2d7ff9198 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC } } -func (o *ObservedORM) Q() pg.Q { - return o.ORM.Q() -} - func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { return withObservedExec(o, "InsertLogs", func() error { return o.ORM.InsertLogs(logs, qopts...) }) } -func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "InsertBlock", func() error { - return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...) +func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error { + return withObservedExec(o, "InsertLogsWithBlock", func() error { + return o.ORM.InsertLogsWithBlock(logs, block, qopts...) }) } @@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error { }) } -func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteBlocksAfter", func() error { - return o.ORM.DeleteBlocksAfter(start, qopts...) - }) -} - func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { return withObservedExec(o, "DeleteBlocksBefore", func() error { return o.ORM.DeleteBlocksBefore(end, qopts...) }) } -func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { - return withObservedExec(o, "DeleteLogsAfter", func() error { - return o.ORM.DeleteLogsAfter(start, qopts...) +func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error { + return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...) }) } diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 0d3eadf47d..ded3d7854d 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) { _, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx)) _, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx)) _ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx)) - _ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx)) + _ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx)) require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration)) require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize)) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 43dd96c30a..f107df5ca9 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -20,17 +20,15 @@ import ( // it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and could be applied to any persistence layer. // What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM. type ORM interface { - Q() pg.Q InsertLogs(logs []Log, qopts ...pg.QOpt) error - InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlockNumber int64, qopts ...pg.QOpt) error + InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error InsertFilter(filter Filter, qopts ...pg.QOpt) error LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) DeleteFilter(name string, qopts ...pg.QOpt) error - DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error - DeleteLogsAfter(start int64, qopts ...pg.QOpt) error + DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error DeleteExpiredLogs(qopts ...pg.QOpt) error GetBlocksRange(start int64, end int64, qopts ...pg.QOpt) ([]LogPollerBlock, error) @@ -58,6 +56,7 @@ type ORM interface { type DbORM struct { chainID *big.Int q pg.Q + lggr logger.Logger } // NewORM creates a DbORM scoped to chainID. @@ -67,13 +66,10 @@ func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) * return &DbORM{ chainID: chainID, q: q, + lggr: lggr, } } -func (o *DbORM) Q() pg.Q { - return o.q -} - // InsertBlock is idempotent to support replays. func (o *DbORM) InsertBlock(blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64, qopts ...pg.QOpt) error { args, err := newQueryArgs(o.chainID). @@ -191,12 +187,6 @@ func (o *DbORM) SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address return &l, nil } -// DeleteBlocksAfter delete all blocks after and including start. -func (o *DbORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - return q.ExecQ(`DELETE FROM evm.log_poller_blocks WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) -} - // DeleteBlocksBefore delete all blocks before and including end. func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) @@ -204,9 +194,31 @@ func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { return err } -func (o *DbORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { - q := o.q.WithOpts(qopts...) - return q.ExecQ(`DELETE FROM evm.logs WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) +func (o *DbORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { + // These deletes are bounded by reorg depth, so they are + // fast and should not slow down the log readers. + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + args, err := newQueryArgs(o.chainID). + withStartBlock(start). + toArgs() + if err != nil { + o.lggr.Error("Cant build args for DeleteLogsAndBlocksAfter queries", "err", err) + return err + } + + _, err = tx.NamedExec(`DELETE FROM evm.log_poller_blocks WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + if err != nil { + o.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err) + return err + } + + _, err = tx.NamedExec(`DELETE FROM evm.logs WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + if err != nil { + o.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err) + return err + } + return nil + }) } type Exp struct { @@ -233,13 +245,35 @@ func (o *DbORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { // InsertLogs is idempotent to support replays. func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { - for _, log := range logs { - if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 { - return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID) - } + if err := o.validateLogs(logs); err != nil { + return err } - q := o.q.WithOpts(qopts...) + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + return o.insertLogsWithinTx(logs, tx) + }) +} + +func (o *DbORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error { + // Optimization, don't open TX when there is only a block to be persisted + if len(logs) == 0 { + return o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, qopts...) + } + + if err := o.validateLogs(logs); err != nil { + return err + } + + // Block and logs goes with the same TX to ensure atomicity + return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { + if err := o.InsertBlock(block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, pg.WithQueryer(tx)); err != nil { + return err + } + return o.insertLogsWithinTx(logs, tx) + }) +} + +func (o *DbORM) insertLogsWithinTx(logs []Log, tx pg.Queryer) error { batchInsertSize := 4000 for i := 0; i < len(logs); i += batchInsertSize { start, end := i, i+batchInsertSize @@ -247,12 +281,14 @@ func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { end = len(logs) } - err := q.ExecQNamed(` - INSERT INTO evm.logs - (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) + _, err := tx.NamedExec(` + INSERT INTO evm.logs + (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) VALUES - (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) - ON CONFLICT DO NOTHING`, logs[start:end]) + (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) + ON CONFLICT DO NOTHING`, + logs[start:end], + ) if err != nil { if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { @@ -267,6 +303,15 @@ func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { return nil } +func (o *DbORM) validateLogs(logs []Log) error { + for _, log := range logs { + if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 { + return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID) + } + } + return nil +} + func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { args, err := newQueryArgs(o.chainID). withStartBlock(start). diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 66e1afdc93..887984055e 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -4,6 +4,7 @@ import ( "bytes" "database/sql" "fmt" + "math" "math/big" "testing" "time" @@ -15,7 +16,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -179,13 +183,13 @@ func TestORM(t *testing.T) { assert.Equal(t, int64(12), latest.BlockNumber) // Delete a block (only 10 on chain). - require.NoError(t, o1.DeleteBlocksAfter(10)) + require.NoError(t, o1.DeleteLogsAndBlocksAfter(10)) _, err = o1.SelectBlockByHash(common.HexToHash("0x1234")) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // Delete blocks from another chain. - require.NoError(t, o2.DeleteBlocksAfter(11)) + require.NoError(t, o2.DeleteLogsAndBlocksAfter(11)) _, err = o2.SelectBlockByHash(common.HexToHash("0x1234")) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) @@ -318,7 +322,6 @@ func TestORM(t *testing.T) { require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // With block 12, anything <=2 should work - require.NoError(t, o1.DeleteBlocksAfter(10)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 11, time.Now(), 0)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 12, time.Now(), 0)) _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) @@ -421,7 +424,7 @@ func TestORM(t *testing.T) { assert.Len(t, logs, 7) // Delete logs after should delete all logs. - err = o1.DeleteLogsAfter(1) + err = o1.DeleteLogsAndBlocksAfter(1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(1, latest.BlockNumber) require.NoError(t, err) @@ -1301,3 +1304,133 @@ func TestNestedLogPollerBlocksQuery(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 0) } + +func TestInsertLogsWithBlock(t *testing.T) { + chainID := testutils.NewRandomEVMChainID() + event := utils.RandomBytes32() + address := utils.RandomAddress() + + // We need full db here, because we want to test transaction rollbacks. + // Using pgtest.NewSqlxDB(t) will run all tests in TXs which is not desired for this type of test + // (inner tx rollback will rollback outer tx, blocking rest of execution) + _, db := heavyweight.FullTestDBV2(t, nil) + o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) + + correctLog := GenLog(chainID, 1, 1, utils.RandomAddress().String(), event[:], address) + invalidLog := GenLog(chainID, -10, -10, utils.RandomAddress().String(), event[:], address) + correctBlock := logpoller.NewLogPollerBlock(utils.RandomBytes32(), 20, time.Now(), 10) + invalidBlock := logpoller.NewLogPollerBlock(utils.RandomBytes32(), -10, time.Now(), -10) + + tests := []struct { + name string + logs []logpoller.Log + block logpoller.LogPollerBlock + shouldRollback bool + }{ + { + name: "properly persist all data", + logs: []logpoller.Log{correctLog}, + block: correctBlock, + shouldRollback: false, + }, + { + name: "rollbacks transaction when block is invalid", + logs: []logpoller.Log{correctLog}, + block: invalidBlock, + shouldRollback: true, + }, + { + name: "rollbacks transaction when log is invalid", + logs: []logpoller.Log{invalidLog}, + block: correctBlock, + shouldRollback: true, + }, + { + name: "rollback when only some logs are invalid", + logs: []logpoller.Log{correctLog, invalidLog}, + block: correctBlock, + shouldRollback: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // clean all logs and blocks between test cases + defer func() { _ = o.DeleteLogsAndBlocksAfter(0) }() + insertError := o.InsertLogsWithBlock(tt.logs, tt.block) + + logs, logsErr := o.SelectLogs(0, math.MaxInt, address, event) + block, blockErr := o.SelectLatestBlock() + + if tt.shouldRollback { + assert.Error(t, insertError) + + assert.NoError(t, logsErr) + assert.Len(t, logs, 0) + + assert.Error(t, blockErr) + } else { + assert.NoError(t, insertError) + + assert.NoError(t, logsErr) + assert.Len(t, logs, len(tt.logs)) + + assert.NoError(t, blockErr) + assert.Equal(t, block.BlockNumber, tt.block.BlockNumber) + } + }) + } +} + +func TestInsertLogsInTx(t *testing.T) { + chainID := testutils.NewRandomEVMChainID() + event := utils.RandomBytes32() + address := utils.RandomAddress() + maxLogsSize := 9000 + + // We need full db here, because we want to test transaction rollbacks. + _, db := heavyweight.FullTestDBV2(t, nil) + o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) + + logs := make([]logpoller.Log, maxLogsSize, maxLogsSize+1) + for i := 0; i < maxLogsSize; i++ { + logs[i] = GenLog(chainID, int64(i+1), int64(i+1), utils.RandomAddress().String(), event[:], address) + } + invalidLog := GenLog(chainID, -10, -10, utils.RandomAddress().String(), event[:], address) + + tests := []struct { + name string + logs []logpoller.Log + shouldRollback bool + }{ + { + name: "all logs persisted", + logs: logs, + shouldRollback: false, + }, + { + name: "rollback when invalid log is passed", + logs: append(logs, invalidLog), + shouldRollback: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // clean all logs and blocks between test cases + defer func() { _ = o.DeleteLogsAndBlocksAfter(0) }() + + insertErr := o.InsertLogs(tt.logs) + logsFromDb, err := o.SelectLogs(0, math.MaxInt, address, event) + assert.NoError(t, err) + + if tt.shouldRollback { + assert.Error(t, insertErr) + assert.Len(t, logsFromDb, 0) + } else { + assert.NoError(t, insertErr) + assert.Len(t, logsFromDb, len(tt.logs)) + } + }) + } +} From a4b98b9936965144a427420ab62dabcb67215a30 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 14 Nov 2023 14:34:59 +0100 Subject: [PATCH 3/3] Post cherry-pick fixes --- core/chains/evm/logpoller/orm_test.go | 4 ++-- .../ocr2/plugins/ccip/execution_reporting_plugin.go | 2 +- .../plugins/ccip/execution_reporting_plugin_test.go | 5 +++-- .../ocr2/plugins/ccip/internal/cache/cache.go | 2 +- .../ocr2/plugins/ccip/internal/cache/cache_test.go | 2 +- .../plugins/ccip/internal/cache/tokenpool_test.go | 3 ++- .../ocr2/plugins/ccip/internal/ccipdata/logpoller.go | 2 +- .../ccip/internal/ccipdata/mocks/reader_mock.go | 11 ++++++----- .../ccipdata/offramp_reader_v1_0_0_unit_test.go | 3 ++- .../plugins/ccip/internal/ccipdata/offramp_v1_0_0.go | 4 ++-- .../ccip/internal/ccipdata/price_registry_v1_0_0.go | 2 +- .../ocr2/plugins/ccip/internal/ccipdata/reader.go | 3 ++- 12 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 887984055e..49bf844053 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1313,7 +1313,7 @@ func TestInsertLogsWithBlock(t *testing.T) { // We need full db here, because we want to test transaction rollbacks. // Using pgtest.NewSqlxDB(t) will run all tests in TXs which is not desired for this type of test // (inner tx rollback will rollback outer tx, blocking rest of execution) - _, db := heavyweight.FullTestDBV2(t, nil) + _, db := heavyweight.FullTestDBV2(t, "insert_test", nil) o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) correctLog := GenLog(chainID, 1, 1, utils.RandomAddress().String(), event[:], address) @@ -1389,7 +1389,7 @@ func TestInsertLogsInTx(t *testing.T) { maxLogsSize := 9000 // We need full db here, because we want to test transaction rollbacks. - _, db := heavyweight.FullTestDBV2(t, nil) + _, db := heavyweight.FullTestDBV2(t, "insert_logs_tx", nil) o := logpoller.NewORM(chainID, db, logger.TestLogger(t), pgtest.NewQConfig(true)) logs := make([]logpoller.Log, maxLogsSize, maxLogsSize+1) diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index a90617b4ad..1f69a20ce5 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -790,7 +790,7 @@ func (r *ExecutionReportingPlugin) getReportsWithSendRequests( } // get executable sequence numbers - executedMp, err := r.getExecutedSeqNrsInRange(ctx, intervalMin, intervalMax, latestBlock) + executedMp, err := r.getExecutedSeqNrsInRange(ctx, intervalMin, intervalMax, latestBlock.BlockNumber) if err != nil { return err } diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go index 54bad512f4..96e2205d43 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" lpMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal" @@ -108,7 +109,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { p.config.commitStoreReader = commitStoreReader destReader := ccipdatamocks.NewReader(t) - destReader.On("LatestBlock", ctx).Return(int64(1234), nil).Maybe() + destReader.On("LatestBlock", ctx).Return(logpoller.LogPollerBlock{BlockNumber: 1234}, nil).Maybe() p.config.destReader = destReader var executionEvents []ccipdata.Event[ccipdata.ExecutionStateChanged] @@ -1014,7 +1015,7 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { p.config.onRampReader = sourceReader destReader := ccipdatamocks.NewReader(t) - destReader.On("LatestBlock", ctx).Return(tc.destLatestBlock, nil).Maybe() + destReader.On("LatestBlock", ctx).Return(logpoller.LogPollerBlock{BlockNumber: tc.destLatestBlock}, nil).Maybe() var executedEvents []ccipdata.Event[ccipdata.ExecutionStateChanged] for _, executedSeqNum := range tc.destExecutedSeqNums { executedEvents = append(executedEvents, ccipdata.Event[ccipdata.ExecutionStateChanged]{ diff --git a/core/services/ocr2/plugins/ccip/internal/cache/cache.go b/core/services/ocr2/plugins/ccip/internal/cache/cache.go index 14b3fac770..1363aa35b6 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/cache.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/cache.go @@ -91,7 +91,7 @@ func (c *CachedChain[T]) initializeCache(ctx context.Context) (T, error) { return empty, err } - c.updateCache(value, latestBlock-c.optimisticConfirmations) + c.updateCache(value, latestBlock.BlockNumber-c.optimisticConfirmations) return c.copyCachedValue(), nil } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go b/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go index 51dc4d914b..7d52944adb 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go @@ -22,7 +22,7 @@ const ( func TestGet_InitDataForTheFirstTime(t *testing.T) { lp := lpMocks.NewLogPoller(t) - lp.On("LatestBlock", mock.Anything).Maybe().Return(int64(100), nil) + lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) contract := newCachedContract(lp, "", []string{"value1"}, 0) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go index b5c9b45f12..a89755e61c 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -74,7 +75,7 @@ func TestNewTokenPools(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { mockLp := mocks.NewLogPoller(t) - mockLp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + mockLp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) offRamp := ccipdatamocks.NewOffRampReader(t) offRamp.On("TokenEvents").Return([]common.Hash{}) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go index 256bc0171d..715f55f12a 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go @@ -36,7 +36,7 @@ func NewLogPollerReader(lp logpoller.LogPoller, lggr logger.Logger, client evmcl } } -func (c *LogPollerReader) LatestBlock(ctx context.Context) (int64, error) { +func (c *LogPollerReader) LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error) { return c.lp.LatestBlock(pg.WithParentCtx(ctx)) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/reader_mock.go index 1ec57502f5..0a866f7ccf 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/reader_mock.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + logpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" mock "github.com/stretchr/testify/mock" ) @@ -14,18 +15,18 @@ type Reader struct { } // LatestBlock provides a mock function with given fields: ctx -func (_m *Reader) LatestBlock(ctx context.Context) (int64, error) { +func (_m *Reader) LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error) { ret := _m.Called(ctx) - var r0 int64 + var r0 logpoller.LogPollerBlock var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context) (logpoller.LogPollerBlock, error)); ok { return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context) logpoller.LogPollerBlock); ok { r0 = rf(ctx) } else { - r0 = ret.Get(0).(int64) + r0 = ret.Get(0).(logpoller.LogPollerBlock) } if rf, ok := ret.Get(1).(func(context.Context) error); ok { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader_v1_0_0_unit_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader_v1_0_0_unit_test.go index ea816e9902..a06e0b754b 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader_v1_0_0_unit_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader_v1_0_0_unit_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib" @@ -79,7 +80,7 @@ func TestOffRampGetDestinationTokensFromSourceTokens(t *testing.T) { } lp := mocks.NewLogPoller(t) - lp.On("LatestBlock", mock.Anything).Return(int64(rand.Uint64()), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: rand.Int63()}, nil) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go index d797d2fa13..670f7a61a1 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_v1_0_0.go @@ -164,7 +164,7 @@ func (o *OffRampV1_0_0) GetDestinationTokensFromSourceTokens(ctx context.Context return nil, fmt.Errorf("get latest block: %w", err) } - results, err := o.evmBatchCaller.BatchCall(ctx, uint64(latestBlock), evmCalls) + results, err := o.evmBatchCaller.BatchCall(ctx, uint64(latestBlock.BlockNumber), evmCalls) if err != nil { return nil, fmt.Errorf("batch call limit: %w", err) } @@ -207,7 +207,7 @@ func (o *OffRampV1_0_0) GetTokenPoolsRateLimits(ctx context.Context, poolAddress return nil, fmt.Errorf("get latest block: %w", err) } - results, err := o.evmBatchCaller.BatchCall(ctx, uint64(latestBlock), evmCalls) + results, err := o.evmBatchCaller.BatchCall(ctx, uint64(latestBlock.BlockNumber), evmCalls) if err != nil { return nil, fmt.Errorf("batch call limit: %w", err) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/price_registry_v1_0_0.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/price_registry_v1_0_0.go index a070bbcdb7..b78aec9b37 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/price_registry_v1_0_0.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/price_registry_v1_0_0.go @@ -156,7 +156,7 @@ func (p *PriceRegistryV1_0_0) GetTokensDecimals(ctx context.Context, tokenAddres return nil, fmt.Errorf("get latest block: %w", err) } - results, err := p.evmBatchCaller.BatchCall(ctx, uint64(latestBlock), evmCalls) + results, err := p.evmBatchCaller.BatchCall(ctx, uint64(latestBlock.BlockNumber), evmCalls) if err != nil { return nil, fmt.Errorf("batch call limit: %w", err) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go index 1f67bd56d1..eb75b12e28 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -36,5 +37,5 @@ type Closer interface { //go:generate mockery --quiet --name Reader --filename reader_mock.go --case=underscore type Reader interface { // LatestBlock returns the latest known/parsed block of the underlying implementation. - LatestBlock(ctx context.Context) (int64, error) + LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error) }