Skip to content

Commit

Permalink
Updated the Finalizer to reduce the frequency of heavy query
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-momin committed Jan 30, 2025
1 parent 9d459df commit 8d63046
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-dancers-tie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Updated EVM TXM's Finalizer component to reduce the frequency of heavy DB query #internal
70 changes: 68 additions & 2 deletions core/chains/evm/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ var (
)

// processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete
const processHeadTimeout = 10 * time.Minute
const (
processHeadTimeout = 10 * time.Minute
attemptsCacheRefreshThreshold = 5
)

type finalizerTxStore interface {
DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error
Expand Down Expand Up @@ -103,6 +106,9 @@ type evmFinalizer struct {

lastProcessedFinalizedBlockNum int64
resumeCallback resumeCallback

attemptsCache []TxAttempt
attemptsCacheHitCount int
}

func NewEvmFinalizer(
Expand All @@ -125,6 +131,7 @@ func NewEvmFinalizer(
headTracker: headTracker,
mb: mailbox.NewSingle[*evmtypes.Head](),
resumeCallback: nil,
attemptsCacheHitCount: attemptsCacheRefreshThreshold, // start hit count at threshold to refresh cache on first run
}
}

Expand Down Expand Up @@ -377,7 +384,7 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block
}

func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFinalizedHead *evmtypes.Head) error {
attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID)
attempts, err := f.fetchAttemptsRequiringReceiptFetch(ctx)
if err != nil {
return fmt.Errorf("failed to fetch broadcasted attempts for confirmed transactions: %w", err)
}
Expand Down Expand Up @@ -413,6 +420,8 @@ func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFi
errorList = append(errorList, err)
continue
}
// Filter out attempts with found receipts from cache, if needed
f.filterAttemptsCache(receipts)
}
if len(errorList) > 0 {
return errors.Join(errorList...)
Expand Down Expand Up @@ -666,3 +675,60 @@ func (f *evmFinalizer) buildTxHashList(finalizedReceipts []*evmtypes.Receipt) []
}
return txHashes
}

// fetchAttemptsRequiringReceiptFetch is a wrapper around the TxStore call to fetch attempts requiring receipt fetch.
// Attempts are cached and used for subsequent fetches to reduce the load of the query.
// The attempts cache is refreshed every 3 requests.
func (f *evmFinalizer) fetchAttemptsRequiringReceiptFetch(ctx context.Context) ([]TxAttempt, error) {
// Return attempts from attempts cache if it is populated and the hit count has not reached the threshold for refresh
if f.attemptsCacheHitCount < attemptsCacheRefreshThreshold {
f.attemptsCacheHitCount++
return f.attemptsCache, nil
}
attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID)
if err != nil {
return nil, err
}
// Refresh the cache with the latest results
f.attemptsCache = attempts
// Reset the cache hit count
f.attemptsCacheHitCount = 0
return f.attemptsCache, nil
}

// filterAttemptsCache removes attempts from the cache if a receipt was found for their transaction's ID
func (f *evmFinalizer) filterAttemptsCache(receipts []*evmtypes.Receipt) {
// Skip method if no receipts found
if len(receipts) == 0 {
return
}
// Skip method if refresh threshold has been met
// No need to filter the attempts cache since fresh data will be fetched on the next iteration
if f.attemptsCacheHitCount >= attemptsCacheRefreshThreshold {
return
}
attemptsWithoutReceipts := make([]TxAttempt, 0, len(f.attemptsCache))
txIDsWithReceipts := make([]int64, 0, len(f.attemptsCache))
// Gather the unique tx IDs that receipts were found for
for _, receipt := range receipts {
for _, attempt := range f.attemptsCache {
if attempt.Hash.Cmp(receipt.TxHash) == 0 {
txIDsWithReceipts = append(txIDsWithReceipts, attempt.TxID)
}
}
}
// Filter out attempts for tx with found receipts from the existing attempts cache
for _, attempt := range f.attemptsCache {
foundATxID := false
for _, txID := range txIDsWithReceipts {
if attempt.TxID == txID {
foundATxID = true
break
}
}
if !foundATxID {
attemptsWithoutReceipts = append(attemptsWithoutReceipts, attempt)
}
}
f.attemptsCache = attemptsWithoutReceipts
}
24 changes: 24 additions & 0 deletions core/chains/evm/txmgr/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand Down Expand Up @@ -871,6 +872,29 @@ func TestFinalizer_FetchAndStoreReceipts(t *testing.T) {
require.Equal(t, txmgrcommon.TxFatalError, etx.State)
require.Equal(t, txmgr.ErrCouldNotGetReceipt, etx.Error.String)
})

t.Run("attempts requiring receipt fetch is not fetched from TxStore every head", func(t *testing.T) {
txStore := mocks.NewEvmTxStore(t)
finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, false, txStore, txmClient, ht)

// Mock finalizer txstore calls that are not needed
txStore.On("SaveFetchedReceipts", mock.Anything, mock.Anything).Return(nil).Maybe()
txStore.On("FindTxesPendingCallback", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()
txStore.On("UpdateTxCallbackCompleted", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
txStore.On("FindConfirmedTxesReceipts", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()
txStore.On("FindTxesByIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()

// RPC returns nil receipt for attempt
ethClient.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Maybe()

// Should fetch attempts list from txstore
attempt := cltest.NewLegacyEthTxAttempt(t, 0)
txStore.On("FindAttemptsRequiringReceiptFetch", mock.Anything, mock.Anything).Return([]txmgr.TxAttempt{attempt}, nil).Once()
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
// Should use the attempts cache for receipt fetch
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
})
}

func TestFinalizer_FetchAndStoreReceipts_batching(t *testing.T) {
Expand Down

0 comments on commit 8d63046

Please sign in to comment.