From 7c8b6e9d300b98019739e309ae3f1550aa2de4a5 Mon Sep 17 00:00:00 2001 From: Angel Castillo Date: Mon, 18 Nov 2024 14:20:13 +0800 Subject: [PATCH 1/2] feat(wallet): the wallet now only fetches UTXOs on tx history change --- packages/wallet/src/Wallets/BaseWallet.ts | 4 +--- packages/wallet/src/services/UtxoTracker.ts | 13 +++++++------ packages/wallet/src/services/util/equals.ts | 19 +++++++++++++++++-- .../wallet/test/services/UtxoTracker.test.ts | 12 ++++++------ 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/packages/wallet/src/Wallets/BaseWallet.ts b/packages/wallet/src/Wallets/BaseWallet.ts index 966c0ef10e1..3814c04d701 100644 --- a/packages/wallet/src/Wallets/BaseWallet.ts +++ b/packages/wallet/src/Wallets/BaseWallet.ts @@ -51,7 +51,6 @@ import { createUtxoTracker, createWalletUtil, currentEpochTracker, - distinctBlock, distinctEraSummaries } from '../services'; import { AddressType, Bip32Account, GroupedAddress, WitnessedTx, Witnesser, util } from '@cardano-sdk/key-management'; @@ -397,7 +396,6 @@ export class BaseWallet implements ObservableWallet { store: stores.tip, syncStatus: this.syncStatus }); - const tipBlockHeight$ = distinctBlock(this.tip$); this.txSubmitProvider = new SmartTxSubmitProvider( { retryBackoffConfig }, @@ -499,11 +497,11 @@ export class BaseWallet implements ObservableWallet { this.utxo = createUtxoTracker({ addresses$, + history$: this.transactions.history$, logger: contextLogger(this.#logger, 'utxo'), onFatalError, retryBackoffConfig, stores, - tipBlockHeight$, transactionsInFlight$: this.transactions.outgoing.inFlight$, utxoProvider: this.utxoProvider }); diff --git a/packages/wallet/src/services/UtxoTracker.ts b/packages/wallet/src/services/UtxoTracker.ts index efba03adee6..3b4b9cd5bdc 100644 --- a/packages/wallet/src/services/UtxoTracker.ts +++ b/packages/wallet/src/services/UtxoTracker.ts @@ -6,6 +6,7 @@ import { RetryBackoffConfig } from 'backoff-rxjs'; import { TxInFlight, UtxoTracker } from './types'; import { WalletStores } from '../persistence'; import { coldObservableProvider } from '@cardano-sdk/util-rxjs'; +import { sortUtxoByTxIn } from '@cardano-sdk/input-selection'; import chunk from 'lodash/chunk.js'; import uniqWith from 'lodash/uniqWith.js'; @@ -17,7 +18,7 @@ export interface UtxoTrackerProps { addresses$: Observable; stores: Pick; transactionsInFlight$: Observable; - tipBlockHeight$: Observable; + history$: Observable; retryBackoffConfig: RetryBackoffConfig; logger: Logger; onFatalError?: (value: unknown) => void; @@ -31,7 +32,7 @@ export interface UtxoTrackerInternals { export const createUtxoProvider = ( utxoProvider: UtxoProvider, addresses$: Observable, - tipBlockHeight$: Observable, + history$: Observable, retryBackoffConfig: RetryBackoffConfig, onFatalError?: (value: unknown) => void ) => @@ -49,10 +50,10 @@ export const createUtxoProvider = ( utxos = [...utxos, ...(await utxoProvider.utxoByAddresses({ addresses }))]; } - return utxos; + return utxos.sort(sortUtxoByTxIn); }, retryBackoffConfig, - trigger$: tipBlockHeight$ + trigger$: history$ }) ) ); @@ -64,13 +65,13 @@ export const createUtxoTracker = ( stores, transactionsInFlight$, retryBackoffConfig, - tipBlockHeight$, + history$, logger, onFatalError }: UtxoTrackerProps, { utxoSource$ = new PersistentCollectionTrackerSubject( - () => createUtxoProvider(utxoProvider, addresses$, tipBlockHeight$, retryBackoffConfig, onFatalError), + () => createUtxoProvider(utxoProvider, addresses$, history$, retryBackoffConfig, onFatalError), stores.utxo ), unspendableUtxoSource$ = new PersistentCollectionTrackerSubject( diff --git a/packages/wallet/src/services/util/equals.ts b/packages/wallet/src/services/util/equals.ts index 5426c68f29b..2c9485d987e 100644 --- a/packages/wallet/src/services/util/equals.ts +++ b/packages/wallet/src/services/util/equals.ts @@ -7,12 +7,27 @@ export const tipEquals = (a: Cardano.Tip, b: Cardano.Tip) => a.hash === b.hash; export const txEquals = (a: Cardano.HydratedTx, b: Cardano.HydratedTx) => a.id === b.id; +export const sameSortedArrayItems = (arrayA: T[], arrayB: T[], itemEquals: (a: T, b: T) => boolean): boolean => { + if (arrayA.length !== arrayB.length) return false; + + for (const [i, element] of arrayA.entries()) { + if (!itemEquals(element, arrayB[i])) { + return false; + } + } + + return true; +}; + export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => sameArrayItems(a, b, txEquals); export const txInEquals = (a: Cardano.TxIn, b: Cardano.TxIn) => a.txId === b.txId && a.index === b.index; -export const utxoEquals = (a: Cardano.Utxo[], b: Cardano.Utxo[]) => - sameArrayItems(a, b, ([aTxIn], [bTxIn]) => txInEquals(aTxIn, bTxIn)); +export const utxoEquals = (a: Cardano.Utxo[], b: Cardano.Utxo[]) => { + if (a === b) return true; + + return sameSortedArrayItems(a, b, ([aTxIn], [bTxIn]) => txInEquals(aTxIn, bTxIn)); +}; export const eraSummariesEquals = (a: EraSummary[], b: EraSummary[]) => sameArrayItems(a, b, (es1, es2) => es1.start.slot === es2.start.slot); diff --git a/packages/wallet/test/services/UtxoTracker.test.ts b/packages/wallet/test/services/UtxoTracker.test.ts index 8c6f046bce6..8745efb4758 100644 --- a/packages/wallet/test/services/UtxoTracker.test.ts +++ b/packages/wallet/test/services/UtxoTracker.test.ts @@ -20,7 +20,7 @@ const createStubOutputs = (numOutputs: number) => describe('createUtxoTracker', () => { // these variables are not relevant for this test, overwriting utxoSource$ let retryBackoffConfig: RetryBackoffConfig; - let tipBlockHeight$: Observable; + let history$: Observable; let utxoProvider: UtxoProvider; const logger = dummyLogger; @@ -88,13 +88,13 @@ describe('createUtxoTracker', () => { const utxoTracker = createUtxoTracker( { addresses$: cold('a|', { a: [ownAddress!] }), + history$, logger, retryBackoffConfig, stores: { unspendableUtxo: store, utxo: store }, - tipBlockHeight$, transactionsInFlight$, utxoProvider }, @@ -142,13 +142,13 @@ describe('createUtxoTracker', () => { const utxoTracker = createUtxoTracker( { addresses$: cold('a', { a: [address!] }), + history$, logger, retryBackoffConfig, stores: { unspendableUtxo: store, utxo: store }, - tipBlockHeight$, transactionsInFlight$, utxoProvider }, @@ -172,13 +172,13 @@ describe('createUtxoTracker', () => { const utxoTracker = createUtxoTracker( { addresses$: cold('a|', { a: [address!] }), + history$, logger, retryBackoffConfig, stores: { unspendableUtxo: store, utxo: store }, - tipBlockHeight$, transactionsInFlight$, utxoProvider }, @@ -211,13 +211,13 @@ describe('createUtxoTracker', () => { const utxoTracker = createUtxoTracker( { addresses$: cold('a|', { a: [address!] }), + history$, logger, retryBackoffConfig, stores: { unspendableUtxo: store, utxo: store }, - tipBlockHeight$, transactionsInFlight$, utxoProvider }, @@ -241,13 +241,13 @@ describe('createUtxoTracker', () => { const utxoTracker = createUtxoTracker( { addresses$: cold('a|', { a: [address!] }), + history$, logger, retryBackoffConfig, stores: { unspendableUtxo: store, utxo: store }, - tipBlockHeight$, transactionsInFlight$, utxoProvider }, From 2306f1057594c5b6d9639f70dfbc3443f88e434d Mon Sep 17 00:00:00 2001 From: Angel Castillo Date: Mon, 18 Nov 2024 14:21:49 +0800 Subject: [PATCH 2/2] fix: transaction tracker now compares transactions in linear time before emission - rollback logic optimized to only check transactions in reverse order Co-Authored-By: mirceahasegan --- .../mockProviders/mockChainHistoryProvider.ts | 14 +- .../src/services/TransactionsTracker.ts | 225 +++++++---- packages/wallet/src/services/util/equals.ts | 6 +- .../test/services/TransactionsTracker.test.ts | 355 +++++++++++++++++- 4 files changed, 532 insertions(+), 68 deletions(-) diff --git a/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts b/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts index 7b076043ff7..7a60bd28fb8 100644 --- a/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts +++ b/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts @@ -1,6 +1,6 @@ import * as AssetId from '../assetId'; import * as Crypto from '@cardano-sdk/crypto'; -import { Cardano, Paginated } from '@cardano-sdk/core'; +import { Cardano, Paginated, TransactionsByAddressesArgs } from '@cardano-sdk/core'; import { currentEpoch, handleAssetId, ledgerTip, stakeCredential } from './mockData'; import { somePartialStakePools } from '../createStubStakePoolProvider'; import delay from 'delay'; @@ -219,10 +219,20 @@ export const mockChainHistoryProvider2 = (delayMs: number) => { const delayedJestFn = (resolvedValue: T) => jest.fn().mockImplementationOnce(() => delay(delayMs).then(() => resolvedValue)); + const blockRangeTransactions = (blockRangeStart: number): Paginated => { + const pageResults = queryTransactionsResult2.pageResults.filter( + (res) => res.blockHeader.blockNo >= blockRangeStart + ); + + return { pageResults, totalResultCount: pageResults.length }; + }; + return { blocksByHashes: delayedJestFn(blocksByHashes), healthCheck: delayedJestFn({ ok: true }), - transactionsByAddresses: delayedJestFn(queryTransactionsResult2), + transactionsByAddresses: jest.fn(({ blockRange }: TransactionsByAddressesArgs) => + delay(delayMs).then(() => blockRangeTransactions(blockRange?.lowerBound || 0)) + ), transactionsByHashes: delayedJestFn(queryTransactionsResult2) }; }; diff --git a/packages/wallet/src/services/TransactionsTracker.ts b/packages/wallet/src/services/TransactionsTracker.ts index f2241b73191..435daae9a13 100644 --- a/packages/wallet/src/services/TransactionsTracker.ts +++ b/packages/wallet/src/services/TransactionsTracker.ts @@ -38,9 +38,7 @@ import { distinctBlock, signedTxsEquals, transactionsEquals, txInEquals } from ' import { WitnessedTx } from '@cardano-sdk/key-management'; import { newAndStoredMulticast } from './util/newAndStoredMulticast'; import chunk from 'lodash/chunk.js'; -import intersectionBy from 'lodash/intersectionBy.js'; import sortBy from 'lodash/sortBy.js'; -import unionBy from 'lodash/unionBy.js'; export interface TransactionsTrackerProps { chainHistoryProvider: ChainHistoryProvider; @@ -107,15 +105,159 @@ const allTransactionsByAddresses = async ( return response; }; -export const createAddressTransactionsProvider = ({ +const getLastTransactionsAtBlock = ( + transactions: Cardano.HydratedTx[], + blockNo: Cardano.BlockNo +): Cardano.HydratedTx[] => { + const txsFromSameBlock = []; + + for (let i = transactions.length - 1; i >= 0; --i) { + const tx = transactions[i]; + if (tx.blockHeader.blockNo === blockNo) { + txsFromSameBlock.push(tx); + } else { + break; + } + } + + return txsFromSameBlock; +}; + +export const revertLastBlock = ( + localTransactions: Cardano.HydratedTx[], + blockNo: Cardano.BlockNo, + rollback$: Subject, + newTransactions: Cardano.HydratedTx[], + logger: Logger +) => { + const result = [...localTransactions]; + + while (result.length > 0) { + const lastKnownTx = result[result.length - 1]; + + if (lastKnownTx.blockHeader.blockNo === blockNo) { + // only emit if the tx is also not present in the new transactions to be added + if (newTransactions.findIndex((tx) => tx.id === lastKnownTx.id) === -1) { + logger.debug(`Transaction ${lastKnownTx.id} was rolled back`); + rollback$.next(lastKnownTx); + } + + result.pop(); + } else { + break; + } + } + + return result; +}; + +const findIntersectionAndUpdateTxStore = ({ chainHistoryProvider, - addresses$, + logger, + store, retryBackoffConfig, + onFatalError, tipBlockHeight$, - store, - logger, - onFatalError -}: TransactionsTrackerInternalsProps): TransactionsTrackerInternals => { + rollback$, + localTransactions, + addresses +}: Pick< + TransactionsTrackerInternalsProps, + 'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$' +> & { + localTransactions: Cardano.HydratedTx[]; + rollback$: Subject; + addresses: Cardano.PaymentAddress[]; +}) => + coldObservableProvider({ + // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first + // It should also help when using poor internet connection. + // Caveat is that local transactions might get out of date... + combinator: exhaustMap, + equals: transactionsEquals, + onFatalError, + // eslint-disable-next-line sonarjs/cognitive-complexity + provider: async () => { + let rollbackOcurred = false; + // eslint-disable-next-line no-constant-condition + while (true) { + const lastStoredTransaction: Cardano.HydratedTx | undefined = localTransactions[localTransactions.length - 1]; + + lastStoredTransaction && + logger.debug( + `Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}` + ); + + const lowerBound = lastStoredTransaction?.blockHeader.blockNo; + const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, { + addresses, + blockRange: { lowerBound } + }); + + logger.debug( + `chainHistoryProvider returned ${newTransactions.length} transactions`, + lowerBound !== undefined && `since block ${lowerBound}` + ); + + // Fetching transactions from scratch, nothing else to do here. + if (lowerBound === undefined) { + if (newTransactions.length > 0) { + localTransactions = newTransactions; + store.setAll(newTransactions); + } + + return newTransactions; + } + + // If no transactions found from that block range, it means the last known block has been rolled back. + if (newTransactions.length === 0) { + localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger); + rollbackOcurred = true; + + continue; + } + + const localTxsFromSameBlock = getLastTransactionsAtBlock(localTransactions, lowerBound); + const firstSegmentOfNewTransactions = newTransactions.slice(0, localTxsFromSameBlock.length); + + // The first segment of new transaction should match exactly (same txs and same order) our last know TXs. Otherwise + // roll them back and re-apply in new order. + const sameTxAndOrder = localTxsFromSameBlock.every( + (tx, index) => tx.id === firstSegmentOfNewTransactions[index].id + ); + + if (!sameTxAndOrder) { + localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger); + rollbackOcurred = true; + + continue; + } + + // No rollbacks, if they overlap 100% do nothing, otherwise add the difference. + const areTransactionsSame = + newTransactions.length === localTxsFromSameBlock.length && + localTxsFromSameBlock.every((tx, index) => tx.id === newTransactions[index].id); + + if (!areTransactionsSame) { + // Skip overlapping transactions to avoid duplicates + localTransactions = [...localTransactions, ...newTransactions.slice(localTxsFromSameBlock.length)]; + store.setAll(localTransactions); + } else if (rollbackOcurred) { + // This case handles rollbacks without new additions + store.setAll(localTransactions); + } + + return localTransactions; + } + }, + retryBackoffConfig, + trigger$: tipBlockHeight$ + }); + +export const createAddressTransactionsProvider = ( + props: TransactionsTrackerInternalsProps +): TransactionsTrackerInternals => { + const { addresses$, store, logger } = props; const rollback$ = new Subject(); const storedTransactions$ = store.getAll().pipe(share()); return { @@ -127,61 +269,14 @@ export const createAddressTransactionsProvider = ({ ) ), combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.HydratedTx[]))]).pipe( - switchMap(([addresses, storedTransactions]) => { - let localTransactions: Cardano.HydratedTx[] = [...storedTransactions]; - - return coldObservableProvider({ - // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first - // It should also help when using poor internet connection. - // Caveat is that local transactions might get out of date... - combinator: exhaustMap, - equals: transactionsEquals, - onFatalError, - provider: async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - const lastStoredTransaction: Cardano.HydratedTx | undefined = - localTransactions[localTransactions.length - 1]; - - lastStoredTransaction && - logger.debug( - `Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}` - ); - - const lowerBound = lastStoredTransaction?.blockHeader.blockNo; - const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, { - addresses, - blockRange: { lowerBound } - }); - - logger.debug( - `chainHistoryProvider returned ${newTransactions.length} transactions`, - lowerBound !== undefined && `since block ${lowerBound}` - ); - const duplicateTransactions = - lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id); - if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) { - const rollbackTransactions = localTransactions.filter( - ({ blockHeader: { blockNo } }) => blockNo >= lowerBound - ); - - from(rollbackTransactions) - .pipe(tap((tx) => logger.debug(`Transaction ${tx.id} was rolled back`))) - .subscribe((v) => rollback$.next(v)); - - // Rollback by 1 block, try again in next loop iteration - localTransactions = localTransactions.filter(({ blockHeader: { blockNo } }) => blockNo < lowerBound); - } else { - localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id); - store.setAll(localTransactions); - return localTransactions; - } - } - }, - retryBackoffConfig, - trigger$: tipBlockHeight$ - }); - }) + switchMap(([addresses, storedTransactions]) => + findIntersectionAndUpdateTxStore({ + addresses, + localTransactions: [...storedTransactions], + rollback$, + ...props + }) + ) ) ) }; @@ -247,7 +342,9 @@ export const createTransactionsTracker = ( const transactionsSource$ = new TrackerSubject(txSource$); - const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$); + const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$).pipe( + tap((transactions) => logger.debug(`History transactions count: ${transactions?.length || 0}`)) + ); const [onChainNewTxPhase2Failed$, onChainNewTxSuccess$] = partition( newTransactions$(historicalTransactions$).pipe(share()), diff --git a/packages/wallet/src/services/util/equals.ts b/packages/wallet/src/services/util/equals.ts index 2c9485d987e..71913aba294 100644 --- a/packages/wallet/src/services/util/equals.ts +++ b/packages/wallet/src/services/util/equals.ts @@ -19,7 +19,11 @@ export const sameSortedArrayItems = (arrayA: T[], arrayB: T[], itemEquals: (a return true; }; -export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => sameArrayItems(a, b, txEquals); +export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => { + if (a === b) return true; + + return sameSortedArrayItems(a, b, txEquals); +}; export const txInEquals = (a: Cardano.TxIn, b: Cardano.TxIn) => a.txId === b.txId && a.index === b.index; diff --git a/packages/wallet/test/services/TransactionsTracker.test.ts b/packages/wallet/test/services/TransactionsTracker.test.ts index e22ac2e1bcd..a0050c6cf1f 100644 --- a/packages/wallet/test/services/TransactionsTracker.test.ts +++ b/packages/wallet/test/services/TransactionsTracker.test.ts @@ -26,7 +26,19 @@ import { dummyCbor, toOutgoingTx, toSignedTx } from '../util'; import { dummyLogger } from 'ts-log'; import delay from 'delay'; -const { generateTxAlonzo, mockChainHistoryProvider, queryTransactionsResult } = mockProviders; +const { generateTxAlonzo, mockChainHistoryProvider, queryTransactionsResult, queryTransactionsResult2 } = mockProviders; + +const updateTransactionsBlockNo = (transactions: Cardano.HydratedTx[], blockNo = Cardano.BlockNo(10_050)) => + transactions.map((tx) => ({ + ...tx, + blockHeader: { ...tx.blockHeader, blockNo } + })); + +const updateTransactionIds = (transactions: Cardano.HydratedTx[], tailPattern = 'aaa') => + transactions.map((tx) => ({ + ...tx, + id: Cardano.TransactionId(`${tx.id.slice(0, -tailPattern.length)}${tailPattern}`) + })); describe('TransactionsTracker', () => { const logger = dummyLogger; @@ -61,6 +73,22 @@ describe('TransactionsTracker', () => { store.setAll = jest.fn().mockImplementation(store.setAll.bind(store)); }); + it('emits empty array if store is empty and ChainHistoryProvider does not return any transactions', async () => { + chainHistoryProvider.transactionsByAddresses = jest + .fn() + .mockImplementation(() => delay(50).then(() => ({ pageResults: [], totalResultCount: 0 }))); + const provider$ = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }).transactionsSource$; + expect(await firstValueFrom(provider$)).toEqual([]); + expect(store.setAll).toBeCalledTimes(0); + }); + it('if store is empty, stores and emits transactions resolved by ChainHistoryProvider', async () => { const provider$ = createAddressTransactionsProvider({ addresses$: of(addresses), @@ -134,6 +162,90 @@ describe('TransactionsTracker', () => { }); }); + it('emits shortened tx history when tx was rolled back, but no new tx was added', async () => { + const [txId1, txId2] = queryTransactionsResult.pageResults; + // Two stored transactions: [1, 2] + await firstValueFrom(store.setAll([txId1, txId2])); + + // ChainHistory is shorter by 1 tx: [1] + chainHistoryProvider.transactionsByAddresses = jest + .fn() + // the mismatch will pop the single transaction found in the stored transactions + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [], totalResultCount: 0 }))) + // intersection is found, chain is shortened + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId1], totalResultCount: 1 }))); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1] // shortened chain + ]); + + expect(rollbacks).toEqual([txId2]); + + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1]); + expect(chainHistoryProvider.transactionsByAddresses).toBeCalledTimes(2); + }); + + it('rolls back one transaction, then finds intersection', async () => { + const [txId1, txId2] = queryTransactionsResult.pageResults; + const [txId3] = queryTransactionsResult2.pageResults.slice(-1); + // Two stored transactions: [1, 2] + await firstValueFrom(store.setAll([txId1, txId2])); + + // ChainHistory has one common and one different: [1, 3] + chainHistoryProvider.transactionsByAddresses = jest + .fn() + // the mismatch will pop the single transaction found in the stored transactions + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId3], totalResultCount: 1 }))) + // intersection is found, and stored history is populated with the new transaction + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId1, txId3], totalResultCount: 2 }))); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1, txId3] // store + chain history + ]); + + expect(rollbacks).toEqual([txId2]); + + expect(store.setAll).toBeCalledTimes(2); + expect(chainHistoryProvider.transactionsByAddresses).toBeCalledTimes(2); + expect(chainHistoryProvider.transactionsByAddresses).nthCalledWith(1, { + addresses, + blockRange: { lowerBound: txId2.blockHeader.blockNo }, + pagination: { limit: 25, startAt: 0 } + }); + expect(chainHistoryProvider.transactionsByAddresses).nthCalledWith(2, { + addresses, + blockRange: { lowerBound: txId1.blockHeader.blockNo }, + pagination: { limit: 25, startAt: 0 } + }); + }); + it('queries ChainHistoryProvider again with blockRange lower bound from a previous transaction on rollback', async () => { await firstValueFrom(store.setAll(queryTransactionsResult.pageResults)); chainHistoryProvider.transactionsByAddresses = jest @@ -180,6 +292,247 @@ describe('TransactionsTracker', () => { pagination: { limit: 25, startAt: 0 } }); }); + + describe('distinct transaction sets in latest stored block vs new blocks', () => { + // Notation: is a block with 3 transactions + // [a b c] is an array of 3 transactions + + // latestStoredBlock <1 2 3> + // newBlock <4 5 6> + // rollback$ [3 2 1] - transactions need to be retried + // store&emit [4 5 6] + it('rolls back all transactions on completely disjoin sets', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo(queryTransactionsResult2.pageResults); + const [txId4, txId5, txId6] = updateTransactionIds([txId1, txId2, txId3]); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId4, txId5, txId6], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId4, txId5, txId6] // chain history + ]); + expect(rollbacks).toEqual([txId3, txId2, txId1]); + expect(store.setAll).toBeCalledTimes(2); + }); + + // latestStoredBlock <1 2> + // newBlock <1 2 3> + // rollback$ none + // store&emit [1,2,3] + it('stores new transactions when new block is superset', async () => { + const [txId1, txId2] = updateTransactionsBlockNo(queryTransactionsResult2.pageResults, Cardano.BlockNo(10_050)); + const [txId1OtherBlock, txId2OtherBlock, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_051) + ); + + await firstValueFrom(store.setAll([txId1, txId2])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1OtherBlock, txId2OtherBlock, txId3], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1OtherBlock, txId2OtherBlock, txId3] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1OtherBlock, txId2OtherBlock, txId3]); + }); + + // latestStoredBlock <1 2 3> + // newBlock <1 2> + // rollback$ 3 + // store&emit [1,2] + it('rollback some transactions when new block is subset', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_050) + ); + const [txId1OtherBlock, txId2OtherBlock] = updateTransactionsBlockNo([txId1, txId2], Cardano.BlockNo(10_051)); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1OtherBlock, txId2OtherBlock], + totalResultCount: 2 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId1OtherBlock, txId2OtherBlock] // chain history + ]); + expect(rollbacks).toEqual([txId3]); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1OtherBlock, txId2OtherBlock]); + }); + + // latestStoredBlock <1 2> + // newBlocks <3> <1> <2> + // rollback$ none - transactions are on chain + // store&emit [3 1 2] - re-emit all as they might have a different blockNo + // Noop - produces the same result in the tx history + it('detects when latest block transactions are found in among new blocks', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + const [txId3OtherBlock] = updateTransactionsBlockNo([txId3], Cardano.BlockNo(10_100)); + const [txId1OtherBlock] = updateTransactionsBlockNo([txId1], Cardano.BlockNo(10_200)); + const [txId2OtherBlock] = updateTransactionsBlockNo([txId2], Cardano.BlockNo(10_300)); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock]); + }); + + // latestStoredBlock <1 2> + // newBlock <3 2 1> + // rollback$ none - transactions are on chain + // store&emit [3 2 1] + it('reversed order transactions plus new tx are re-emitted, but not considered rollbacks', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + const [txId1OtherBlock, txId2OtherBlock, txId3OtherBlock] = updateTransactionsBlockNo( + [txId1, txId2, txId3], + Cardano.BlockNo(10_100) + ); + + await firstValueFrom(store.setAll([txId1, txId2])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock]); + }); + + // latestStoredBlock <1 2 3> + // newBlock <1 2 3> + // rollback$ none + // store&emit none + it('does not emit when newBlock transactions are identical to stored transactions', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1, txId2, txId3], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(1)))).toEqual([ + [txId1, txId2, txId3] // from store + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(1); + expect(store.setAll).nthCalledWith(1, [txId1, txId2, txId3]); + }); + }); }); describe('createTransactionsTracker', () => {