From 2306f1057594c5b6d9639f70dfbc3443f88e434d Mon Sep 17 00:00:00 2001 From: Angel Castillo Date: Mon, 18 Nov 2024 14:21:49 +0800 Subject: [PATCH] fix: transaction tracker now compares transactions in linear time before emission - rollback logic optimized to only check transactions in reverse order Co-Authored-By: mirceahasegan --- .../mockProviders/mockChainHistoryProvider.ts | 14 +- .../src/services/TransactionsTracker.ts | 225 +++++++---- packages/wallet/src/services/util/equals.ts | 6 +- .../test/services/TransactionsTracker.test.ts | 355 +++++++++++++++++- 4 files changed, 532 insertions(+), 68 deletions(-) diff --git a/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts b/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts index 7b076043ff7..7a60bd28fb8 100644 --- a/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts +++ b/packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts @@ -1,6 +1,6 @@ import * as AssetId from '../assetId'; import * as Crypto from '@cardano-sdk/crypto'; -import { Cardano, Paginated } from '@cardano-sdk/core'; +import { Cardano, Paginated, TransactionsByAddressesArgs } from '@cardano-sdk/core'; import { currentEpoch, handleAssetId, ledgerTip, stakeCredential } from './mockData'; import { somePartialStakePools } from '../createStubStakePoolProvider'; import delay from 'delay'; @@ -219,10 +219,20 @@ export const mockChainHistoryProvider2 = (delayMs: number) => { const delayedJestFn = (resolvedValue: T) => jest.fn().mockImplementationOnce(() => delay(delayMs).then(() => resolvedValue)); + const blockRangeTransactions = (blockRangeStart: number): Paginated => { + const pageResults = queryTransactionsResult2.pageResults.filter( + (res) => res.blockHeader.blockNo >= blockRangeStart + ); + + return { pageResults, totalResultCount: pageResults.length }; + }; + return { blocksByHashes: delayedJestFn(blocksByHashes), healthCheck: delayedJestFn({ ok: true }), - transactionsByAddresses: delayedJestFn(queryTransactionsResult2), + transactionsByAddresses: jest.fn(({ blockRange }: TransactionsByAddressesArgs) => + delay(delayMs).then(() => blockRangeTransactions(blockRange?.lowerBound || 0)) + ), transactionsByHashes: delayedJestFn(queryTransactionsResult2) }; }; diff --git a/packages/wallet/src/services/TransactionsTracker.ts b/packages/wallet/src/services/TransactionsTracker.ts index f2241b73191..435daae9a13 100644 --- a/packages/wallet/src/services/TransactionsTracker.ts +++ b/packages/wallet/src/services/TransactionsTracker.ts @@ -38,9 +38,7 @@ import { distinctBlock, signedTxsEquals, transactionsEquals, txInEquals } from ' import { WitnessedTx } from '@cardano-sdk/key-management'; import { newAndStoredMulticast } from './util/newAndStoredMulticast'; import chunk from 'lodash/chunk.js'; -import intersectionBy from 'lodash/intersectionBy.js'; import sortBy from 'lodash/sortBy.js'; -import unionBy from 'lodash/unionBy.js'; export interface TransactionsTrackerProps { chainHistoryProvider: ChainHistoryProvider; @@ -107,15 +105,159 @@ const allTransactionsByAddresses = async ( return response; }; -export const createAddressTransactionsProvider = ({ +const getLastTransactionsAtBlock = ( + transactions: Cardano.HydratedTx[], + blockNo: Cardano.BlockNo +): Cardano.HydratedTx[] => { + const txsFromSameBlock = []; + + for (let i = transactions.length - 1; i >= 0; --i) { + const tx = transactions[i]; + if (tx.blockHeader.blockNo === blockNo) { + txsFromSameBlock.push(tx); + } else { + break; + } + } + + return txsFromSameBlock; +}; + +export const revertLastBlock = ( + localTransactions: Cardano.HydratedTx[], + blockNo: Cardano.BlockNo, + rollback$: Subject, + newTransactions: Cardano.HydratedTx[], + logger: Logger +) => { + const result = [...localTransactions]; + + while (result.length > 0) { + const lastKnownTx = result[result.length - 1]; + + if (lastKnownTx.blockHeader.blockNo === blockNo) { + // only emit if the tx is also not present in the new transactions to be added + if (newTransactions.findIndex((tx) => tx.id === lastKnownTx.id) === -1) { + logger.debug(`Transaction ${lastKnownTx.id} was rolled back`); + rollback$.next(lastKnownTx); + } + + result.pop(); + } else { + break; + } + } + + return result; +}; + +const findIntersectionAndUpdateTxStore = ({ chainHistoryProvider, - addresses$, + logger, + store, retryBackoffConfig, + onFatalError, tipBlockHeight$, - store, - logger, - onFatalError -}: TransactionsTrackerInternalsProps): TransactionsTrackerInternals => { + rollback$, + localTransactions, + addresses +}: Pick< + TransactionsTrackerInternalsProps, + 'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$' +> & { + localTransactions: Cardano.HydratedTx[]; + rollback$: Subject; + addresses: Cardano.PaymentAddress[]; +}) => + coldObservableProvider({ + // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first + // It should also help when using poor internet connection. + // Caveat is that local transactions might get out of date... + combinator: exhaustMap, + equals: transactionsEquals, + onFatalError, + // eslint-disable-next-line sonarjs/cognitive-complexity + provider: async () => { + let rollbackOcurred = false; + // eslint-disable-next-line no-constant-condition + while (true) { + const lastStoredTransaction: Cardano.HydratedTx | undefined = localTransactions[localTransactions.length - 1]; + + lastStoredTransaction && + logger.debug( + `Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}` + ); + + const lowerBound = lastStoredTransaction?.blockHeader.blockNo; + const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, { + addresses, + blockRange: { lowerBound } + }); + + logger.debug( + `chainHistoryProvider returned ${newTransactions.length} transactions`, + lowerBound !== undefined && `since block ${lowerBound}` + ); + + // Fetching transactions from scratch, nothing else to do here. + if (lowerBound === undefined) { + if (newTransactions.length > 0) { + localTransactions = newTransactions; + store.setAll(newTransactions); + } + + return newTransactions; + } + + // If no transactions found from that block range, it means the last known block has been rolled back. + if (newTransactions.length === 0) { + localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger); + rollbackOcurred = true; + + continue; + } + + const localTxsFromSameBlock = getLastTransactionsAtBlock(localTransactions, lowerBound); + const firstSegmentOfNewTransactions = newTransactions.slice(0, localTxsFromSameBlock.length); + + // The first segment of new transaction should match exactly (same txs and same order) our last know TXs. Otherwise + // roll them back and re-apply in new order. + const sameTxAndOrder = localTxsFromSameBlock.every( + (tx, index) => tx.id === firstSegmentOfNewTransactions[index].id + ); + + if (!sameTxAndOrder) { + localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger); + rollbackOcurred = true; + + continue; + } + + // No rollbacks, if they overlap 100% do nothing, otherwise add the difference. + const areTransactionsSame = + newTransactions.length === localTxsFromSameBlock.length && + localTxsFromSameBlock.every((tx, index) => tx.id === newTransactions[index].id); + + if (!areTransactionsSame) { + // Skip overlapping transactions to avoid duplicates + localTransactions = [...localTransactions, ...newTransactions.slice(localTxsFromSameBlock.length)]; + store.setAll(localTransactions); + } else if (rollbackOcurred) { + // This case handles rollbacks without new additions + store.setAll(localTransactions); + } + + return localTransactions; + } + }, + retryBackoffConfig, + trigger$: tipBlockHeight$ + }); + +export const createAddressTransactionsProvider = ( + props: TransactionsTrackerInternalsProps +): TransactionsTrackerInternals => { + const { addresses$, store, logger } = props; const rollback$ = new Subject(); const storedTransactions$ = store.getAll().pipe(share()); return { @@ -127,61 +269,14 @@ export const createAddressTransactionsProvider = ({ ) ), combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.HydratedTx[]))]).pipe( - switchMap(([addresses, storedTransactions]) => { - let localTransactions: Cardano.HydratedTx[] = [...storedTransactions]; - - return coldObservableProvider({ - // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first - // It should also help when using poor internet connection. - // Caveat is that local transactions might get out of date... - combinator: exhaustMap, - equals: transactionsEquals, - onFatalError, - provider: async () => { - // eslint-disable-next-line no-constant-condition - while (true) { - const lastStoredTransaction: Cardano.HydratedTx | undefined = - localTransactions[localTransactions.length - 1]; - - lastStoredTransaction && - logger.debug( - `Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}` - ); - - const lowerBound = lastStoredTransaction?.blockHeader.blockNo; - const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, { - addresses, - blockRange: { lowerBound } - }); - - logger.debug( - `chainHistoryProvider returned ${newTransactions.length} transactions`, - lowerBound !== undefined && `since block ${lowerBound}` - ); - const duplicateTransactions = - lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id); - if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) { - const rollbackTransactions = localTransactions.filter( - ({ blockHeader: { blockNo } }) => blockNo >= lowerBound - ); - - from(rollbackTransactions) - .pipe(tap((tx) => logger.debug(`Transaction ${tx.id} was rolled back`))) - .subscribe((v) => rollback$.next(v)); - - // Rollback by 1 block, try again in next loop iteration - localTransactions = localTransactions.filter(({ blockHeader: { blockNo } }) => blockNo < lowerBound); - } else { - localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id); - store.setAll(localTransactions); - return localTransactions; - } - } - }, - retryBackoffConfig, - trigger$: tipBlockHeight$ - }); - }) + switchMap(([addresses, storedTransactions]) => + findIntersectionAndUpdateTxStore({ + addresses, + localTransactions: [...storedTransactions], + rollback$, + ...props + }) + ) ) ) }; @@ -247,7 +342,9 @@ export const createTransactionsTracker = ( const transactionsSource$ = new TrackerSubject(txSource$); - const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$); + const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$).pipe( + tap((transactions) => logger.debug(`History transactions count: ${transactions?.length || 0}`)) + ); const [onChainNewTxPhase2Failed$, onChainNewTxSuccess$] = partition( newTransactions$(historicalTransactions$).pipe(share()), diff --git a/packages/wallet/src/services/util/equals.ts b/packages/wallet/src/services/util/equals.ts index 2c9485d987e..71913aba294 100644 --- a/packages/wallet/src/services/util/equals.ts +++ b/packages/wallet/src/services/util/equals.ts @@ -19,7 +19,11 @@ export const sameSortedArrayItems = (arrayA: T[], arrayB: T[], itemEquals: (a return true; }; -export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => sameArrayItems(a, b, txEquals); +export const transactionsEquals = (a: Cardano.HydratedTx[], b: Cardano.HydratedTx[]) => { + if (a === b) return true; + + return sameSortedArrayItems(a, b, txEquals); +}; export const txInEquals = (a: Cardano.TxIn, b: Cardano.TxIn) => a.txId === b.txId && a.index === b.index; diff --git a/packages/wallet/test/services/TransactionsTracker.test.ts b/packages/wallet/test/services/TransactionsTracker.test.ts index e22ac2e1bcd..a0050c6cf1f 100644 --- a/packages/wallet/test/services/TransactionsTracker.test.ts +++ b/packages/wallet/test/services/TransactionsTracker.test.ts @@ -26,7 +26,19 @@ import { dummyCbor, toOutgoingTx, toSignedTx } from '../util'; import { dummyLogger } from 'ts-log'; import delay from 'delay'; -const { generateTxAlonzo, mockChainHistoryProvider, queryTransactionsResult } = mockProviders; +const { generateTxAlonzo, mockChainHistoryProvider, queryTransactionsResult, queryTransactionsResult2 } = mockProviders; + +const updateTransactionsBlockNo = (transactions: Cardano.HydratedTx[], blockNo = Cardano.BlockNo(10_050)) => + transactions.map((tx) => ({ + ...tx, + blockHeader: { ...tx.blockHeader, blockNo } + })); + +const updateTransactionIds = (transactions: Cardano.HydratedTx[], tailPattern = 'aaa') => + transactions.map((tx) => ({ + ...tx, + id: Cardano.TransactionId(`${tx.id.slice(0, -tailPattern.length)}${tailPattern}`) + })); describe('TransactionsTracker', () => { const logger = dummyLogger; @@ -61,6 +73,22 @@ describe('TransactionsTracker', () => { store.setAll = jest.fn().mockImplementation(store.setAll.bind(store)); }); + it('emits empty array if store is empty and ChainHistoryProvider does not return any transactions', async () => { + chainHistoryProvider.transactionsByAddresses = jest + .fn() + .mockImplementation(() => delay(50).then(() => ({ pageResults: [], totalResultCount: 0 }))); + const provider$ = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }).transactionsSource$; + expect(await firstValueFrom(provider$)).toEqual([]); + expect(store.setAll).toBeCalledTimes(0); + }); + it('if store is empty, stores and emits transactions resolved by ChainHistoryProvider', async () => { const provider$ = createAddressTransactionsProvider({ addresses$: of(addresses), @@ -134,6 +162,90 @@ describe('TransactionsTracker', () => { }); }); + it('emits shortened tx history when tx was rolled back, but no new tx was added', async () => { + const [txId1, txId2] = queryTransactionsResult.pageResults; + // Two stored transactions: [1, 2] + await firstValueFrom(store.setAll([txId1, txId2])); + + // ChainHistory is shorter by 1 tx: [1] + chainHistoryProvider.transactionsByAddresses = jest + .fn() + // the mismatch will pop the single transaction found in the stored transactions + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [], totalResultCount: 0 }))) + // intersection is found, chain is shortened + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId1], totalResultCount: 1 }))); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1] // shortened chain + ]); + + expect(rollbacks).toEqual([txId2]); + + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1]); + expect(chainHistoryProvider.transactionsByAddresses).toBeCalledTimes(2); + }); + + it('rolls back one transaction, then finds intersection', async () => { + const [txId1, txId2] = queryTransactionsResult.pageResults; + const [txId3] = queryTransactionsResult2.pageResults.slice(-1); + // Two stored transactions: [1, 2] + await firstValueFrom(store.setAll([txId1, txId2])); + + // ChainHistory has one common and one different: [1, 3] + chainHistoryProvider.transactionsByAddresses = jest + .fn() + // the mismatch will pop the single transaction found in the stored transactions + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId3], totalResultCount: 1 }))) + // intersection is found, and stored history is populated with the new transaction + .mockImplementationOnce(() => delay(50).then(() => ({ pageResults: [txId1, txId3], totalResultCount: 2 }))); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1, txId3] // store + chain history + ]); + + expect(rollbacks).toEqual([txId2]); + + expect(store.setAll).toBeCalledTimes(2); + expect(chainHistoryProvider.transactionsByAddresses).toBeCalledTimes(2); + expect(chainHistoryProvider.transactionsByAddresses).nthCalledWith(1, { + addresses, + blockRange: { lowerBound: txId2.blockHeader.blockNo }, + pagination: { limit: 25, startAt: 0 } + }); + expect(chainHistoryProvider.transactionsByAddresses).nthCalledWith(2, { + addresses, + blockRange: { lowerBound: txId1.blockHeader.blockNo }, + pagination: { limit: 25, startAt: 0 } + }); + }); + it('queries ChainHistoryProvider again with blockRange lower bound from a previous transaction on rollback', async () => { await firstValueFrom(store.setAll(queryTransactionsResult.pageResults)); chainHistoryProvider.transactionsByAddresses = jest @@ -180,6 +292,247 @@ describe('TransactionsTracker', () => { pagination: { limit: 25, startAt: 0 } }); }); + + describe('distinct transaction sets in latest stored block vs new blocks', () => { + // Notation: is a block with 3 transactions + // [a b c] is an array of 3 transactions + + // latestStoredBlock <1 2 3> + // newBlock <4 5 6> + // rollback$ [3 2 1] - transactions need to be retried + // store&emit [4 5 6] + it('rolls back all transactions on completely disjoin sets', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo(queryTransactionsResult2.pageResults); + const [txId4, txId5, txId6] = updateTransactionIds([txId1, txId2, txId3]); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId4, txId5, txId6], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId4, txId5, txId6] // chain history + ]); + expect(rollbacks).toEqual([txId3, txId2, txId1]); + expect(store.setAll).toBeCalledTimes(2); + }); + + // latestStoredBlock <1 2> + // newBlock <1 2 3> + // rollback$ none + // store&emit [1,2,3] + it('stores new transactions when new block is superset', async () => { + const [txId1, txId2] = updateTransactionsBlockNo(queryTransactionsResult2.pageResults, Cardano.BlockNo(10_050)); + const [txId1OtherBlock, txId2OtherBlock, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_051) + ); + + await firstValueFrom(store.setAll([txId1, txId2])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1OtherBlock, txId2OtherBlock, txId3], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId1OtherBlock, txId2OtherBlock, txId3] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1OtherBlock, txId2OtherBlock, txId3]); + }); + + // latestStoredBlock <1 2 3> + // newBlock <1 2> + // rollback$ 3 + // store&emit [1,2] + it('rollback some transactions when new block is subset', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_050) + ); + const [txId1OtherBlock, txId2OtherBlock] = updateTransactionsBlockNo([txId1, txId2], Cardano.BlockNo(10_051)); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1OtherBlock, txId2OtherBlock], + totalResultCount: 2 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId1OtherBlock, txId2OtherBlock] // chain history + ]); + expect(rollbacks).toEqual([txId3]); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId1OtherBlock, txId2OtherBlock]); + }); + + // latestStoredBlock <1 2> + // newBlocks <3> <1> <2> + // rollback$ none - transactions are on chain + // store&emit [3 1 2] - re-emit all as they might have a different blockNo + // Noop - produces the same result in the tx history + it('detects when latest block transactions are found in among new blocks', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + const [txId3OtherBlock] = updateTransactionsBlockNo([txId3], Cardano.BlockNo(10_100)); + const [txId1OtherBlock] = updateTransactionsBlockNo([txId1], Cardano.BlockNo(10_200)); + const [txId2OtherBlock] = updateTransactionsBlockNo([txId2], Cardano.BlockNo(10_300)); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2, txId3], // from store + [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId3OtherBlock, txId1OtherBlock, txId2OtherBlock]); + }); + + // latestStoredBlock <1 2> + // newBlock <3 2 1> + // rollback$ none - transactions are on chain + // store&emit [3 2 1] + it('reversed order transactions plus new tx are re-emitted, but not considered rollbacks', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + const [txId1OtherBlock, txId2OtherBlock, txId3OtherBlock] = updateTransactionsBlockNo( + [txId1, txId2, txId3], + Cardano.BlockNo(10_100) + ); + + await firstValueFrom(store.setAll([txId1, txId2])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(2)))).toEqual([ + [txId1, txId2], // from store + [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock] // chain history + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(2); + expect(store.setAll).nthCalledWith(2, [txId3OtherBlock, txId2OtherBlock, txId1OtherBlock]); + }); + + // latestStoredBlock <1 2 3> + // newBlock <1 2 3> + // rollback$ none + // store&emit none + it('does not emit when newBlock transactions are identical to stored transactions', async () => { + const [txId1, txId2, txId3] = updateTransactionsBlockNo( + queryTransactionsResult2.pageResults, + Cardano.BlockNo(10_000) + ); + + await firstValueFrom(store.setAll([txId1, txId2, txId3])); + + chainHistoryProvider.transactionsByAddresses = jest.fn(() => ({ + pageResults: [txId1, txId2, txId3], + totalResultCount: 3 + })); + + const { transactionsSource$: provider$, rollback$ } = createAddressTransactionsProvider({ + addresses$: of(addresses), + chainHistoryProvider, + logger, + retryBackoffConfig, + store, + tipBlockHeight$ + }); + + const rollbacks: Cardano.HydratedTx[] = []; + rollback$.subscribe((tx) => rollbacks.push(tx)); + + expect(await firstValueFrom(provider$.pipe(bufferCount(1)))).toEqual([ + [txId1, txId2, txId3] // from store + ]); + expect(rollbacks.length).toBe(0); + expect(store.setAll).toBeCalledTimes(1); + expect(store.setAll).nthCalledWith(1, [txId1, txId2, txId3]); + }); + }); }); describe('createTransactionsTracker', () => {