Skip to content

Commit

Permalink
Updates: v2 Nonce (#597)
Browse files Browse the repository at this point in the history
* Updates

* updates

* cleaned logs

* updates

* updated recycleNonce to check if isNan

* comments updated

* chore: expose NonceResyncQueue to bullboard

* Convert nonce-recycled to set and add nonce-sent set helpers

* fix: handle removing from sent nonces in fail handler

* chore: Remove commented out code in setup.ts file

* Update nonceResyncWorker to use stricter wildcard pattern for redis keys

* Add isSentNonce helper function for checking if a nonce is in the sent nonces set

* refactor and solve nonce resynchronization logic bugs and nits in nonceResyncWorker

* chore: Refactor cancelRecycledNoncesWorker to use SMEMBERS instead of LRANGE for retrieving unused nonces

* treat `ReplacementGasFeeTowLow` as `NonceAlreadyUsed`

* updated logs & removeSentNonce

---------

Co-authored-by: Prithvish Baidya <[email protected]>
  • Loading branch information
farhanW3 and d4mr authored Aug 7, 2024
1 parent 0288dd6 commit 83e430e
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 11 deletions.
75 changes: 70 additions & 5 deletions src/db/wallets/walletNonce.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Address, eth_getTransactionCount, getRpcClient } from "thirdweb";
import { getChain } from "../../utils/chain";
import { logger } from "../../utils/logger";
import { normalizeAddress } from "../../utils/primitiveTypes";
import { redis } from "../../utils/redis/redis";
import { thirdwebClient } from "../../utils/sdk";
Expand All @@ -12,12 +13,68 @@ export const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
`nonce:${chainId}:${normalizeAddress(walletAddress)}`;

/**
* The "recycled nonces" list stores unsorted nonces to be reused or cancelled.
* The "recycled nonces" set stores unsorted nonces to be reused or cancelled.
* Example: [ "25", "23", "24" ]
*/
const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
export const recycledNoncesKey = (chainId: number, walletAddress: Address) =>
`nonce-recycled:${chainId}:${normalizeAddress(walletAddress)}`;

/**
* The "sent nonces" set stores nonces that have been sent on chain but not yet mined.
*
* Example: [ "25", "23", "24" ]
*
* The `nonceResyncWorker` periodically fetches the onchain transaction count for each wallet (a),
* compares it to the last nonce sent (b), and for every nonce between b and a,
* it recycles the nonce if the nonce is not in this set.
*/
export const sentNoncesKey = (chainId: number, walletAddress: Address) =>
`nonce-sent:${chainId}:${normalizeAddress(walletAddress)}`;

export const splitSentNoncesKey = (key: string) => {
const _splittedKeys = key.split(":");
const walletAddress = normalizeAddress(_splittedKeys[2]);
const chainId = parseInt(_splittedKeys[1]);
return { walletAddress, chainId };
};

/**
* Adds a nonce to the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
*/
export const addSentNonce = async (
chainId: number,
walletAddress: Address,
nonce: number,
) => {
const key = sentNoncesKey(chainId, walletAddress);
await redis.sadd(key, nonce.toString());
};

/**
* Removes a nonce from the sent nonces set (`nonce-sent:${chainId}:${walletAddress}`).
*/
export const removeSentNonce = async (
chainId: number,
walletAddress: Address,
nonce: number,
) => {
const key = sentNoncesKey(chainId, walletAddress);
const removed = await redis.srem(key, nonce.toString());
return removed === 1;
};

/**
* Check if a nonce is in the sent nonces set.
*/
export const isSentNonce = async (
chainId: number,
walletAddress: Address,
nonce: number,
) => {
const key = sentNoncesKey(chainId, walletAddress);
return !!(await redis.sismember(key, nonce.toString()));
};

/**
* Acquire an unused nonce.
* This should be used to send an EOA transaction with this nonce.
Expand Down Expand Up @@ -58,8 +115,16 @@ export const recycleNonce = async (
walletAddress: Address,
nonce: number,
) => {
if (isNaN(nonce)) {
logger({
level: "warn",
message: `[recycleNonce] Invalid nonce: ${nonce}`,
service: "worker",
});
return;
}
const key = recycledNoncesKey(chainId, walletAddress);
await redis.rpush(key, nonce);
await redis.sadd(key, nonce.toString());
};

/**
Expand All @@ -73,7 +138,7 @@ const _acquireRecycledNonce = async (
walletAddress: Address,
) => {
const key = recycledNoncesKey(chainId, walletAddress);
const res = await redis.lpop(key);
const res = await redis.spop(key);
return res ? parseInt(res) : null;
};

Expand Down Expand Up @@ -139,7 +204,7 @@ export const rebaseNonce = async (chainId: number, walletAddress: Address) => {
address: walletAddress,
});

// Lua script to set nonce as max(transactionCount, redis.get(lastUsedNonceKey))
// Lua script to set nonce as max
const script = `
local transactionCount = tonumber(ARGV[1])
local lastUsedNonce = tonumber(redis.call('get', KEYS[1]))
Expand Down
2 changes: 2 additions & 0 deletions src/server/middleware/adminRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { env } from "../../utils/env";
import { CancelRecycledNoncesQueue } from "../../worker/queues/cancelRecycledNoncesQueue";
import { MigratePostgresTransactionsQueue } from "../../worker/queues/migratePostgresTransactionsQueue";
import { MineTransactionQueue } from "../../worker/queues/mineTransactionQueue";
import { NonceResyncQueue } from "../../worker/queues/nonceResyncQueue";
import { ProcessEventsLogQueue } from "../../worker/queues/processEventLogsQueue";
import { ProcessTransactionReceiptsQueue } from "../../worker/queues/processTransactionReceiptsQueue";
import { PruneTransactionsQueue } from "../../worker/queues/pruneTransactionsQueue";
Expand All @@ -28,6 +29,7 @@ const QUEUES: Queue[] = [
CancelRecycledNoncesQueue.q,
PruneTransactionsQueue.q,
MigratePostgresTransactionsQueue.q,
NonceResyncQueue.q,
];

export const withAdminRoutes = async (fastify: FastifyInstance) => {
Expand Down
3 changes: 3 additions & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { initCancelRecycledNoncesWorker } from "./tasks/cancelRecycledNoncesWorker";
import { initMigratePostgresTransactionsWorker } from "./tasks/migratePostgresTransactionsWorker";
import { initMineTransactionWorker } from "./tasks/mineTransactionWorker";
import { initNonceResyncWorker } from "./tasks/nonceResyncWorker";
import { initProcessEventLogsWorker } from "./tasks/processEventLogsWorker";
import { initProcessTransactionReceiptsWorker } from "./tasks/processTransactionReceiptsWorker";
import { initPruneTransactionsWorker } from "./tasks/pruneTransactionsWorker";
Expand All @@ -26,6 +27,8 @@ export const initWorker = async () => {
initSendWebhookWorker();
await initMigratePostgresTransactionsWorker();

await initNonceResyncWorker();

// Listen for new & updated configuration data.
await newConfigurationListener();
await updatedConfigurationListener();
Expand Down
17 changes: 17 additions & 0 deletions src/worker/queues/nonceResyncQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Queue } from "bullmq";
import { redis } from "../../utils/redis/redis";
import { defaultJobOptions } from "./queues";

export class NonceResyncQueue {
static q = new Queue<string>("nonce-resync-cron", {
connection: redis,
defaultJobOptions,
});

constructor() {
NonceResyncQueue.q.setGlobalConcurrency(1);

// The cron job is defined in `initNonceResyncWorker`
// because it requires an async call to query configuration.
}
}
2 changes: 1 addition & 1 deletion src/worker/tasks/cancelRecycledNoncesWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const getAndDeleteUnusedNonces = async (key: string) => {
// Returns all unused nonces for this key and deletes the key.
const script = `
local key = ARGV[1]
local members = redis.call('LRANGE', key, 0, -1)
local members = redis.call('SMEMBERS', key)
redis.call('DEL', key)
return members
`;
Expand Down
27 changes: 24 additions & 3 deletions src/worker/tasks/mineTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { stringify } from "thirdweb/utils";
import { getUserOpReceiptRaw } from "thirdweb/wallets/smart";
import { TransactionDB } from "../../db/transactions/db";
import { recycleNonce } from "../../db/wallets/walletNonce";
import { recycleNonce, removeSentNonce } from "../../db/wallets/walletNonce";
import { getBlockNumberish } from "../../utils/block";
import { getConfig } from "../../utils/cache/getConfig";
import { getChain } from "../../utils/chain";
Expand Down Expand Up @@ -128,6 +128,19 @@ const _mineTransaction = async (
if (result.status === "fulfilled") {
const receipt = result.value;
job.log(`Found receipt on block ${receipt.blockNumber}.`);

const removed = await removeSentNonce(
sentTransaction.chainId,
sentTransaction.from,
sentTransaction.nonce,
);

logger({
level: "debug",
message: `[mineTransactionWorker] Removed nonce ${sentTransaction.nonce} from nonce-sent set: ${removed}`,
service: "worker",
});

return {
...sentTransaction,
status: "mined",
Expand All @@ -142,7 +155,6 @@ const _mineTransaction = async (
};
}
}

// Else the transaction is not mined yet.

// Retry the transaction (after some initial delay).
Expand Down Expand Up @@ -180,6 +192,7 @@ const _mineUserOp = async (
chain,
userOpHash,
});

if (!userOpReceiptRaw) {
return null;
}
Expand Down Expand Up @@ -243,12 +256,20 @@ export const initMineTransactionWorker = () => {

if (!sentTransaction.isUserOp) {
// Release the nonce to allow it to be reused or cancelled.
job.log(`Recycling nonce: ${sentTransaction.nonce}`);
job.log(
`Recycling nonce and removing from nonce-sent: ${sentTransaction.nonce}`,
);
await recycleNonce(
sentTransaction.chainId,
sentTransaction.from,
sentTransaction.nonce,
);

await removeSentNonce(
sentTransaction.chainId,
sentTransaction.from,
sentTransaction.nonce,
);
}
}
});
Expand Down
118 changes: 118 additions & 0 deletions src/worker/tasks/nonceResyncWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Job, Processor, Worker } from "bullmq";
import { eth_getTransactionCount, getRpcClient } from "thirdweb";
import {
inspectNonce,
isSentNonce,
recycleNonce,
splitSentNoncesKey,
} from "../../db/wallets/walletNonce";
import { getConfig } from "../../utils/cache/getConfig";
import { getChain } from "../../utils/chain";
import { logger } from "../../utils/logger";
import { redis } from "../../utils/redis/redis";
import { thirdwebClient } from "../../utils/sdk";
import { NonceResyncQueue } from "../queues/nonceResyncQueue";
import { logWorkerExceptions } from "../queues/queues";

// Must be explicitly called for the worker to run on this host.
export const initNonceResyncWorker = async () => {
const config = await getConfig();
if (config.minedTxListenerCronSchedule) {
NonceResyncQueue.q.add("cron", "", {
repeat: { pattern: config.minedTxListenerCronSchedule },
jobId: "nonce-resync-cron",
});
}

const _worker = new Worker(NonceResyncQueue.q.name, handler, {
connection: redis,
concurrency: 1,
});
logWorkerExceptions(_worker);
};

/**
* Resyncs nonces for all wallets.
* This worker should be run periodically to ensure that nonces are not skipped.
* It checks the onchain nonce for each wallet and recycles any missing nonces.
*
* This is to unblock a wallet that has been stuck due to one or more skipped nonces.
*/
const handler: Processor<any, void, string> = async (job: Job<string>) => {
const sentNoncesKeys = await redis.keys("nonce-sent*");
job.log(`Found ${sentNoncesKeys.length} nonce-sent* keys`);

for (const sentNonceKey of sentNoncesKeys) {
const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey);

const rpcRequest = getRpcClient({
client: thirdwebClient,
chain: await getChain(chainId),
});

const [transactionCount, lastUsedNonceDb] = await Promise.all([
eth_getTransactionCount(rpcRequest, {
address: walletAddress,
}),
inspectNonce(chainId, walletAddress),
]);

if (isNaN(transactionCount)) {
job.log(
`Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
);

logger({
level: "error",
message: `[nonceResyncWorker] Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`,
service: "worker",
});

return;
}

const lastUsedNonceOnchain = transactionCount - 1;

job.log(
`${walletAddress} last used onchain nonce: ${lastUsedNonceOnchain} and last used db nonce: ${lastUsedNonceDb}`,
);
logger({
level: "debug",
message: `[nonceResyncWorker] last used onchain nonce: ${transactionCount} and last used db nonce: ${lastUsedNonceDb}`,
service: "worker",
});

// If the last used nonce onchain is the same as or ahead of the last used nonce in the db,
// There is no need to resync the nonce.
if (lastUsedNonceOnchain >= lastUsedNonceDb) {
job.log(`No need to resync nonce for ${walletAddress}`);
logger({
level: "debug",
message: `[nonceResyncWorker] No need to resync nonce for ${walletAddress}`,
service: "worker",
});
return;
}

// for each nonce between last used db nonce and last used onchain nonce
// check if nonce exists in nonce-sent set
// if it does not exist, recycle it
for (
let _nonce = lastUsedNonceOnchain + 1;
_nonce < lastUsedNonceDb;
_nonce++
) {
const exists = await isSentNonce(chainId, walletAddress, _nonce);
logger({
level: "debug",
message: `[nonceResyncWorker] nonce ${_nonce} exists in nonce-sent set: ${exists}`,
service: "worker",
});

// If nonce does not exist in nonce-sent set, recycle it
if (!exists) {
await recycleNonce(chainId, walletAddress, _nonce);
}
}
}
};
8 changes: 6 additions & 2 deletions src/worker/tasks/sendTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getContractAddress } from "viem";
import { TransactionDB } from "../../db/transactions/db";
import {
acquireNonce,
addSentNonce,
rebaseNonce,
recycleNonce,
} from "../../db/wallets/walletNonce";
Expand Down Expand Up @@ -171,8 +172,9 @@ const _sendTransaction = async (
transactionHash = sendTransactionResult.transactionHash;
job.log(`Sent transaction: ${transactionHash}`);
} catch (error: unknown) {
// If NonceAlreadyUsedError, rebase the nonce and retry.
if (isNonceAlreadyUsedError(error)) {
// If NonceAlreadyUsedError, which can also manifest as a ReplacementGasFeeTooLowError,
// recycle the nonce and retry the transaction.
if (isNonceAlreadyUsedError(error) || isReplacementGasFeeTooLow(error)) {
const resyncNonce = await rebaseNonce(chainId, from);
job.log(`Resynced nonce to ${resyncNonce}.`);
} else {
Expand All @@ -182,6 +184,8 @@ const _sendTransaction = async (
throw error;
}

await addSentNonce(chainId, from, nonce);

return {
...queuedTransaction,
status: "sent",
Expand Down
Loading

0 comments on commit 83e430e

Please sign in to comment.