diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index dd7e0c5242..0fcead7f13 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -75,7 +75,7 @@ type LogPoller interface {
 type LogPollerTest interface {
 	LogPoller
 	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
-	BackupPollAndSaveLogs(ctx context.Context)
+	BackupPollAndSaveLogs(ctx context.Context) error
 	Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
 	GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
 	PruneOldBlocks(ctx context.Context) (bool, error)
@@ -407,6 +407,12 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
 	defer func() {
 		if errors.Is(err, context.Canceled) {
 			err = ErrReplayRequestAborted
+		} else if errors.Is(err, ErrFinalityViolated) {
+			// Replay only declares finality violation and does not resolve it, as it's possible that [fromBlock, savedFinalizedBlockNumber]
+			// does not contain the violation.
+			lp.lggr.Criticalw("Replay failed due to finality violation", "fromBlock", fromBlock, "err", err)
+			lp.finalityViolated.Store(true)
+			lp.SvcErrBuffer.Append(err)
 		}
 	}()
 
@@ -625,7 +631,17 @@ func (lp *logPoller) run() {
 				lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
 				continue
 			}
-			lp.BackupPollAndSaveLogs(ctx)
+			err := lp.BackupPollAndSaveLogs(ctx)
+			switch {
+			case errors.Is(err, ErrFinalityViolated):
+				// BackupPoll only declares finality violation and does not resolve it, as it's possible that the processed range
+				// does not contain the violation.
+				lp.lggr.Criticalw("Backup poll failed due to finality violation", "err", err)
+				lp.finalityViolated.Store(true)
+				lp.SvcErrBuffer.Append(err)
+			case err != nil:
+				lp.lggr.Errorw("Backup poller failed, retrying later", "err", err)
+			}
 		}
 	}
 }
@@ -695,16 +711,16 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64
 	}
 }
 
-func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
+func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) error {
 	if lp.backupPollerNextBlock == 0 {
 		lastProcessed, err := lp.orm.SelectLatestBlock(ctx)
 		if err != nil {
 			if pkgerrors.Is(err, sql.ErrNoRows) {
 				lp.lggr.Warnw("Backup log poller ran before first successful log poller run, skipping")
-			} else {
-				lp.lggr.Errorw("Backup log poller unable to get starting block", "err", err)
+				return nil
 			}
-			return
+
+			return fmt.Errorf("unable to get starting block: %w", err)
 		}
 		// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay)
 		backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
@@ -715,7 +731,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
 	_, latestFinalizedBlockNumber, err := lp.latestBlocks(ctx)
 	if err != nil {
 		lp.lggr.Warnw("Backup logpoller failed to get latest block", "err", err)
-		return
+		return nil
 	}
 
 	lastSafeBackfillBlock := latestFinalizedBlockNumber - 1
 	if lastSafeBackfillBlock >= lp.backupPollerNextBlock {
 		if err = lp.backfill(ctx, lp.backupPollerNextBlock, lastSafeBackfillBlock); err != nil {
 			// If there's an error backfilling, we can just return and retry from the last block saved
 			// since we don't save any blocks on backfilling. We may re-insert the same logs but thats ok.
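Reviewer sketch, not part of the patch: how a consumer could surface the sticky finality-violation signal that Replay and BackupPollAndSaveLogs now report. It assumes only the HealthReport() map[string]error convention exercised by the tests below; watchFinality, healthReporter, and the poll interval are illustrative names, not APIs introduced by this PR.

package sketch

import (
	"context"
	"errors"
	"log"
	"time"
)

// ErrFinalityViolated mirrors the sentinel error used by the log poller.
var ErrFinalityViolated = errors.New("finality violated")

type healthReporter interface {
	Name() string
	HealthReport() map[string]error
}

// watchFinality polls the health report and alerts once the sentinel error
// appears; because the underlying flag is sticky, one observation suffices.
func watchFinality(ctx context.Context, hr healthReporter, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := hr.HealthReport()[hr.Name()]; errors.Is(err, ErrFinalityViolated) {
				log.Printf("finality violation reported, manual intervention required: %v", err)
				return
			}
		}
	}
}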
-			lp.lggr.Warnw("Backup poller failed", "err", err)
-			return
+			return fmt.Errorf("backfill failed: %w", err)
 		}
 		lp.lggr.Infow("Backup poller finished backfilling", "start", lp.backupPollerNextBlock, "end", lastSafeBackfillBlock)
 		lp.backupPollerNextBlock = lastSafeBackfillBlock + 1
 	}
+
+	return nil
 }
 
 // convertLogs converts an array of geth logs ([]type.Log) to an array of logpoller logs ([]Log)
@@ -778,17 +795,32 @@ func convertTopics(topics []common.Hash) [][]byte {
 	return topicsForDB
 }
 
-// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
+// blocksFromFinalizedLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
 // whether or not there are any logs in the list from that block
-func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
-	var numbers []uint64
+func (lp *logPoller) blocksFromFinalizedLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
+	numbers := make([]uint64, 0, len(logs))
 	for _, log := range logs {
 		numbers = append(numbers, log.BlockNumber)
 	}
 	if numbers[len(numbers)-1] != endBlockNumber {
 		numbers = append(numbers, endBlockNumber)
 	}
-	return lp.GetBlocksRange(ctx, numbers)
+	blocks, err = lp.GetBlocksRange(ctx, numbers)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(logs) == 0 {
+		return blocks, nil
+	}
+
+	for i, log := range logs {
+		if log.BlockHash != blocks[i].BlockHash {
+			return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, ErrFinalityViolated)
+		}
+	}
+
+	return blocks, nil
 }
 
 // backfill will query FilterLogs in batches for logs in the
@@ -819,7 +851,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 		if len(gethLogs) == 0 {
 			continue
 		}
-		blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
+		blocks, err := lp.blocksFromFinalizedLogs(ctx, gethLogs, uint64(to)) //nolint:gosec // G115
 		if err != nil {
 			return err
 		}
@@ -849,11 +881,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 // 2. Delete all logs and blocks after the LCA
 // 3. Return the LCA+1, i.e. our new current (unprocessed) block.
 func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
-	defer func() {
-		if err == nil {
-			lp.finalityViolated.Store(false)
-		}
-	}()
 
 	var err1 error
 	if currentBlock == nil {
@@ -884,7 +911,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 	// We will not have the previous currentBlock on initial poll.
 	havePreviousBlock := err1 == nil
 	if !havePreviousBlock {
-		lp.lggr.Infow("Do not have previous block, first poll ever on new chain or after backfill", "currentBlockNumber", currentBlockNumber)
+		lp.lggr.Infow("Do not have previous block, first poll ever on new chain", "currentBlockNumber", currentBlockNumber)
 		return currentBlock, nil
 	}
 	// Check for reorg.
@@ -894,8 +921,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 		// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
 		blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber)
 		if err2 != nil {
-			lp.lggr.Warnw("Unable to find LCA after reorg, retrying", "err", err2)
-			return nil, pkgerrors.New("Unable to find LCA after reorg, retrying")
+			return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
 		}
 
 		lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber)
@@ -922,20 +948,40 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal
 // conditions this would be equal to lastProcessed.BlockNumber + 1.
 func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
+	err := lp.pollAndSaveLogs(ctx, currentBlockNumber)
+	if errors.Is(err, ErrFinalityViolated) {
+		lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err)
+		lp.finalityViolated.Store(true)
+		lp.SvcErrBuffer.Append(err)
+		return
+	}
+
+	if err != nil {
+		lp.lggr.Errorw("Failed to poll and save logs, retrying later", "err", err)
+		return
+	}
+
+	if lp.finalityViolated.Load() {
+		lp.lggr.Info("PollAndSaveLogs completed successfully - removing finality violation flag")
+		lp.finalityViolated.Store(false)
+	}
+}
+
+func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) {
 	lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber)
 	// Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks()
 	// latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration
 	latestBlock, latestFinalizedBlockNumber, err := lp.latestBlocks(ctx)
 	if err != nil {
 		lp.lggr.Warnw("Unable to get latestBlockNumber block", "err", err, "currentBlockNumber", currentBlockNumber)
-		return
+		return nil
 	}
 	latestBlockNumber := latestBlock.Number
 	if currentBlockNumber > latestBlockNumber {
 		// Note there can also be a reorg "shortening" i.e. chain height decreases but TDD increases. In that case
 		// we also just wait until the new tip is longer and then detect the reorg.
 		lp.lggr.Debugw("No new blocks since last poll", "currentBlockNumber", currentBlockNumber, "latestBlockNumber", latestBlockNumber)
-		return
+		return nil
 	}
 	var currentBlock *evmtypes.Head
 	if currentBlockNumber == latestBlockNumber {
@@ -948,8 +994,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		if err != nil {
 			// If there's an error handling the reorg, we can't be sure what state the db was left in.
 			// Resume from the latest block saved and retry.
-			lp.lggr.Errorw("Unable to get current block, retrying", "err", err)
-			return
+			return fmt.Errorf("unable to get current block: %w", err)
 		}
 		currentBlockNumber = currentBlock.Number
 
@@ -964,8 +1009,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		if err = lp.backfill(ctx, currentBlockNumber, lastSafeBackfillBlock); err != nil {
 			// If there's an error backfilling, we can just return and retry from the last block saved
 			// since we don't save any blocks on backfilling. We may re-insert the same logs but thats ok.
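Reviewer sketch, not part of the patch: the PollAndSaveLogs split above is a sticky latch, set on ErrFinalityViolated, left untouched on transient errors, and cleared only by a fully successful cycle. Reduced to its essentials below; finalityLatch is an illustrative name.

package sketch

import (
	"errors"
	"sync/atomic"
)

var ErrFinalityViolated = errors.New("finality violated")

type finalityLatch struct{ violated atomic.Bool }

// observe records the outcome of one poll cycle.
func (l *finalityLatch) observe(err error) {
	switch {
	case errors.Is(err, ErrFinalityViolated):
		l.violated.Store(true) // latch on; persists across failed retries
	case err == nil:
		l.violated.Store(false) // only a clean cycle clears the latch
	default:
		// transient failure: keep the previous verdict and retry later
	}
}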
-			lp.lggr.Warnw("Unable to backfill finalized logs, retrying later", "err", err)
-			return
+			return fmt.Errorf("failed to backfill finalized logs: %w", err)
 		}
 		currentBlockNumber = lastSafeBackfillBlock + 1
 	}
@@ -976,8 +1020,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		if err != nil {
 			// If there's an error handling the reorg, we can't be sure what state the db was left in.
 			// Resume from the latest block saved.
-			lp.lggr.Errorw("Unable to get current block", "err", err)
-			return
+			return fmt.Errorf("failed to get current block: %w", err)
 		}
 		currentBlockNumber = currentBlock.Number
 	}
@@ -998,7 +1041,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		)
 		if err != nil {
 			lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
-			return
+			return nil
 		}
 		// Update current block.
 		// Same reorg detection on unfinalized blocks.
@@ -1007,6 +1050,8 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 			break
 		}
 	}
+
+	return nil
 }
 
 // Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker
@@ -1042,12 +1087,14 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 	// We loop via parent instead of current so current always holds the LCA+1.
 	// If the parent block number becomes < the first finalized block our reorg is too deep.
 	// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config.
+	var ourParentBlockHash common.Hash
 	for parent.Number >= latestFinalizedBlockNumber {
-		ourParentBlockHash, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
+		ourParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
 		if err != nil {
 			return nil, err
 		}
-		if parent.Hash == ourParentBlockHash.BlockHash {
+		ourParentBlockHash = ourParentBlock.BlockHash
+		if parent.Hash == ourParentBlockHash {
 			// If we do have the blockhash, return blockAfterLCA
 			return blockAfterLCA, nil
 		}
@@ -1058,11 +1105,9 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 			return nil, err
 		}
 	}
+
 	lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
-	rerr := pkgerrors.New("Reorg greater than finality depth")
-	lp.SvcErrBuffer.Append(rerr)
-	lp.finalityViolated.Store(true)
-	return nil, rerr
+	return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
 }
 
 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
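Reviewer sketch, not part of the patch: the findBlockAfterLCA loop above, modelled standalone. chainByNumber stands in for the RPC lookup and savedHash for the ORM lookup; both names are illustrative. It returns LCA+1, or the sentinel error when the chains still disagree at the finalized boundary.

package sketch

import (
	"errors"
	"fmt"
)

var ErrFinalityViolated = errors.New("finality violated")

type head struct {
	Number     int64
	Hash       [32]byte
	ParentHash [32]byte
}

func blockAfterLCA(current head, latestFinalized int64,
	chainByNumber func(int64) head, savedHash func(int64) [32]byte) (head, error) {
	after := current // candidate for LCA+1; always trails parent by one block
	parent := chainByNumber(current.Number - 1)
	for parent.Number >= latestFinalized {
		if parent.Hash == savedHash(parent.Number) {
			return after, nil // parent is the last common ancestor
		}
		after = parent
		parent = chainByNumber(parent.Number - 1)
	}
	// the walk crossed the finalized boundary without finding a common block
	return head{}, fmt.Errorf("%w: reorg deeper than finalized block %d", ErrFinalityViolated, latestFinalized)
}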
@@ -1256,11 +1301,10 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
 	blocksRequested map[uint64]struct{},
 	blocksFound map[uint64]LogPollerBlock,
 ) (map[uint64]LogPollerBlock, error) {
-	var remainingBlocks []string
-
+	remainingBlocks := make([]uint64, 0, len(blocksRequested))
 	for num := range blocksRequested {
 		if _, ok := blocksFound[num]; !ok {
-			remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num)))
+			remainingBlocks = append(remainingBlocks, num)
 		}
 	}
 
@@ -1276,13 +1320,13 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
 	logPollerBlocks := make(map[uint64]LogPollerBlock)
 	for _, head := range evmBlocks {
-		logPollerBlocks[uint64(head.Number)] = LogPollerBlock{
-			EvmChainId:           head.EVMChainID,
-			BlockHash:            head.Hash,
-			BlockNumber:          head.Number,
-			BlockTimestamp:       head.Timestamp,
-			FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock()
-			CreatedAt:            head.Timestamp,
+		logPollerBlocks[uint64(head.BlockNumber)] = LogPollerBlock{
+			EvmChainId:           head.EvmChainId,
+			BlockHash:            head.BlockHash,
+			BlockNumber:          head.BlockNumber,
+			BlockTimestamp:       head.BlockTimestamp,
+			FinalizedBlockNumber: head.BlockNumber, // always finalized; only matters if this block is returned by LatestBlock()
+			CreatedAt:            head.BlockTimestamp,
 		}
 	}
 	return logPollerBlocks, nil
@@ -1304,25 +1348,23 @@ var (
 	finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String())
 )
 
-// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the
+// fetchBlocks fetches a list of blocks in a single batch. finalityValidationReq is the string to use for the
 // additional validation request (either the "finalized" or "latest" string defined in rpc module), which
 // will be used to validate the finality of the other blocks.
-func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) {
+// chainReference is used to verify that fetched blocks belong to the same chain as the referenced head.
+func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []uint64, finalityValidationReq blockValidationType, chainReference *LogPollerBlock) (blocks map[uint64]*evmtypes.Head, err error) {
 	n := len(blocksRequested)
-	blocks = make([]*evmtypes.Head, 0, n+1)
-	reqs := make([]rpc.BatchElem, 0, n+1)
+	blocks = make(map[uint64]*evmtypes.Head, n+2)
+	reqs := make([]rpc.BatchElem, 0, n+2)
 
-	validationBlockIndex := n
-	for k, num := range blocksRequested {
-		if num == string(validationReq) {
-			validationBlockIndex = k
-		}
-		reqs = append(reqs, newBlockReq(num))
+	for _, num := range blocksRequested {
+		reqs = append(reqs, newBlockReq(hexutil.EncodeBig(big.NewInt(0).SetUint64(num))))
 	}
 
-	if validationBlockIndex == n {
-		// Add validation req if it wasn't in there already
-		reqs = append(reqs, newBlockReq(string(validationReq)))
+	reqs = append(reqs, newBlockReq(string(finalityValidationReq)))
+
+	if chainReference != nil {
+		reqs = append(reqs, newBlockReq(hexutil.EncodeBig(big.NewInt(chainReference.BlockNumber))))
 	}
 
 	err = lp.ec.BatchCallContext(ctx, reqs)
@@ -1330,64 +1372,128 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string,
 		return nil, err
 	}
 
-	validationBlock, err := validateBlockResponse(reqs[validationBlockIndex])
+	// ensure that requested blocks belong to the same chain as referenced head
+	if chainReference != nil {
+		var rpcChainReference *evmtypes.Head
+		rpcChainReference, err = validateBlockResponse(reqs[len(reqs)-1])
+		if err != nil {
+			return nil, err
+		}
+
+		if rpcChainReference.Hash != chainReference.BlockHash {
+			return nil, fmt.Errorf("expected RPC's finalized block hash at height %d to be %s but got %s: %w",
+				chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, ErrFinalityViolated)
+		}
+
+		reqs = reqs[:len(reqs)-1] // no need to include chain reference into results
+	}
+
+	latestFinalized, err := validateBlockResponse(reqs[len(reqs)-1])
 	if err != nil {
 		return nil, err
 	}
 
-	latestFinalizedBlockNumber := validationBlock.Number
-	if validationReq == latestBlock {
+	latestFinalizedBlockNumber := latestFinalized.Number
+	if finalityValidationReq == latestBlock {
 		// subtract finalityDepth from "latest" to get finalized, when useFinalityTags = false
 		latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0)
 	}
-	if len(reqs) == n+1 {
-		reqs = reqs[:n] // ignore last req if we added it explicitly for validation
-	}
 
-	for k, r := range reqs {
-		if k == validationBlockIndex {
-			// Already validated this one, just insert it in proper place
-			blocks = append(blocks, validationBlock)
-			continue
-		}
+	reqs = reqs[:len(reqs)-1] // no need to include finality validation request into results
 
-		block, err2 := validateBlockResponse(r)
-		if err2 != nil {
-			return nil, err2
+	for i, r := range reqs {
+		block, err := validateBlockResponse(r)
+		if err != nil {
+			return nil, err
 		}
 
 		blockRequested := r.Args[0].(string)
 		if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber {
 			return nil, fmt.Errorf(
-				"Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
+				"received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
 				block.Number, latestFinalizedBlockNumber)
 		}
 
-		blocks = append(blocks, block)
+		blocks[blocksRequested[i]] = block
 	}
 
 	return blocks, nil
 }
 
-func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
-	var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)
-
+func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []uint64, batchSize int64) (map[uint64]LogPollerBlock, error) {
 	validationReq := finalizedBlock
 	if !lp.useFinalityTag {
 		validationReq = latestBlock
 	}
 
+	chainValidationHead, err := lp.orm.SelectLatestFinalizedBlock(ctx)
+	if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		return nil, fmt.Errorf("failed to fetch latest finalized block from db: %w", err)
+	}
+
+	var logPollerBlocks = make(map[uint64]LogPollerBlock, len(blocksRequested))
 	for i := 0; i < len(blocksRequested); i += int(batchSize) {
 		j := i + int(batchSize)
 		if j > len(blocksRequested) {
 			j = len(blocksRequested)
 		}
-		moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
+
+		// As batch requests are not atomic, there is a chance that some of the blocks were replaced due to a reorg once we've observed them.
+		// Example:
+		// 1. RPC's chain is 1,2',3',4',5' (latest finalized is 1).
+		// 2. Batch request reads blocks 1,2'.
+		// 3. RPC updates its state to 1,2,3,4,5 (latest finalized is 5).
+		// 4. Batch request reads 4,5.
+		// As a result, we'll treat block 2' as finalized. To address that, we have to fetch all blocks twice and verify that the results are identical.
+		fetched1, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq, chainValidationHead)
 		if err != nil {
 			return nil, err
 		}
-		blocks = append(blocks, moreBlocks...)
+
+		fetched2, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq, chainValidationHead)
+		if err != nil {
+			return nil, err
+		}
+
+		err = ensureIdenticalBlocksBatches(fetched1, fetched2)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, head := range fetched1 {
+			lpBlock := LogPollerBlock{
+				EvmChainId:           head.EVMChainID,
+				BlockHash:            head.Hash,
+				BlockNumber:          head.Number,
+				BlockTimestamp:       head.Timestamp,
+				FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock()
+				CreatedAt:            head.CreatedAt,
+			}
+			logPollerBlocks[uint64(head.Number)] = lpBlock //nolint:gosec // G115
+			if chainValidationHead == nil || chainValidationHead.BlockNumber < lpBlock.BlockNumber {
+				chainValidationHead = &lpBlock
+			}
+		}
 	}
 
-	return blocks, nil
+	return logPollerBlocks, nil
+}
+
+func ensureIdenticalBlocksBatches(fetched1, fetched2 map[uint64]*evmtypes.Head) error {
+	if len(fetched1) != len(fetched2) {
+		return fmt.Errorf("invariant violation: expected size of batches to be identical. Fetched1: %d, Fetched2: %d", len(fetched1), len(fetched2))
+	}
+
+	for num, head1 := range fetched1 {
+		head2, ok := fetched2[num]
+		if !ok {
+			return fmt.Errorf("invariant violation: expected fetched1 to contain same blocks as fetched2, but %d is missing from fetched2", num)
+		}
+
+		if head1.Hash != head2.Hash {
+			return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, ErrFinalityViolated)
		}
+	}
+
+	return nil
 }
 
 func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) {
diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go
index 620bbf14f4..58ee52b8a3 100644
--- a/core/chains/evm/logpoller/log_poller_internal_test.go
+++ b/core/chains/evm/logpoller/log_poller_internal_test.go
@@ -22,11 +22,14 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zapcore"
 
+	commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
+	"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
-	"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
 
 	htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
 	evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
@@ -243,7 +246,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 		BackupPollerBlockDelay:   0,
 	}
 	lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-	lp.BackupPollAndSaveLogs(ctx)
+	require.NoError(t, lp.BackupPollAndSaveLogs(ctx))
 	assert.Equal(t, int64(0), lp.backupPollerNextBlock)
 	assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())
 
@@ -253,11 +256,28 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, latestBlock, lastProcessed.BlockNumber)
 
-	lp.BackupPollAndSaveLogs(ctx)
+	require.NoError(t, lp.BackupPollAndSaveLogs(ctx))
 	assert.Equal(t, int64(2), lp.backupPollerNextBlock)
 }
 
 func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) {
+	mockBatchCallContextWithHead(t, ec, newHeadVal)
+}
+
+func newHeadVal(num int64) evmtypes.Head {
+	return evmtypes.Head{
+		Number:     num,
+		Hash:       common.BigToHash(big.NewInt(num)),
+		ParentHash: common.BigToHash(big.NewInt(num - 1)),
+	}
+}
+
+func newHead(num int64) *evmtypes.Head {
+	r := newHeadVal(num)
+	return &r
+}
+
+func mockBatchCallContextWithHead(t *testing.T, ec *evmclimocks.Client, newHead func(num int64) evmtypes.Head) {
 	ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
 		elems := args.Get(1).([]rpc.BatchElem)
 		for _, e := range elems {
@@ -274,7 +294,7 @@ func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) {
 				num = int64(n)
 			}
 			result := e.Result.(*evmtypes.Head)
-			*result = evmtypes.Head{Number: num, Hash: utils.NewHash()}
+			*result = newHead(num)
 		}
 	})
 }
@@ -289,12 +309,12 @@ func TestLogPoller_Replay(t *testing.T) {
 	orm := NewORM(chainID, db, lggr)
 
 	var head atomic.Pointer[evmtypes.Head]
-	head.Store(&evmtypes.Head{Number: 4})
+	head.Store(newHead(4))
 
 	events := []common.Hash{EmitterABI.Events["Log1"].ID}
 	log1 := types.Log{
 		Index:       0,
-		BlockHash:   common.Hash{},
+		BlockHash:   common.BigToHash(big.NewInt(head.Load().Number)),
 		BlockNumber: uint64(head.Load().Number),
 		Topics:      events,
 		Address:     addr,
@@ -303,6 +323,9 @@ func TestLogPoller_Replay(t *testing.T) {
 	}
 
 	ec := evmclimocks.NewClient(t)
+	ec.EXPECT().HeadByHash(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, hash common.Hash) (*evmtypes.Head, error) {
+		return &evmtypes.Head{Number: hash.Big().Int64(), Hash: hash}, nil
+	}).Maybe()
 	ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) {
 		return head.Load(), nil
 	})
@@ -321,7 +344,7 @@ func TestLogPoller_Replay(t *testing.T) {
 	headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
 		h := head.Load()
-		finalized := &evmtypes.Head{Number: h.Number - lpOpts.FinalityDepth}
+		finalized := newHead(h.Number - lpOpts.FinalityDepth)
 		return h, finalized, nil
 	})
 	lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
@@ -423,7 +446,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms
 
-		head.Store(&evmtypes.Head{Number: 5})
+		head.Store(newHead(5))
 		t.Cleanup(lp.reset)
 		servicetest.Run(t, lp)
 
@@ -450,7 +473,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		go func() {
 			defer close(done)
-			head.Store(&evmtypes.Head{Number: 4}) // Restore latest block to 4, so this matches the fromBlock requested
+			head.Store(newHead(4)) // Restore latest block to 4, so this matches the fromBlock requested
 			select {
 			case lp.replayStart <- 4:
 			case <-ctx.Done():
@@ -471,7 +494,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
 
 		t.Cleanup(lp.reset)
-		head.Store(&evmtypes.Head{Number: 5}) // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
+		head.Store(newHead(6)) // Latest block must be > lastProcessed in order for PollAndSaveLogs() to call FilterLogs()
 		servicetest.Run(t, lp)
 
 		select {
@@ -492,7 +515,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 		lp.ReplayAsync(1)
 
-		recvStartReplay(testutils.Context(t), 2)
+		recvStartReplay(testutils.Context(t), 4)
 	})
 
 	t.Run("ReplayAsync error", func(t *testing.T) {
@@ -532,7 +555,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		require.NoError(t, err)
 
 		h := head.Load()
-		err = lp.orm.InsertBlock(ctx, h.Hash, h.Number, h.Timestamp, h.Number)
+		err = lp.orm.InsertBlock(ctx, common.BigToHash(big.NewInt(h.Number)), h.Number, h.Timestamp, h.Number)
 		require.NoError(t, err)
 
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
@@ -603,25 +626,35 @@ func Test_FetchBlocks(t *testing.T) {
 	cases := []struct {
 		name            string
-		blocksRequested []string
+		blocksRequested []uint64
+		chainReference  *LogPollerBlock
 		expectedErr     error
-	}{{
-		"successful validation including finalized and latest",
-		[]string{"0x3", "latest", "0x5", "finalized", "0x1"},
-		nil,
-	}, {
-		"successful validation with all block numbers",
-		[]string{"0x2", "0x5", "0x3", "0x4"},
-		nil,
-	}, {
-		"finality violation including finalized and latest",
-		[]string{"0x8", "0x2", "latest", "finalized"},
-		errors.New("Received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
-	}, {
-		"finality violation with all block numbers",
-		[]string{"0x9", "0x2", "finalized", "latest"},
-		errors.New("Received unfinalized block 9 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
-	}}
+	}{
+		{
+			"All blocks are finalized from RPC's perspective, no reference",
+			[]uint64{2, 5, 3, 4},
+			nil,
+			nil,
+		},
+		{
+			"RPC's latest finalized is lower than request, no reference",
+			[]uint64{8, 2},
+			nil,
+			errors.New("received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
+		},
+		{
+			"All blocks are finalized, but chain reference does not match",
+			[]uint64{2, 5, 3, 4},
+			&LogPollerBlock{BlockNumber: 1, BlockHash: common.BigToHash(big.NewInt(2))},
+			errors.New("expected RPC's finalized block hash at height 1 to be 0x0000000000000000000000000000000000000000000000000000000000000002 but got 0x0000000000000000000000000000000000000000000000000000000000000001: finality violated"),
+		},
+		{
+			"All blocks are finalized and chain reference matches",
+			[]uint64{2, 5, 3, 4},
+			&LogPollerBlock{BlockNumber: 1, BlockHash: common.BigToHash(big.NewInt(1))},
+			nil,
+		},
+	}
 
 	lp := NewLogPoller(orm, ec, lggr, nil, lpOpts)
 	for _, tc := range cases {
@@ -631,29 +664,111 @@ func Test_FetchBlocks(t *testing.T) {
 				blockValidationReq = finalizedBlock
 			}
 			t.Run(fmt.Sprintf("%s where useFinalityTag=%t", tc.name, lp.useFinalityTag), func(t *testing.T) {
-				blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq)
+				blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq, tc.chainReference)
 				if tc.expectedErr != nil {
 					require.Equal(t, err.Error(), tc.expectedErr.Error())
 					return // PASS
 				}
 				require.NoError(t, err)
 
-				for i, blockRequested := range tc.blocksRequested {
-					switch blockRequested {
-					case string(latestBlock):
-						assert.Equal(t, int64(8), blocks[i].Number)
-					case string(finalizedBlock):
-						assert.Equal(t, int64(5), blocks[i].Number)
-					default:
-						blockNum, err2 := hexutil.DecodeUint64(blockRequested)
-						require.NoError(t, err2)
-						assert.Equal(t, int64(blockNum), blocks[i].Number)
-					}
+				for _, blockRequested := range tc.blocksRequested {
+					assert.Equal(t, blockRequested, uint64(blocks[blockRequested].Number)) //nolint:gosec // G115
 				}
 			})
 		}
 	}
 }
 
+func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
+	t.Parallel()
+
+	db := pgtest.NewSqlxDB(t)
+	lpOpts := Opts{
+		PollPeriod:               time.Second,
+		FinalityDepth:            3,
+		BackfillBatchSize:        3,
+		RpcBatchSize:             3,
+		KeepFinalizedBlocksDepth: 20,
+		BackupPollerBlockDelay:   0,
+	}
+	t.Run("Finalized DB block is not present in RPC's chain", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).RunAndReturn(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
+			return latest, finalized, nil
+		}).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		// insert finalized block with different hash than in RPC
+		require.NoError(t, orm.InsertBlock(tests.Context(t), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2))
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
+	})
+	t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5}}, nil).Once()
+		mockBatchCallContextWithHead(t, ec, func(num int64) evmtypes.Head {
+			// return new hash for every call
+			return evmtypes.Head{Number: num, Hash: utils.NewHash()}
+		})
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
+	})
+	t.Run("Log's hash does not match block's", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
+	})
+	t.Run("Happy path", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		chainID := testutils.NewRandomEVMChainID()
+		orm := NewORM(chainID, db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().ConfiguredChainID().Return(chainID)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.NoError(t, lp.HealthReport()[lp.Name()])
+	})
+}
+
 func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
 	lggr := logger.Test(b)
 	lpOpts := Opts{
diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go
index 1ab548063a..b969e4ba78 100644
--- a/core/chains/evm/logpoller/log_poller_test.go
+++ b/core/chains/evm/logpoller/log_poller_test.go
@@ -1350,7 +1350,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	blockNums = []uint64{2}
 	_, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
 	require.Error(t, err)
-	assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())
+	assert.Equal(t, "received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())
 
 	th.Client.Commit() // Commit block #4, so that block #2 is finalized
 
@@ -1419,12 +1419,6 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), "context canceled")
-
-	// test canceled ctx
-	ctx, cancel = context.WithCancel(testutils.Context(t))
-	cancel()
-	_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
-	require.Equal(t, err, context.Canceled)
 }
 
 func TestGetReplayFromBlock(t *testing.T) {
@@ -1679,11 +1673,11 @@ func TestTooManyLogResults(t *testing.T) {
 		crit := obs.FilterLevelExact(zapcore.DPanicLevel).All()
 		errors := obs.FilterLevelExact(zapcore.ErrorLevel).All()
 		warns := obs.FilterLevelExact(zapcore.WarnLevel).All()
-		assert.Len(t, crit, 0)
-		require.Len(t, errors, 1)
-		assert.Equal(t, errors[0].Message, "Unable to query for logs")
-		require.Len(t, warns, 1)
-		assert.Contains(t, warns[0].Message, "retrying later")
+		assert.Empty(t, crit)
+		require.Len(t, errors, 2)
+		assert.Contains(t, errors[0].Message, "Unable to query for logs")
+		assert.Contains(t, errors[1].Message, "Failed to poll and save logs, retrying later")
+		require.Empty(t, warns)
 	})
 }
 
diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go
index 0b5a8f4bd4..20ed4f5951 100644
--- a/core/chains/evm/logpoller/orm.go
+++ b/core/chains/evm/logpoller/orm.go
@@ -43,6 +43,7 @@ type ORM interface {
 	SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error)
 	SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
 	SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)
+	SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error)
 
 	SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
 	SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)
@@ -989,6 +990,18 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim
 	return logs, nil
 }
 
+func (o *DSORM) SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error) {
+	var b LogPollerBlock
+	if err := o.ds.GetContext(ctx, &b,
+		`SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= (
+			SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1
+		) ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID),
+	); err != nil {
+		return nil, err
+	}
+	return &b, nil
+}
+
 func nestedBlockNumberQuery(confs evmtypes.Confirmations) string {
 	if confs == evmtypes.Finalized {
 		return `
diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go
index ba66e166eb..5c0b1e42fe 100644
--- a/core/chains/evm/logpoller/orm_test.go
+++ b/core/chains/evm/logpoller/orm_test.go
@@ -2168,3 +2168,35 @@ func TestSelectOldestBlock(t *testing.T) {
 		require.Equal(t, block.BlockHash, common.HexToHash("0x1233"))
 	})
 }
+
+func TestSelectLatestFinalizedBlock(t *testing.T) {
+	t.Run("If finalized block is not present in DB return error", func(t *testing.T) {
+		th := SetupTH(t, lpOpts)
+		o1 := th.ORM
+		o2 := th.ORM2
+		ctx := testutils.Context(t)
+		// o2's chain does not have finalized block
+		require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 9))
+		require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1234"), 10, time.Now(), 8))
+		// o1 has finalized blocks
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 11, time.Now(), 10))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 10, time.Now(), 10))
+		result, err := o2.SelectLatestFinalizedBlock(ctx)
+		require.ErrorIs(t, err, sql.ErrNoRows)
+		require.Nil(t, result)
+	})
+	t.Run("Returns latest finalized block even if there is no exact match by block number", func(t *testing.T) {
+		th := SetupTH(t, lpOpts)
+		o1 := th.ORM
+		ctx := testutils.Context(t)
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 12, time.Now(), 10))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 9))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 5, time.Now(), 4))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1230"), 4, time.Now(), 4))
+		result, err := o1.SelectLatestFinalizedBlock(ctx)
+		require.NoError(t, err)
+		require.NotNil(t, result)
+		require.Equal(t, int64(5), result.BlockNumber)
+		require.Equal(t, common.HexToHash("0x1231"), result.BlockHash)
+	})
+}
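Reviewer sketch, not part of the patch: how batchFetchBlocks consumes the new SelectLatestFinalizedBlock query. sql.ErrNoRows means "nothing finalized saved yet", in which case the chain-reference check is simply skipped; any other error aborts. The types below are trimmed stand-ins for the real ORM interfaces.

package sketch

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

type LogPollerBlock struct {
	BlockHash   [32]byte
	BlockNumber int64
}

type orm interface {
	SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error)
}

// chainReference returns the block every batched fetch must agree with, or nil
// when the database has no finalized block to anchor on yet.
func chainReference(ctx context.Context, o orm) (*LogPollerBlock, error) {
	b, err := o.SelectLatestFinalizedBlock(ctx)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil // nothing finalized yet: proceed without a reference
	}
	if err != nil {
		return nil, fmt.Errorf("failed to fetch latest finalized block from db: %w", err)
	}
	return b, nil
}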