From 83e430e3e0fbb004b22d08258b6b399458b49539 Mon Sep 17 00:00:00 2001 From: Farhan Khwaja <132962163+farhanW3@users.noreply.github.com> Date: Tue, 6 Aug 2024 17:43:05 -0700 Subject: [PATCH] Updates: v2 Nonce (#597) * Updates * updates * cleaned logs * updates * updated recycleNonce to check if isNan * comments updated * chore: expose NonceResyncQueue to bullboard * Convert nonce-recycled to set and add nonce-sent set helpers * fix: handle removing from sent nonces in fail handler * chore: Remove commented out code in setup.ts file * Update nonceResyncWorker to use stricter wildcard pattern for redis keys * Add isSentNonce helper function for checking if a nonce is in the sent nonces set * refactor and solve nonce resynchronization logic bugs and nits in nonceResyncWorker * chore: Refactor cancelRecycledNoncesWorker to use SMEMBERS instead of LRANGE for retrieving unused nonces * treat `ReplacementGasFeeTowLow` as `NonceAlreadyUsed` * updated logs & removeSentNonce --------- Co-authored-by: Prithvish Baidya --- src/db/wallets/walletNonce.ts | 75 ++++++++++- src/server/middleware/adminRoutes.ts | 2 + src/worker/index.ts | 3 + src/worker/queues/nonceResyncQueue.ts | 17 +++ .../tasks/cancelRecycledNoncesWorker.ts | 2 +- src/worker/tasks/mineTransactionWorker.ts | 27 +++- src/worker/tasks/nonceResyncWorker.ts | 118 ++++++++++++++++++ src/worker/tasks/sendTransactionWorker.ts | 8 +- test/e2e/config.ts | 15 +++ 9 files changed, 256 insertions(+), 11 deletions(-) create mode 100644 src/worker/queues/nonceResyncQueue.ts create mode 100644 src/worker/tasks/nonceResyncWorker.ts diff --git a/src/db/wallets/walletNonce.ts b/src/db/wallets/walletNonce.ts index 68aca4e54..7abe20ee1 100644 --- a/src/db/wallets/walletNonce.ts +++ b/src/db/wallets/walletNonce.ts @@ -1,5 +1,6 @@ import { Address, eth_getTransactionCount, getRpcClient } from "thirdweb"; import { getChain } from "../../utils/chain"; +import { logger } from "../../utils/logger"; import { normalizeAddress } from "../../utils/primitiveTypes"; import { redis } from "../../utils/redis/redis"; import { thirdwebClient } from "../../utils/sdk"; @@ -12,12 +13,68 @@ export const lastUsedNonceKey = (chainId: number, walletAddress: Address) => `nonce:${chainId}:${normalizeAddress(walletAddress)}`; /** - * The "recycled nonces" list stores unsorted nonces to be reused or cancelled. + * The "recycled nonces" set stores unsorted nonces to be reused or cancelled. * Example: [ "25", "23", "24" ] */ -const recycledNoncesKey = (chainId: number, walletAddress: Address) => +export const recycledNoncesKey = (chainId: number, walletAddress: Address) => `nonce-recycled:${chainId}:${normalizeAddress(walletAddress)}`; +/** + * The "sent nonces" set stores nonces that have been sent on chain but not yet mined. + * + * Example: [ "25", "23", "24" ] + * + * The `nonceResyncWorker` periodically fetches the onchain transaction count for each wallet (a), + * compares it to the last nonce sent (b), and for every nonce between b and a, + * it recycles the nonce if the nonce is not in this set. + */ +export const sentNoncesKey = (chainId: number, walletAddress: Address) => + `nonce-sent:${chainId}:${normalizeAddress(walletAddress)}`; + +export const splitSentNoncesKey = (key: string) => { + const _splittedKeys = key.split(":"); + const walletAddress = normalizeAddress(_splittedKeys[2]); + const chainId = parseInt(_splittedKeys[1]); + return { walletAddress, chainId }; +}; + +/** + * Adds a nonce to the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`). + */ +export const addSentNonce = async ( + chainId: number, + walletAddress: Address, + nonce: number, +) => { + const key = sentNoncesKey(chainId, walletAddress); + await redis.sadd(key, nonce.toString()); +}; + +/** + * Removes a nonce from the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`). + */ +export const removeSentNonce = async ( + chainId: number, + walletAddress: Address, + nonce: number, +) => { + const key = sentNoncesKey(chainId, walletAddress); + const removed = await redis.srem(key, nonce.toString()); + return removed === 1; +}; + +/** + * Check if a nonce is in the sent nonces set. + */ +export const isSentNonce = async ( + chainId: number, + walletAddress: Address, + nonce: number, +) => { + const key = sentNoncesKey(chainId, walletAddress); + return !!(await redis.sismember(key, nonce.toString())); +}; + /** * Acquire an unused nonce. * This should be used to send an EOA transaction with this nonce. @@ -58,8 +115,16 @@ export const recycleNonce = async ( walletAddress: Address, nonce: number, ) => { + if (isNaN(nonce)) { + logger({ + level: "warn", + message: `[recycleNonce] Invalid nonce: ${nonce}`, + service: "worker", + }); + return; + } const key = recycledNoncesKey(chainId, walletAddress); - await redis.rpush(key, nonce); + await redis.sadd(key, nonce.toString()); }; /** @@ -73,7 +138,7 @@ const _acquireRecycledNonce = async ( walletAddress: Address, ) => { const key = recycledNoncesKey(chainId, walletAddress); - const res = await redis.lpop(key); + const res = await redis.spop(key); return res ? parseInt(res) : null; }; @@ -139,7 +204,7 @@ export const rebaseNonce = async (chainId: number, walletAddress: Address) => { address: walletAddress, }); - // Lua script to set nonce as max(transactionCount, redis.get(lastUsedNonceKey)) + // Lua script to set nonce as max const script = ` local transactionCount = tonumber(ARGV[1]) local lastUsedNonce = tonumber(redis.call('get', KEYS[1])) diff --git a/src/server/middleware/adminRoutes.ts b/src/server/middleware/adminRoutes.ts index 58a5b4292..90d0375ee 100644 --- a/src/server/middleware/adminRoutes.ts +++ b/src/server/middleware/adminRoutes.ts @@ -10,6 +10,7 @@ import { env } from "../../utils/env"; import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue"; import { MigratePostgresTransactionsQueue } from "../../worker/queues/migratePostgresTransactionsQueue"; import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue"; +import { NonceResyncQueue } from "../../worker/queues/nonceResyncQueue"; import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue"; import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue"; import { PruneTransactionsQueue } from "../../worker/queues/pruneTransactionsQueue"; @@ -28,6 +29,7 @@ const QUEUES: Queue[] = [ CancelRecycledNoncesQueue.q, PruneTransactionsQueue.q, MigratePostgresTransactionsQueue.q, + NonceResyncQueue.q, ]; export const withAdminRoutes = async (fastify: FastifyInstance) => { diff --git a/src/worker/index.ts b/src/worker/index.ts index 283188f0b..ab0177426 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -10,6 +10,7 @@ import { import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker"; import { initMigratePostgresTransactionsWorker } from "./tasks/migratePostgresTransactionsWorker"; import { initMineTransactionWorker } from "./tasks/mineTransactionWorker"; +import { initNonceResyncWorker } from "./tasks/nonceResyncWorker"; import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker"; import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker"; import { initPruneTransactionsWorker } from "./tasks/pruneTransactionsWorker"; @@ -26,6 +27,8 @@ export const initWorker = async () => { initSendWebhookWorker(); await initMigratePostgresTransactionsWorker(); + await initNonceResyncWorker(); + // Listen for new & updated configuration data. await newConfigurationListener(); await updatedConfigurationListener(); diff --git a/src/worker/queues/nonceResyncQueue.ts b/src/worker/queues/nonceResyncQueue.ts new file mode 100644 index 000000000..23fbf22bc --- /dev/null +++ b/src/worker/queues/nonceResyncQueue.ts @@ -0,0 +1,17 @@ +import { Queue } from "bullmq"; +import { redis } from "../../utils/redis/redis"; +import { defaultJobOptions } from "./queues"; + +export class NonceResyncQueue { + static q = new Queue("nonce-resync-cron", { + connection: redis, + defaultJobOptions, + }); + + constructor() { + NonceResyncQueue.q.setGlobalConcurrency(1); + + // The cron job is defined in `initNonceResyncWorker` + // because it requires an async call to query configuration. + } +} diff --git a/src/worker/tasks/cancelRecycledNoncesWorker.ts b/src/worker/tasks/cancelRecycledNoncesWorker.ts index 4c5641ecf..9040eb1e0 100644 --- a/src/worker/tasks/cancelRecycledNoncesWorker.ts +++ b/src/worker/tasks/cancelRecycledNoncesWorker.ts @@ -77,7 +77,7 @@ const getAndDeleteUnusedNonces = async (key: string) => { // Returns all unused nonces for this key and deletes the key. const script = ` local key = ARGV[1] - local members = redis.call('LRANGE', key, 0, -1) + local members = redis.call('SMEMBERS', key) redis.call('DEL', key) return members `; diff --git a/src/worker/tasks/mineTransactionWorker.ts b/src/worker/tasks/mineTransactionWorker.ts index 392f81f40..9619c6643 100644 --- a/src/worker/tasks/mineTransactionWorker.ts +++ b/src/worker/tasks/mineTransactionWorker.ts @@ -10,7 +10,7 @@ import { import { stringify } from "thirdweb/utils"; import { getUserOpReceiptRaw } from "thirdweb/wallets/smart"; import { TransactionDB } from "../../db/transactions/db"; -import { recycleNonce } from "../../db/wallets/walletNonce"; +import { recycleNonce, removeSentNonce } from "../../db/wallets/walletNonce"; import { getBlockNumberish } from "../../utils/block"; import { getConfig } from "../../utils/cache/getConfig"; import { getChain } from "../../utils/chain"; @@ -128,6 +128,19 @@ const _mineTransaction = async ( if (result.status === "fulfilled") { const receipt = result.value; job.log(`Found receipt on block ${receipt.blockNumber}.`); + + const removed = await removeSentNonce( + sentTransaction.chainId, + sentTransaction.from, + sentTransaction.nonce, + ); + + logger({ + level: "debug", + message: `[mineTransactionWorker] Removed nonce ${sentTransaction.nonce} from nonce-sent set: ${removed}`, + service: "worker", + }); + return { ...sentTransaction, status: "mined", @@ -142,7 +155,6 @@ const _mineTransaction = async ( }; } } - // Else the transaction is not mined yet. // Retry the transaction (after some initial delay). @@ -180,6 +192,7 @@ const _mineUserOp = async ( chain, userOpHash, }); + if (!userOpReceiptRaw) { return null; } @@ -243,12 +256,20 @@ export const initMineTransactionWorker = () => { if (!sentTransaction.isUserOp) { // Release the nonce to allow it to be reused or cancelled. - job.log(`Recycling nonce: ${sentTransaction.nonce}`); + job.log( + `Recycling nonce and removing from nonce-sent: ${sentTransaction.nonce}`, + ); await recycleNonce( sentTransaction.chainId, sentTransaction.from, sentTransaction.nonce, ); + + await removeSentNonce( + sentTransaction.chainId, + sentTransaction.from, + sentTransaction.nonce, + ); } } }); diff --git a/src/worker/tasks/nonceResyncWorker.ts b/src/worker/tasks/nonceResyncWorker.ts new file mode 100644 index 000000000..cccdb28aa --- /dev/null +++ b/src/worker/tasks/nonceResyncWorker.ts @@ -0,0 +1,118 @@ +import { Job, Processor, Worker } from "bullmq"; +import { eth_getTransactionCount, getRpcClient } from "thirdweb"; +import { + inspectNonce, + isSentNonce, + recycleNonce, + splitSentNoncesKey, +} from "../../db/wallets/walletNonce"; +import { getConfig } from "../../utils/cache/getConfig"; +import { getChain } from "../../utils/chain"; +import { logger } from "../../utils/logger"; +import { redis } from "../../utils/redis/redis"; +import { thirdwebClient } from "../../utils/sdk"; +import { NonceResyncQueue } from "../queues/nonceResyncQueue"; +import { logWorkerExceptions } from "../queues/queues"; + +// Must be explicitly called for the worker to run on this host. +export const initNonceResyncWorker = async () => { + const config = await getConfig(); + if (config.minedTxListenerCronSchedule) { + NonceResyncQueue.q.add("cron", "", { + repeat: { pattern: config.minedTxListenerCronSchedule }, + jobId: "nonce-resync-cron", + }); + } + + const _worker = new Worker(NonceResyncQueue.q.name, handler, { + connection: redis, + concurrency: 1, + }); + logWorkerExceptions(_worker); +}; + +/** + * Resyncs nonces for all wallets. + * This worker should be run periodically to ensure that nonces are not skipped. + * It checks the onchain nonce for each wallet and recycles any missing nonces. + * + * This is to unblock a wallet that has been stuck due to one or more skipped nonces. + */ +const handler: Processor = async (job: Job) => { + const sentNoncesKeys = await redis.keys("nonce-sent*"); + job.log(`Found ${sentNoncesKeys.length} nonce-sent* keys`); + + for (const sentNonceKey of sentNoncesKeys) { + const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey); + + const rpcRequest = getRpcClient({ + client: thirdwebClient, + chain: await getChain(chainId), + }); + + const [transactionCount, lastUsedNonceDb] = await Promise.all([ + eth_getTransactionCount(rpcRequest, { + address: walletAddress, + }), + inspectNonce(chainId, walletAddress), + ]); + + if (isNaN(transactionCount)) { + job.log( + `Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`, + ); + + logger({ + level: "error", + message: `[nonceResyncWorker] Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`, + service: "worker", + }); + + return; + } + + const lastUsedNonceOnchain = transactionCount - 1; + + job.log( + `${walletAddress} last used onchain nonce: ${lastUsedNonceOnchain} and last used db nonce: ${lastUsedNonceDb}`, + ); + logger({ + level: "debug", + message: `[nonceResyncWorker] last used onchain nonce: ${transactionCount} and last used db nonce: ${lastUsedNonceDb}`, + service: "worker", + }); + + // If the last used nonce onchain is the same as or ahead of the last used nonce in the db, + // There is no need to resync the nonce. + if (lastUsedNonceOnchain >= lastUsedNonceDb) { + job.log(`No need to resync nonce for ${walletAddress}`); + logger({ + level: "debug", + message: `[nonceResyncWorker] No need to resync nonce for ${walletAddress}`, + service: "worker", + }); + return; + } + + // for each nonce between last used db nonce and last used onchain nonce + // check if nonce exists in nonce-sent set + // if it does not exist, recycle it + for ( + let _nonce = lastUsedNonceOnchain + 1; + _nonce < lastUsedNonceDb; + _nonce++ + ) { + const exists = await isSentNonce(chainId, walletAddress, _nonce); + logger({ + level: "debug", + message: `[nonceResyncWorker] nonce ${_nonce} exists in nonce-sent set: ${exists}`, + service: "worker", + }); + + // If nonce does not exist in nonce-sent set, recycle it + if (!exists) { + await recycleNonce(chainId, walletAddress, _nonce); + } + } + } +}; diff --git a/src/worker/tasks/sendTransactionWorker.ts b/src/worker/tasks/sendTransactionWorker.ts index 9330716a3..4bf78d388 100644 --- a/src/worker/tasks/sendTransactionWorker.ts +++ b/src/worker/tasks/sendTransactionWorker.ts @@ -9,6 +9,7 @@ import { getContractAddress } from "viem"; import { TransactionDB } from "../../db/transactions/db"; import { acquireNonce, + addSentNonce, rebaseNonce, recycleNonce, } from "../../db/wallets/walletNonce"; @@ -171,8 +172,9 @@ const _sendTransaction = async ( transactionHash = sendTransactionResult.transactionHash; job.log(`Sent transaction: ${transactionHash}`); } catch (error: unknown) { - // If NonceAlreadyUsedError, rebase the nonce and retry. - if (isNonceAlreadyUsedError(error)) { + // If NonceAlreadyUsedError, which can also manifest as a ReplacementGasFeeTooLowError, + // recycle the nonce and retry the transaction. + if (isNonceAlreadyUsedError(error) || isReplacementGasFeeTooLow(error)) { const resyncNonce = await rebaseNonce(chainId, from); job.log(`Resynced nonce to ${resyncNonce}.`); } else { @@ -182,6 +184,8 @@ const _sendTransaction = async ( throw error; } + await addSentNonce(chainId, from, nonce); + return { ...queuedTransaction, status: "sent", diff --git a/test/e2e/config.ts b/test/e2e/config.ts index 9d49e9ac3..bb4792a16 100644 --- a/test/e2e/config.ts +++ b/test/e2e/config.ts @@ -11,6 +11,21 @@ export const CONFIG: Config = { USE_LOCAL_CHAIN: true, CHAIN: anvil, + // CHAIN: defineChain({ + // name: "B3 Sepolia", + // id: 1993, + // rpcUrls: { + // default: { + // http: ["https://sepolia.b3.fun/http"], + // webSocket: ["wss://sepolia.b3.fun/ws"], + // }, + // }, + // nativeCurrency: { + // name: "Ether", + // symbol: "ETH", + // decimals: 18, + // }, + // }), TRANSACTION_COUNT: 500, TRANSACTIONS_PER_BATCH: 100,