From 8d6304600afec0dfc3c32328e46828219ea02178 Mon Sep 17 00:00:00 2001 From: amit-momin Date: Wed, 29 Jan 2025 16:29:44 -0600 Subject: [PATCH] Updated the Finalizer to reduce the frequency of heavy query --- .changeset/giant-dancers-tie.md | 5 ++ core/chains/evm/txmgr/finalizer.go | 70 ++++++++++++++++++++++++- core/chains/evm/txmgr/finalizer_test.go | 24 +++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 .changeset/giant-dancers-tie.md diff --git a/.changeset/giant-dancers-tie.md b/.changeset/giant-dancers-tie.md new file mode 100644 index 00000000000..f54d93e2021 --- /dev/null +++ b/.changeset/giant-dancers-tie.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Updated EVM TXM's Finalizer component to reduce the frequency of heavy DB query #internal diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index bc496202cd6..662f2368a16 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -58,7 +58,10 @@ var ( ) // processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete -const processHeadTimeout = 10 * time.Minute +const ( + processHeadTimeout = 10 * time.Minute + attemptsCacheRefreshThreshold = 5 +) type finalizerTxStore interface { DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error @@ -103,6 +106,9 @@ type evmFinalizer struct { lastProcessedFinalizedBlockNum int64 resumeCallback resumeCallback + + attemptsCache []TxAttempt + attemptsCacheHitCount int } func NewEvmFinalizer( @@ -125,6 +131,7 @@ func NewEvmFinalizer( headTracker: headTracker, mb: mailbox.NewSingle[*evmtypes.Head](), resumeCallback: nil, + attemptsCacheHitCount: attemptsCacheRefreshThreshold, // start hit count at threshold to refresh cache on first run } } @@ -377,7 +384,7 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block } func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFinalizedHead *evmtypes.Head) error { - attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID) + attempts, err := f.fetchAttemptsRequiringReceiptFetch(ctx) if err != nil { return fmt.Errorf("failed to fetch broadcasted attempts for confirmed transactions: %w", err) } @@ -413,6 +420,8 @@ func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFi errorList = append(errorList, err) continue } + // Filter out attempts with found receipts from cache, if needed + f.filterAttemptsCache(receipts) } if len(errorList) > 0 { return errors.Join(errorList...) @@ -666,3 +675,60 @@ func (f *evmFinalizer) buildTxHashList(finalizedReceipts []*evmtypes.Receipt) [] } return txHashes } + +// fetchAttemptsRequiringReceiptFetch is a wrapper around the TxStore call to fetch attempts requiring receipt fetch. +// Attempts are cached and used for subsequent fetches to reduce the load of the query. +// The attempts cache is refreshed every 3 requests. +func (f *evmFinalizer) fetchAttemptsRequiringReceiptFetch(ctx context.Context) ([]TxAttempt, error) { + // Return attempts from attempts cache if it is populated and the hit count has not reached the threshold for refresh + if f.attemptsCacheHitCount < attemptsCacheRefreshThreshold { + f.attemptsCacheHitCount++ + return f.attemptsCache, nil + } + attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID) + if err != nil { + return nil, err + } + // Refresh the cache with the latest results + f.attemptsCache = attempts + // Reset the cache hit count + f.attemptsCacheHitCount = 0 + return f.attemptsCache, nil +} + +// filterAttemptsCache removes attempts from the cache if a receipt was found for their transaction's ID +func (f *evmFinalizer) filterAttemptsCache(receipts []*evmtypes.Receipt) { + // Skip method if no receipts found + if len(receipts) == 0 { + return + } + // Skip method if refresh threshold has been met + // No need to filter the attempts cache since fresh data will be fetched on the next iteration + if f.attemptsCacheHitCount >= attemptsCacheRefreshThreshold { + return + } + attemptsWithoutReceipts := make([]TxAttempt, 0, len(f.attemptsCache)) + txIDsWithReceipts := make([]int64, 0, len(f.attemptsCache)) + // Gather the unique tx IDs that receipts were found for + for _, receipt := range receipts { + for _, attempt := range f.attemptsCache { + if attempt.Hash.Cmp(receipt.TxHash) == 0 { + txIDsWithReceipts = append(txIDsWithReceipts, attempt.TxID) + } + } + } + // Filter out attempts for tx with found receipts from the existing attempts cache + for _, attempt := range f.attemptsCache { + foundATxID := false + for _, txID := range txIDsWithReceipts { + if attempt.TxID == txID { + foundATxID = true + break + } + } + if !foundATxID { + attemptsWithoutReceipts = append(attemptsWithoutReceipts, attempt) + } + } + f.attemptsCache = attemptsWithoutReceipts +} diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index 76338d31836..81536e3e8e2 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" @@ -871,6 +872,29 @@ func TestFinalizer_FetchAndStoreReceipts(t *testing.T) { require.Equal(t, txmgrcommon.TxFatalError, etx.State) require.Equal(t, txmgr.ErrCouldNotGetReceipt, etx.Error.String) }) + + t.Run("attempts requiring receipt fetch is not fetched from TxStore every head", func(t *testing.T) { + txStore := mocks.NewEvmTxStore(t) + finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, false, txStore, txmClient, ht) + + // Mock finalizer txstore calls that are not needed + txStore.On("SaveFetchedReceipts", mock.Anything, mock.Anything).Return(nil).Maybe() + txStore.On("FindTxesPendingCallback", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe() + txStore.On("UpdateTxCallbackCompleted", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + txStore.On("FindConfirmedTxesReceipts", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe() + txStore.On("FindTxesByIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe() + + // RPC returns nil receipt for attempt + ethClient.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Maybe() + + // Should fetch attempts list from txstore + attempt := cltest.NewLegacyEthTxAttempt(t, 0) + txStore.On("FindAttemptsRequiringReceiptFetch", mock.Anything, mock.Anything).Return([]txmgr.TxAttempt{attempt}, nil).Once() + require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead)) + // Should use the attempts cache for receipt fetch + require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead)) + require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead)) + }) } func TestFinalizer_FetchAndStoreReceipts_batching(t *testing.T) {