Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/lw 11842 performance improvements #1530

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions packages/util-dev/src/mockProviders/mockChainHistoryProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as AssetId from '../assetId';
import * as Crypto from '@cardano-sdk/crypto';
import { Cardano, Paginated } from '@cardano-sdk/core';
import { Cardano, Paginated, TransactionsByAddressesArgs } from '@cardano-sdk/core';
import { currentEpoch, handleAssetId, ledgerTip, stakeCredential } from './mockData';
import { somePartialStakePools } from '../createStubStakePoolProvider';
import delay from 'delay';
Expand Down Expand Up @@ -219,10 +219,20 @@ export const mockChainHistoryProvider2 = (delayMs: number) => {
const delayedJestFn = <T>(resolvedValue: T) =>
jest.fn().mockImplementationOnce(() => delay(delayMs).then(() => resolvedValue));

const blockRangeTransactions = (blockRangeStart: number): Paginated<Cardano.HydratedTx> => {
const pageResults = queryTransactionsResult2.pageResults.filter(
(res) => res.blockHeader.blockNo >= blockRangeStart
);

return { pageResults, totalResultCount: pageResults.length };
};

return {
blocksByHashes: delayedJestFn(blocksByHashes),
healthCheck: delayedJestFn({ ok: true }),
transactionsByAddresses: delayedJestFn(queryTransactionsResult2),
transactionsByAddresses: jest.fn(({ blockRange }: TransactionsByAddressesArgs) =>
delay(delayMs).then(() => blockRangeTransactions(blockRange?.lowerBound || 0))
),
transactionsByHashes: delayedJestFn(queryTransactionsResult2)
};
};
Expand Down
4 changes: 1 addition & 3 deletions packages/wallet/src/Wallets/BaseWallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import {
createUtxoTracker,
createWalletUtil,
currentEpochTracker,
distinctBlock,
distinctEraSummaries
} from '../services';
import { AddressType, Bip32Account, GroupedAddress, WitnessedTx, Witnesser, util } from '@cardano-sdk/key-management';
Expand Down Expand Up @@ -397,7 +396,6 @@ export class BaseWallet implements ObservableWallet {
store: stores.tip,
syncStatus: this.syncStatus
});
const tipBlockHeight$ = distinctBlock(this.tip$);

this.txSubmitProvider = new SmartTxSubmitProvider(
{ retryBackoffConfig },
Expand Down Expand Up @@ -499,11 +497,11 @@ export class BaseWallet implements ObservableWallet {

this.utxo = createUtxoTracker({
addresses$,
history$: this.transactions.history$,
logger: contextLogger(this.#logger, 'utxo'),
onFatalError,
retryBackoffConfig,
stores,
tipBlockHeight$,
transactionsInFlight$: this.transactions.outgoing.inFlight$,
utxoProvider: this.utxoProvider
});
Expand Down
225 changes: 161 additions & 64 deletions packages/wallet/src/services/TransactionsTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ import { distinctBlock, signedTxsEquals, transactionsEquals, txInEquals } from '
import { WitnessedTx } from '@cardano-sdk/key-management';
import { newAndStoredMulticast } from './util/newAndStoredMulticast';
import chunk from 'lodash/chunk.js';
import intersectionBy from 'lodash/intersectionBy.js';
import sortBy from 'lodash/sortBy.js';
import unionBy from 'lodash/unionBy.js';

export interface TransactionsTrackerProps {
chainHistoryProvider: ChainHistoryProvider;
Expand Down Expand Up @@ -107,15 +105,159 @@ const allTransactionsByAddresses = async (
return response;
};

export const createAddressTransactionsProvider = ({
const getLastTransactionsAtBlock = (
transactions: Cardano.HydratedTx[],
blockNo: Cardano.BlockNo
): Cardano.HydratedTx[] => {
const txsFromSameBlock = [];

for (let i = transactions.length - 1; i >= 0; --i) {
const tx = transactions[i];
if (tx.blockHeader.blockNo === blockNo) {
txsFromSameBlock.push(tx);
} else {
break;
}
}

return txsFromSameBlock;
};

export const revertLastBlock = (
localTransactions: Cardano.HydratedTx[],
blockNo: Cardano.BlockNo,
rollback$: Subject<Cardano.HydratedTx>,
newTransactions: Cardano.HydratedTx[],
logger: Logger
) => {
const result = [...localTransactions];

while (result.length > 0) {
const lastKnownTx = result[result.length - 1];

if (lastKnownTx.blockHeader.blockNo === blockNo) {
// only emit if the tx is also not present in the new transactions to be added
if (newTransactions.findIndex((tx) => tx.id === lastKnownTx.id) === -1) {
logger.debug(`Transaction ${lastKnownTx.id} was rolled back`);
rollback$.next(lastKnownTx);
}

result.pop();
} else {
break;
}
}

return result;
};

const findIntersectionAndUpdateTxStore = ({
chainHistoryProvider,
addresses$,
logger,
store,
retryBackoffConfig,
onFatalError,
tipBlockHeight$,
store,
logger,
onFatalError
}: TransactionsTrackerInternalsProps): TransactionsTrackerInternals => {
rollback$,
localTransactions,
addresses
}: Pick<
TransactionsTrackerInternalsProps,
'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$'
> & {
localTransactions: Cardano.HydratedTx[];
rollback$: Subject<Cardano.HydratedTx>;
addresses: Cardano.PaymentAddress[];
}) =>
coldObservableProvider({
// Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
// It should also help when using poor internet connection.
// Caveat is that local transactions might get out of date...
combinator: exhaustMap,
equals: transactionsEquals,
onFatalError,
// eslint-disable-next-line sonarjs/cognitive-complexity
provider: async () => {
let rollbackOcurred = false;
// eslint-disable-next-line no-constant-condition
while (true) {
const lastStoredTransaction: Cardano.HydratedTx | undefined = localTransactions[localTransactions.length - 1];

lastStoredTransaction &&
logger.debug(
`Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}`
);

const lowerBound = lastStoredTransaction?.blockHeader.blockNo;
const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, {
addresses,
blockRange: { lowerBound }
});

logger.debug(
`chainHistoryProvider returned ${newTransactions.length} transactions`,
lowerBound !== undefined && `since block ${lowerBound}`
);

// Fetching transactions from scratch, nothing else to do here.
if (lowerBound === undefined) {
if (newTransactions.length > 0) {
localTransactions = newTransactions;
store.setAll(newTransactions);
}

return newTransactions;
}

// If no transactions found from that block range, it means the last known block has been rolled back.
if (newTransactions.length === 0) {
localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger);
rollbackOcurred = true;

continue;
}

const localTxsFromSameBlock = getLastTransactionsAtBlock(localTransactions, lowerBound);
const firstSegmentOfNewTransactions = newTransactions.slice(0, localTxsFromSameBlock.length);

// The first segment of new transaction should match exactly (same txs and same order) our last know TXs. Otherwise
// roll them back and re-apply in new order.
const sameTxAndOrder = localTxsFromSameBlock.every(
(tx, index) => tx.id === firstSegmentOfNewTransactions[index].id
);

if (!sameTxAndOrder) {
localTransactions = revertLastBlock(localTransactions, lowerBound, rollback$, newTransactions, logger);
rollbackOcurred = true;

continue;
}

// No rollbacks, if they overlap 100% do nothing, otherwise add the difference.
const areTransactionsSame =
newTransactions.length === localTxsFromSameBlock.length &&
localTxsFromSameBlock.every((tx, index) => tx.id === newTransactions[index].id);

if (!areTransactionsSame) {
// Skip overlapping transactions to avoid duplicates
localTransactions = [...localTransactions, ...newTransactions.slice(localTxsFromSameBlock.length)];
store.setAll(localTransactions);
} else if (rollbackOcurred) {
// This case handles rollbacks without new additions
store.setAll(localTransactions);
}

return localTransactions;
}
},
retryBackoffConfig,
trigger$: tipBlockHeight$
});

export const createAddressTransactionsProvider = (
props: TransactionsTrackerInternalsProps
): TransactionsTrackerInternals => {
const { addresses$, store, logger } = props;
const rollback$ = new Subject<Cardano.HydratedTx>();
const storedTransactions$ = store.getAll().pipe(share());
return {
Expand All @@ -127,61 +269,14 @@ export const createAddressTransactionsProvider = ({
)
),
combineLatest([addresses$, storedTransactions$.pipe(defaultIfEmpty([] as Cardano.HydratedTx[]))]).pipe(
switchMap(([addresses, storedTransactions]) => {
let localTransactions: Cardano.HydratedTx[] = [...storedTransactions];

return coldObservableProvider({
// Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
// It should also help when using poor internet connection.
// Caveat is that local transactions might get out of date...
combinator: exhaustMap,
equals: transactionsEquals,
onFatalError,
provider: async () => {
// eslint-disable-next-line no-constant-condition
while (true) {
const lastStoredTransaction: Cardano.HydratedTx | undefined =
localTransactions[localTransactions.length - 1];

lastStoredTransaction &&
logger.debug(
`Last stored tx: ${lastStoredTransaction?.id} block:${lastStoredTransaction?.blockHeader.blockNo}`
);

const lowerBound = lastStoredTransaction?.blockHeader.blockNo;
const newTransactions = await allTransactionsByAddresses(chainHistoryProvider, {
addresses,
blockRange: { lowerBound }
});

logger.debug(
`chainHistoryProvider returned ${newTransactions.length} transactions`,
lowerBound !== undefined && `since block ${lowerBound}`
);
const duplicateTransactions =
lastStoredTransaction && intersectionBy(localTransactions, newTransactions, (tx) => tx.id);
if (typeof duplicateTransactions !== 'undefined' && duplicateTransactions.length === 0) {
const rollbackTransactions = localTransactions.filter(
({ blockHeader: { blockNo } }) => blockNo >= lowerBound
);

from(rollbackTransactions)
.pipe(tap((tx) => logger.debug(`Transaction ${tx.id} was rolled back`)))
.subscribe((v) => rollback$.next(v));

// Rollback by 1 block, try again in next loop iteration
localTransactions = localTransactions.filter(({ blockHeader: { blockNo } }) => blockNo < lowerBound);
} else {
localTransactions = unionBy(localTransactions, newTransactions, (tx) => tx.id);
store.setAll(localTransactions);
return localTransactions;
}
}
},
retryBackoffConfig,
trigger$: tipBlockHeight$
});
})
switchMap(([addresses, storedTransactions]) =>
findIntersectionAndUpdateTxStore({
addresses,
localTransactions: [...storedTransactions],
rollback$,
...props
})
)
)
)
};
Expand Down Expand Up @@ -247,7 +342,9 @@ export const createTransactionsTracker = (

const transactionsSource$ = new TrackerSubject(txSource$);

const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$);
const historicalTransactions$ = createHistoricalTransactionsTrackerSubject(transactionsSource$).pipe(
tap((transactions) => logger.debug(`History transactions count: ${transactions?.length || 0}`))
);

const [onChainNewTxPhase2Failed$, onChainNewTxSuccess$] = partition(
newTransactions$(historicalTransactions$).pipe(share()),
Expand Down
13 changes: 7 additions & 6 deletions packages/wallet/src/services/UtxoTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RetryBackoffConfig } from 'backoff-rxjs';
import { TxInFlight, UtxoTracker } from './types';
import { WalletStores } from '../persistence';
import { coldObservableProvider } from '@cardano-sdk/util-rxjs';
import { sortUtxoByTxIn } from '@cardano-sdk/input-selection';
import chunk from 'lodash/chunk.js';
import uniqWith from 'lodash/uniqWith.js';

Expand All @@ -17,7 +18,7 @@ export interface UtxoTrackerProps {
addresses$: Observable<Cardano.PaymentAddress[]>;
stores: Pick<WalletStores, 'utxo' | 'unspendableUtxo'>;
transactionsInFlight$: Observable<TxInFlight[]>;
tipBlockHeight$: Observable<Cardano.BlockNo>;
history$: Observable<Cardano.HydratedTx[]>;
retryBackoffConfig: RetryBackoffConfig;
logger: Logger;
onFatalError?: (value: unknown) => void;
Expand All @@ -31,7 +32,7 @@ export interface UtxoTrackerInternals {
export const createUtxoProvider = (
utxoProvider: UtxoProvider,
addresses$: Observable<Cardano.PaymentAddress[]>,
tipBlockHeight$: Observable<Cardano.BlockNo>,
history$: Observable<Cardano.HydratedTx[]>,
retryBackoffConfig: RetryBackoffConfig,
onFatalError?: (value: unknown) => void
) =>
Expand All @@ -49,10 +50,10 @@ export const createUtxoProvider = (
utxos = [...utxos, ...(await utxoProvider.utxoByAddresses({ addresses }))];
}

return utxos;
return utxos.sort(sortUtxoByTxIn);
},
retryBackoffConfig,
trigger$: tipBlockHeight$
trigger$: history$
})
)
);
Expand All @@ -64,13 +65,13 @@ export const createUtxoTracker = (
stores,
transactionsInFlight$,
retryBackoffConfig,
tipBlockHeight$,
history$,
logger,
onFatalError
}: UtxoTrackerProps,
{
utxoSource$ = new PersistentCollectionTrackerSubject<Cardano.Utxo>(
() => createUtxoProvider(utxoProvider, addresses$, tipBlockHeight$, retryBackoffConfig, onFatalError),
() => createUtxoProvider(utxoProvider, addresses$, history$, retryBackoffConfig, onFatalError),
stores.utxo
),
unspendableUtxoSource$ = new PersistentCollectionTrackerSubject(
Expand Down
Loading
Loading