diff --git a/.github/workflows/continuous-integration-e2e.yaml b/.github/workflows/continuous-integration-e2e.yaml index aa3d1790124..30900c70f8f 100644 --- a/.github/workflows/continuous-integration-e2e.yaml +++ b/.github/workflows/continuous-integration-e2e.yaml @@ -8,7 +8,7 @@ env: KEY_MANAGEMENT_PARAMS: '{"bip32Ed25519": "Sodium", "accountIndex": 0, "chainId":{"networkId": 0, "networkMagic": 888}, "passphrase":"some_passphrase","mnemonic":"vacant violin soft weird deliver render brief always monitor general maid smart jelly core drastic erode echo there clump dizzy card filter option defense"}' TEST_CLIENT_ASSET_PROVIDER: 'http' TEST_CLIENT_ASSET_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4014/"}' - TEST_CLIENT_CHAIN_HISTORY_PROVIDER: 'http' + TEST_CLIENT_CHAIN_HISTORY_PROVIDER: 'ws' TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}' DB_SYNC_CONNECTION_STRING: 'postgresql://postgres:doNoUseThisSecret!@localhost:5435/cexplorer' TEST_CLIENT_HANDLE_PROVIDER: 'http' @@ -22,7 +22,7 @@ env: TEST_CLIENT_REWARDS_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}' TEST_CLIENT_TX_SUBMIT_PROVIDER: 'http' TEST_CLIENT_TX_SUBMIT_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}' - TEST_CLIENT_UTXO_PROVIDER: 'http' + TEST_CLIENT_UTXO_PROVIDER: 'ws' TEST_CLIENT_UTXO_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}' TEST_CLIENT_STAKE_POOL_PROVIDER: 'http' TEST_CLIENT_STAKE_POOL_PROVIDER_PARAMS: '{"baseUrl":"http://localhost:4000/"}' @@ -79,7 +79,6 @@ jobs: run: | yarn workspace @cardano-sdk/e2e test:wallet:epoch0 yarn workspace @cardano-sdk/e2e test:projection - yarn workspace @cardano-sdk/e2e test:ws - name: Wait for epoch 3 run: | @@ -88,6 +87,7 @@ jobs: - name: 🔬 Test - e2e - wallet at epoch 3 run: | yarn workspace @cardano-sdk/e2e test:wallet:epoch3 + yarn workspace @cardano-sdk/e2e test:ws yarn workspace @cardano-sdk/e2e test:pg-boss yarn workspace @cardano-sdk/e2e test:providers env: diff --git a/.github/workflows/k6-wallets.yaml b/.github/workflows/k6-wallets.yaml index 1f94674bc1d..ccd1c0cd313 100644 --- a/.github/workflows/k6-wallets.yaml +++ b/.github/workflows/k6-wallets.yaml @@ -47,7 +47,7 @@ on: options: - '5m' - '2m' - + env: TARGET_ENV: ${{ inputs.environment || 'dev' }} TARGET_NET: ${{ inputs.network || 'mainnet'}} @@ -58,7 +58,6 @@ env: HD_ACTIVE_ADDR_COUNT: ${{ inputs.hd-addr-per-wallet || 10 }} HD_MAX_TX_HISTORY: ${{ inputs.hd-tx-history-size-per-wallet || 100 }} - jobs: wallet-restoration: runs-on: ubuntu-latest @@ -81,7 +80,7 @@ jobs: K6_CLOUD_PROJECT_ID: ${{ secrets.K6_CLOUD_PROJECT_ID }} with: filename: ./packages/e2e/test/k6/scenarios/wallets.test.js - cloud: true + cloud: false token: ${{ secrets.K6_CLOUD_API_TOKEN }} flags: > -e TARGET_ENV=$TARGET_ENV @@ -91,4 +90,4 @@ jobs: -e RAMP_UP_DURATION=$RAMP_UP_DURATION -e STEADY_STATE_DURATION=$STEADY_STATE_DURATION -e HD_ACTIVE_ADDR_COUNT=$HD_ACTIVE_ADDR_COUNT - -e HD_MAX_TX_HISTORY=$HD_MAX_TX_HISTORY \ No newline at end of file + -e HD_MAX_TX_HISTORY=$HD_MAX_TX_HISTORY diff --git a/.github/workflows/k6-web-socket.yaml b/.github/workflows/k6-web-socket.yaml index 93422d01ce9..bd443de9c7a 100644 --- a/.github/workflows/k6-web-socket.yaml +++ b/.github/workflows/k6-web-socket.yaml @@ -26,6 +26,21 @@ on: type: number required: true default: 1000 + hd-addr-per-wallet: + description: 'Number of addresses per wallet.' + type: number + required: true + default: 10 + hd-tx-history-size-per-wallet: + description: 'Maximum number of transactions per wallet. Filter wallet addresses to have a tx history size smaller than this value.' + type: number + required: true + default: 1000 + max-vu: + description: 'Maximum number of simultaneous simulated users.' + type: number + required: true + default: 100 jobs: web-socket: @@ -55,6 +70,9 @@ jobs: -e TARGET_ENV=${{ inputs.environment }} -e TARGET_NET=${{ inputs.network }} -e WALLETS=${{ inputs.wallets }} + -e HD_ACTIVE_ADDR_COUNT=${{ inputs.hd-addr-per-wallet }} + -e HD_MAX_TX_HISTORY=${{ inputs.hd-tx-history-size-per-wallet }} + -e MAX_VU=${{ inputs.max-vu }} --out json=web-socket-results.json --quiet - name: Upload performance test results diff --git a/compose/common.yml b/compose/common.yml index 76c039ae9c7..b963e2be790 100644 --- a/compose/common.yml +++ b/compose/common.yml @@ -1,3 +1,5 @@ +# cSpell:ignore utxo + x-from-sdk: &from-sdk healthcheck: interval: 10s diff --git a/packages/cardano-services-client/src/WebSocket.ts b/packages/cardano-services-client/src/WebSocket.ts index 1632077a878..16abf8f1c2e 100644 --- a/packages/cardano-services-client/src/WebSocket.ts +++ b/packages/cardano-services-client/src/WebSocket.ts @@ -1,30 +1,48 @@ +// cSpell:ignore cardano sonarjs utxos +/* eslint-disable max-depth */ /* eslint-disable unicorn/prefer-add-event-listener */ import { AsyncReturnType, Cardano, + ChainHistoryProvider, EpochInfo, EraSummary, + MetadataDbModel, NetworkInfoMethods, NetworkInfoProvider, + Paginated, ProviderError, ProviderFailure, + Serialization, StakeSummary, SupplySummary, + TransactionsByAddressesArgs, + UtxoByAddressesArgs, + UtxoProvider, WSMessage, WsProvider, createSlotEpochInfoCalc } from '@cardano-sdk/core'; +import { HexBlob, fromSerializableObject } from '@cardano-sdk/util'; import { Logger } from 'ts-log'; import { Observable, ReplaySubject, Subject, filter, firstValueFrom, merge } from 'rxjs'; -import { fromSerializableObject } from '@cardano-sdk/util'; import WebSocket from 'isomorphic-ws'; const NOT_CONNECTED_ID = 'not-connected'; +type Txs = { [key: Cardano.TransactionId]: Cardano.HydratedTx }; + +type TxsByAddresses = Exclude; + type WSStatus = 'connecting' | 'connected' | 'idle' | 'stop'; -export type WSHandler = (message: WSMessage) => void; +type WSHandler = (error?: Error, message?: WSMessage) => void; + +interface DeferredRequests { + timeout?: NodeJS.Timeout; + requests: { complete: (error?: Error) => void; txsByAddresses: TxsByAddresses }[]; +} export interface WsClientConfiguration { /** The interval in seconds between two heartbeat messages. Default 55". */ @@ -38,10 +56,18 @@ export interface WsClientConfiguration { } export interface WsClientDependencies { + /** The `httpChainHistoryProvider`. */ + chainHistoryProvider: ChainHistoryProvider; + /** The logger. */ logger: Logger; } +interface AddressStatus { + lower: Cardano.BlockNo; + status: 'synced' | 'syncing'; +} + interface EpochRollover { epochInfo: EpochInfo; eraSummaries: EraSummary[]; @@ -50,10 +76,44 @@ interface EpochRollover { protocolParameters: Cardano.ProtocolParameters; } +const deserializeDatum = (tx: Cardano.HydratedTx) => { + for (const output of tx.body.outputs) + if (output.datum) output.datum = Serialization.PlutusData.fromCbor(output.datum as unknown as HexBlob).toCore(); +}; + +const deserializeMetadata = (tx: Cardano.HydratedTx) => { + if (tx.auxiliaryData) + tx.auxiliaryData = { + blob: new Map( + (tx.auxiliaryData as unknown as MetadataDbModel[]).map((metadata) => { + const bKey = BigInt(metadata[0]); + + return [bKey, Serialization.GeneralTransactionMetadata.fromCbor(metadata[1]).toCore().get(bKey)!] as const; + }) + ) + }; +}; + const isEventError = (error: unknown): error is { error: Error } => // eslint-disable-next-line @typescript-eslint/no-explicit-any typeof error === 'object' && !!error && (error as any).error instanceof Error; +export const isTxRelevant = ( + { body: { collaterals, collateralReturn, inputs, outputs }, inputSource }: Cardano.HydratedTx, + addresses: Cardano.PaymentAddress[] +) => + inputSource === Cardano.InputSource.inputs + ? inputs.some((input) => addresses.includes(input.address)) || + outputs.some((output) => addresses.includes(output.address)) + : collaterals!.some((input) => addresses.includes(input.address)) || + (collateralReturn && addresses.includes(collateralReturn.address)); + +const removeRolledBackTxs = (txs: Txs, blockNo: Cardano.BlockNo) => { + let id: Cardano.TransactionId; + + for (id in txs) if (txs[id].blockHeader.blockNo > blockNo) delete txs[id]; +}; + export class CardanoWsClient extends WsProvider { /** The client id, assigned by the server. */ clientId = NOT_CONNECTED_ID; @@ -68,17 +128,29 @@ export class CardanoWsClient extends WsProvider { >; }; + /** WebSocket based `ChainHistoryProvider` implementation. */ + chainHistoryProvider: ChainHistoryProvider; + /** WebSocket based `NetworkInfoProvider` implementation. */ networkInfoProvider: NetworkInfoProvider; + /** WebSocket based `UtxoProvider` implementation. */ + utxoProvider: UtxoProvider; + + private addresses: { [key: Cardano.PaymentAddress]: AddressStatus } = {}; private closePromise: Promise; private closeResolver: () => void; + private deferredRequests: DeferredRequests = { requests: [] }; private epochSubject$: Subject; + private handlers = new Map(); private heartbeatInterval: number; private heartbeatTimeout: NodeJS.Timeout | undefined; private logger: Logger; + private requestId = 0; private status: WSStatus = 'idle'; + private transactions: Txs = {}; private url: URL; + private utxos: Txs = {}; private ws: WebSocket; private networkInfoSubjects = {} as { @@ -109,6 +181,13 @@ export class CardanoWsClient extends WsProvider { }; this.networkInfo = this.networkInfoSubjects; + this.chainHistoryProvider = { + blocksByHashes: (args) => deps.chainHistoryProvider.blocksByHashes(args), + healthCheck: () => this.healthCheck(), + transactionsByAddresses: (args) => this.transactionsByAddresses(args), + transactionsByHashes: (args) => deps.chainHistoryProvider.transactionsByHashes(args) + }; + this.networkInfoProvider = { eraSummaries: this.createNetworkInfoProviderMethod('eraSummaries'), genesisParameters: this.createNetworkInfoProviderMethod('genesisParameters'), @@ -119,6 +198,11 @@ export class CardanoWsClient extends WsProvider { stake: this.createNetworkInfoProviderMethod('stake') }; + this.utxoProvider = { + healthCheck: () => this.healthCheck(), + utxoByAddresses: (args) => this.utxoByAddresses(args) + }; + this.connect(); } @@ -135,7 +219,8 @@ export class CardanoWsClient extends WsProvider { // If the value was an error different from starting, throw it, otherwise it is a return value for the method if ('ok' in value && 'reason' in value && value.ok === false) throw new ProviderError(ProviderFailure.ConnectionFailure, undefined, value.reason); - else return value as AsyncReturnType; + + return value as AsyncReturnType; }; } @@ -145,16 +230,37 @@ export class CardanoWsClient extends WsProvider { const ws = (this.ws = new WebSocket(this.url)); - // eslint-disable-next-line sonarjs/cognitive-complexity, complexity + // eslint-disable-next-line sonarjs/cognitive-complexity, complexity, max-statements ws.onmessage = (event) => { try { if (typeof event.data !== 'string') throw new Error('Unexpected data from WebSocket '); const message = fromSerializableObject(JSON.parse(event.data)); - const { clientId, networkInfo } = message; + const { clientId, networkInfo, responseTo, syncing, transactions, utxos } = message; + + if (clientId) { + this.logger.info(`Connected with clientId ${(this.clientId = clientId)}`); + + if (syncing) this.emitHealth('Server is still syncing', { overwrite: true }); + else this.emitHealth(); + } - if (clientId) this.logger.info(`Connected with clientId ${(this.clientId = clientId)}`); + if (transactions) + for (const tx of transactions) { + deserializeDatum(tx); + deserializeMetadata(tx); + this.transactions[tx.id] = tx; + delete this.utxos[tx.id]; + } + + if (utxos) + for (const tx of utxos) + if (!this.transactions[tx.id]) { + deserializeDatum(tx); + this.utxos[tx.id] = tx; + } + // Handle networkInfo as last one if (networkInfo) { const { eraSummaries, genesisParameters, ledgerTip, lovelaceSupply, protocolParameters, stake } = networkInfo; @@ -165,7 +271,12 @@ export class CardanoWsClient extends WsProvider { if (stake) this.networkInfoSubjects.stake$.next(stake); // Emit ledgerTip as last one - if (ledgerTip) this.networkInfoSubjects.ledgerTip$.next(ledgerTip); + if (ledgerTip) { + removeRolledBackTxs(this.transactions, ledgerTip.blockNo); + removeRolledBackTxs(this.utxos, ledgerTip.blockNo); + + this.networkInfoSubjects.ledgerTip$.next(ledgerTip); + } // If it is an epoch rollover, emit it if (eraSummaries && ledgerTip && lovelaceSupply && protocolParameters && !clientId) { @@ -174,6 +285,17 @@ export class CardanoWsClient extends WsProvider { this.epochSubject$.next({ epochInfo, eraSummaries, ledgerTip, lovelaceSupply, protocolParameters }); } } + + if (responseTo) { + const handler = this.handlers.get(responseTo); + + if (handler) { + const { error } = message; + + this.handlers.delete(responseTo); + error ? handler(error) : handler(undefined, message); + } + } } catch (error) { this.logger.error(error, 'While parsing message', event.data, this.clientId); } @@ -208,13 +330,17 @@ export class CardanoWsClient extends WsProvider { this.logger.error(err, 'Async error from WebSocket client', this.clientId); ws.close(); - this.emitHealth(err.message, true); + this.emitHealth(err.message, { overwrite: true }); + + for (const handler of this.handlers.values()) handler(err); + this.handlers.clear(); + this.addresses = {}; + this.transactions = {}; }; ws.onopen = () => { if (this.status !== 'stop') this.status = 'connected'; this.heartbeat(); - this.emitHealth(); }; } @@ -245,12 +371,153 @@ export class CardanoWsClient extends WsProvider { * @param request the request. * @returns `true` is sent, otherwise `false`. */ - private request(request: WSMessage) { + private request(request: WSMessage, handler?: WSHandler) { if (this.status !== 'connected') return false; + // Heartbeat messages do not expect a response, so they neither need a requestId, ... + if (Object.keys(request).length > 0) + // ... otherwise add requestId + request = { ...request, requestId: ++this.requestId }; + this.ws.send(JSON.stringify(request)); this.heartbeat(); + if (request.requestId && handler) this.handlers.set(request.requestId, handler); + return true; } + + private transactionsByAddresses({ addresses, blockRange, pagination }: TransactionsByAddressesArgs) { + // eslint-disable-next-line sonarjs/cognitive-complexity + return new Promise>((resolve, reject) => { + const lower = blockRange?.lowerBound || (0 as Cardano.BlockNo); + const upper = blockRange?.upperBound || Number.POSITIVE_INFINITY; + const requestAddresses: Cardano.PaymentAddress[] = []; + const request = { addresses: requestAddresses, lower }; + + const complete = (error?: Error) => { + if (error) { + for (const address of requestAddresses) delete this.addresses[address]; + + return reject(error); + } + + const transactions = Object.values(this.transactions) + .filter(({ blockHeader: { blockNo } }) => lower <= blockNo && blockNo <= upper) + .filter((tx) => isTxRelevant(tx, addresses)) + .sort((a, b) => a.blockHeader.blockNo - b.blockHeader.blockNo || a.index - b.index); + + const first = pagination?.startAt || 0; + const last = first + (pagination?.limit || Number.POSITIVE_INFINITY); + + const pageResults = transactions.filter((_, i) => first <= i && i < last); + + resolve({ pageResults, totalResultCount: transactions.length }); + }; + + // Check which addresses require sync + for (const address of addresses) { + const status = this.addresses[address]; + let toSend = false; + + if (status) { + if (status.status === 'syncing') + return complete(new ProviderError(ProviderFailure.Conflict, null, `${address} still loading`)); + if (lower < status.lower) toSend = true; + } else toSend = true; + + if (toSend) { + requestAddresses.push(address); + this.addresses[address] = { lower, status: 'syncing' }; + } + } + + firstValueFrom(this.health$.pipe(filter(({ reason }) => reason !== 'starting'))) + .then(({ ok, reason }) => { + if (!ok) return complete(new ProviderError(ProviderFailure.ConnectionFailure, undefined, reason)); + + // If no addresses need to be synced, just run complete + if (requestAddresses.length === 0) return complete(); + + this.deferRequest(request, (error) => { + if (error) return complete(new ProviderError(ProviderFailure.ConnectionFailure, error, error.message)); + + for (const address of requestAddresses) this.addresses[address].status = 'synced'; + + complete(); + }); + }) + // This should actually never happen + .catch(complete); + }); + } + + private deferRequest(txsByAddresses: TxsByAddresses, complete: (error?: Error) => void) { + const { requests, timeout } = this.deferredRequests; + + if (timeout) clearTimeout(timeout); + + requests.push({ complete, txsByAddresses }); + + this.deferredRequests.timeout = setTimeout(() => { + this.deferredRequests = { requests: [] }; + this.request( + { + txsByAddresses: requests.reduce( + (prev, { txsByAddresses: { addresses, lower } }) => ({ + addresses: [...prev.addresses, ...addresses], + lower: prev.lower < lower ? prev.lower : lower + }), + { addresses: [], lower: Number.POSITIVE_INFINITY as Cardano.BlockNo } as TxsByAddresses + ) + }, + // eslint-disable-next-line @typescript-eslint/no-shadow, unicorn/no-array-for-each + (error) => requests.forEach(({ complete }) => complete(error)) + ); + }, 3); + } + + // eslint-disable-next-line sonarjs/cognitive-complexity, complexity + private utxoByAddresses({ addresses }: UtxoByAddressesArgs) { + for (const address of addresses) { + const status = this.addresses[address]; + + if (!status) + return Promise.reject(new ProviderError(ProviderFailure.NotImplemented, null, `${address} not loaded`)); + + if (status.status === 'syncing') + return Promise.reject(new ProviderError(ProviderFailure.Conflict, null, `${address} still loading`)); + } + + const result: [Cardano.HydratedTxIn, Cardano.TxOut][] = []; + const transactions = [...Object.values(this.utxos), ...Object.values(this.transactions)] + .filter((tx) => isTxRelevant(tx, addresses)) + .sort((a, b) => a.blockHeader.blockNo - b.blockHeader.blockNo || a.index - b.index); + + for (let txOutIdx = 0; txOutIdx < transactions.length; ++txOutIdx) { + const txOut = transactions[txOutIdx]; + + for (let txOutOutIdx = 0; txOutOutIdx < txOut.body.outputs.length; ++txOutOutIdx) { + const txOutput = txOut.body.outputs[txOutOutIdx]; + + if (addresses.includes(txOutput.address)) { + let unspent = true; + + for (let txInIdx = txOutIdx + 1; txInIdx < transactions.length && unspent; ++txInIdx) { + const txIn = transactions[txInIdx]; + + for (let txInInIdx = 0; txInInIdx < txIn.body.inputs.length && unspent; ++txInInIdx) { + const txInput = txIn.body.inputs[txInInIdx]; + + if (txInput.txId === txOut.id && txInput.index === txOutOutIdx) unspent = false; + } + } + + if (unspent) result.push([{ address: txOutput.address, index: txOutOutIdx, txId: txOut.id }, txOutput]); + } + } + } + + return Promise.resolve(result); + } } diff --git a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/ChainHistoryBuilder.ts b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/ChainHistoryBuilder.ts index 2f18b434922..b4edc9d4a94 100644 --- a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/ChainHistoryBuilder.ts +++ b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/ChainHistoryBuilder.ts @@ -86,7 +86,7 @@ const mapWithdrawals: (source: [{ credential: DbSyncCredential; network: string }); // eslint-disable-next-line complexity, @typescript-eslint/no-explicit-any -const getGovernanceAction = (description: any): Cardano.GovernanceAction => { +export const getGovernanceAction = (description: any): Cardano.GovernanceAction => { const { contents, tag } = description; const governanceActionId = contents && contents[0] ? { actionIndex: contents[0].govActionIx, id: contents[0].txId } : null; @@ -150,10 +150,13 @@ const getGovernanceAction = (description: any): Cardano.GovernanceAction => { throw new Error(`Unknown GovernanceActionType '${tag}' with description "${JSON.stringify(description)}"`); }; -const getVoter = ( - txId: Cardano.TransactionId, - { committee_has_script, committee_voter, drep_has_script, drep_voter, pool_voter, voter_role }: VotingProceduresModel -): Cardano.Voter => { +type PartialVotingProceduresModel = Pick< + VotingProceduresModel, + 'committee_has_script' | 'committee_voter' | 'drep_has_script' | 'drep_voter' | 'pool_voter' | 'voter_role' +>; + +export const getVoter = (txId: Cardano.TransactionId, model: PartialVotingProceduresModel): Cardano.Voter => { + const { committee_has_script, committee_voter, drep_has_script, drep_voter, pool_voter, voter_role } = model; let hash: Hash28ByteBase16; switch (voter_role) { diff --git a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/mappers.ts b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/mappers.ts index 036bc9284c4..603c5716990 100644 --- a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/mappers.ts +++ b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/mappers.ts @@ -171,7 +171,7 @@ export const mapRedeemer = (redeemerModel: RedeemerModel): Cardano.Redeemer => ( purpose: mapRedeemerPurpose(redeemerModel.purpose) }); -export const mapAnchor = (anchorUrl: string, anchorDataHash: string): Cardano.Anchor | null => { +export const mapAnchor = (anchorUrl?: string, anchorDataHash?: string): Cardano.Anchor | null => { if (!!anchorUrl && !!anchorDataHash) { return { dataHash: anchorDataHash as Hash32ByteBase16, diff --git a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/types.ts b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/types.ts index 662062c17fb..60682738ec5 100644 --- a/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/types.ts +++ b/packages/cardano-services/src/ChainHistory/DbSyncChainHistory/types.ts @@ -117,7 +117,7 @@ export interface RedeemerModel { tx_id: Buffer; } -type VoterRole = 'ConstitutionalCommittee' | 'DRep' | 'SPO'; +export type VoterRole = 'ConstitutionalCommittee' | 'DRep' | 'SPO'; export interface VotingProceduresModel { tx_id: Buffer; diff --git a/packages/cardano-services/src/Program/programs/providerServer.ts b/packages/cardano-services/src/Program/programs/providerServer.ts index d25bd3ea231..c616557f64a 100644 --- a/packages/cardano-services/src/Program/programs/providerServer.ts +++ b/packages/cardano-services/src/Program/programs/providerServer.ts @@ -1,3 +1,5 @@ +// cSpell:ignore impls + /* eslint-disable complexity */ import { AssetProvider, @@ -157,12 +159,24 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => { const getEpochMonitor = memoize((dbPool: Pool) => new DbSyncEpochPollService(dbPool, args.epochPollInterval!)); + const getDbSyncChainHistoryProvider = withDbSyncProvider((dbPools, cardanoNode) => { + const cache = { healthCheck: healthCheckCache }; + const metadataService = createDbSyncMetadataService(dbPools.main, logger); + + return new DbSyncChainHistoryProvider( + { paginationPageSizeLimit: args.paginationPageSizeLimit! }, + { cache, cardanoNode, dbPools, logger, metadataService } + ); + }, ServiceNames.ChainHistory); + const getWebSocketClient = () => { const url = args.webSocketApiUrl; if (!url) throw new MissingProgramOption('WebSocket', CommonOptionsDescriptions.WebSocketApiUrl); - return new CardanoWsClient({ logger }, { url }); + const chainHistoryProvider = getDbSyncChainHistoryProvider(); + + return new CardanoWsClient({ chainHistoryProvider, logger }, { url }); }; const getDbSyncStakePoolProvider = withDbSyncProvider((dbPools, cardanoNode) => { @@ -298,6 +312,8 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => { new BlockfrostNetworkInfoProvider({ blockfrost: getBlockfrostApi(), logger }); const getDbSyncNetworkInfoProvider = withDbSyncProvider((dbPools, cardanoNode) => { + if (args.useWebSocketApi) return getWebSocketClient().networkInfoProvider; + if (!genesisData) throw new MissingProgramOption(ServiceNames.NetworkInfo, CommonOptionsDescriptions.CardanoNodeConfigPath); @@ -317,23 +333,6 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => { const getBlockfrostChainHistoryProvider = () => new BlockfrostChainHistoryProvider({ blockfrost: getBlockfrostApi(), logger }); - const getDbSyncChainHistoryProvider = withDbSyncProvider( - (dbPools, cardanoNode) => - new DbSyncChainHistoryProvider( - { paginationPageSizeLimit: args.paginationPageSizeLimit! }, - { - cache: { - healthCheck: healthCheckCache - }, - cardanoNode, - dbPools, - logger, - metadataService: createDbSyncMetadataService(dbPools.main, logger) - } - ), - ServiceNames.ChainHistory - ); - const getBlockfrostRewardsProvider = () => new BlockfrostRewardsProvider({ blockfrost: getBlockfrostApi(), logger }); const getDbSyncRewardsProvider = withDbSyncProvider( @@ -417,8 +416,6 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => { const networkInfoProvider = args.networkInfoProvider === ProviderImplementation.BLOCKFROST ? getBlockfrostNetworkInfoProvider() - : args.useWebSocketApi - ? getWebSocketClient().networkInfoProvider : getDbSyncNetworkInfoProvider(); return new NetworkInfoHttpService({ logger, diff --git a/packages/cardano-services/src/WsServer/db.ts b/packages/cardano-services/src/WsServer/db.ts index b4b3ae69db3..ccf4bddd27a 100644 --- a/packages/cardano-services/src/WsServer/db.ts +++ b/packages/cardano-services/src/WsServer/db.ts @@ -6,9 +6,10 @@ import { Pool } from 'pg'; const notifyTipBody = `\ BEGIN - PERFORM pg_notify('sdk_tip', json_build_object( + PERFORM PG_NOTIFY('sdk_tip', JSON_BUILD_OBJECT( + 'blockId', NEW.id, 'blockNo', NEW.block_no, - 'hash', encode(NEW.hash, 'hex'), + 'hash', ENCODE(NEW.hash, 'hex'), 'slot', NEW.slot_no )::TEXT); RETURN NEW; diff --git a/packages/cardano-services/src/WsServer/requests.ts b/packages/cardano-services/src/WsServer/requests.ts index 92d15d62ace..69ddcac43d3 100644 --- a/packages/cardano-services/src/WsServer/requests.ts +++ b/packages/cardano-services/src/WsServer/requests.ts @@ -1,7 +1,17 @@ +// cSpell:ignore cardano deleg deregistration drep unreg unregistration utxo utxos + import { Cardano, CardanoNode, CardanoNodeUtil } from '@cardano-sdk/core'; import { Pool } from 'pg'; import { ProtocolParamsModel } from '../NetworkInfo/DbSyncNetworkInfoProvider/types'; import { toProtocolParams } from '../NetworkInfo/DbSyncNetworkInfoProvider/mappers'; +import { transactionsByIds } from './transactions'; + +// Workaround for @types/pg +declare module 'pg' { + interface QueryConfig { + rowMode?: 'array'; + } +} export const getLovelaceSupply = async (db: Pool, maxLovelaceSupply: Cardano.Lovelace) => { const query = 'SELECT utxo + rewards AS circulating, reserves FROM ada_pots ORDER BY id DESC LIMIT 1'; @@ -40,3 +50,99 @@ export const getStake = async (cardanoNode: CardanoNode, db: Pool) => { return { active, live }; }; + +interface TxsByAddressesOptions { + action?: (txs?: Cardano.HydratedTx[], utxos?: Cardano.HydratedTx[]) => void; + blockId?: string; + lower?: Cardano.BlockNo; +} + +export const transactionsByAddresses = async ( + addresses: Cardano.PaymentAddress[], + db: Pool, + options: TxsByAddressesOptions = {} +) => { + const { action, blockId, lower } = options; + + // In case of last block, let's pick all the transactions from it (last block), next let's pick relevant inputs and outputs + const fromLastBlock = async () => { + const text = `\ +WITH txs AS (SELECT tx.id AS tid FROM tx WHERE block_id = $1) +SELECT DISTINCT tid FROM ( + SELECT tid, address FROM txs JOIN tx_out ON tid = tx_id + UNION ALL + SELECT tid, address FROM txs JOIN tx_in ON tid = tx_in_id JOIN tx_out ON tx_out_id = tx_id AND tx_out_index = index +) t WHERE address = ANY($2)`; + + const result = await db.query<[string]>({ name: 'l_block', rowMode: 'array', text, values: [blockId, addresses] }); + return [result.rows.flat()] as const; + }; + + // In case of wallet restoration, let's just pick all the relevant inputs and outputs + const walletRestore = async () => { + const text = `\ +WITH outputs AS (SELECT tx_id, index FROM tx_out WHERE address = ANY($1)) +SELECT DISTINCT tid FROM ( + SELECT tx_id AS tid FROM outputs + UNION ALL + SELECT tx_in_id AS tid FROM outputs JOIN tx_in ON tx_out_id = tx_id AND tx_out_index = index +) t`; + + const result = await db.query<[string]>({ name: 'restore', rowMode: 'array', text, values: [addresses] }); + return [result.rows.flat()] as const; + }; + + // In case of wallet reconnection, ... + const walletReConnection = async () => { + // ... first of all let's convert the blockNo in block.id to save some JOINs, ... + const text1 = 'SELECT id FROM block WHERE block_no = $1'; + const result1 = await db.query<[string]>({ name: 'n2d', rowMode: 'array', text: text1, values: [lower] }); + const [bId] = result1.rows[0]; + + // ... next let's pick all the relevant transactions from latest blocks and from them, relevant inputs and outputs; + const text2 = `\ +WITH txs AS (SELECT tx.id AS tid FROM tx WHERE block_id >= $1) +SELECT DISTINCT tid FROM ( + SELECT tid, address FROM txs JOIN tx_out ON tid = tx_id + UNION ALL + SELECT tid, address FROM txs JOIN tx_in ON tid = tx_in_id JOIN tx_out ON tx_out_id = tx_id AND tx_out_index = index +) t WHERE address = ANY($2)`; + + // ... last let's pick not spent relevant outputs from previous blocks + const text3 = `\ +WITH source AS (SELECT tx_id, tx_in_id FROM tx_out JOIN tx ON tx_id = tx.id AND block_id < $1 + LEFT JOIN tx_in ON tx_out_id = tx_id AND tx_out_index = index + WHERE address = ANY($2)), +unspent AS (SELECT tx_id FROM source WHERE tx_in_id IS NULL) +SELECT DISTINCT tx_id FROM ( + SELECT tx_id FROM unspent + UNION ALL + SELECT tx_in_id AS tx_id FROM source WHERE tx_in_id > (SELECT MIN(tx_id) FROM unspent) +) t`; + + const [result2, result3] = await Promise.all([ + db.query<[string]>({ name: 'reconnect2', rowMode: 'array', text: text2, values: [bId, addresses] }), + db.query<[string]>({ name: 'reconnect3', rowMode: 'array', text: text3, values: [bId, addresses] }) + ]); + + return [result2.rows.flat(), result3.rows.flat()] as const; + }; + + const [txIds, partialTxIds] = await (blockId ? fromLastBlock() : lower ? walletReConnection() : walletRestore()); + + if (txIds.length === 0 && (!partialTxIds || partialTxIds.length === 0)) return []; + + const chunks: Cardano.HydratedTx[][] = []; + + for (let i = 0; i < txIds.length; i += 100) { + const txs = await transactionsByIds(txIds.slice(i, i + 100), db); + + action ? action(txs) : chunks.push(txs); + } + + if (partialTxIds) + for (let i = 0; i < partialTxIds.length; i += 100) + action!(undefined, await transactionsByIds(partialTxIds.slice(i, i + 100), db)); + + return chunks.flat(); +}; diff --git a/packages/cardano-services/src/WsServer/server.ts b/packages/cardano-services/src/WsServer/server.ts index 7415c238013..c003ab60bb4 100644 --- a/packages/cardano-services/src/WsServer/server.ts +++ b/packages/cardano-services/src/WsServer/server.ts @@ -1,3 +1,5 @@ +// cSpell:ignore cardano utxos + import { Cardano, CardanoNode, @@ -12,8 +14,9 @@ import { Logger } from 'ts-log'; import { Notification, Pool } from 'pg'; import { Server, createServer } from 'http'; import { WebSocket, WebSocketServer } from 'ws'; -import { getLovelaceSupply, getProtocolParameters, getStake } from './requests'; +import { getLovelaceSupply, getProtocolParameters, getStake, transactionsByAddresses } from './requests'; import { initDB } from './db'; +import { isTxRelevant } from '@cardano-sdk/cardano-services-client'; import { toGenesisParams } from '../NetworkInfo/DbSyncNetworkInfoProvider/mappers'; import { toSerializableObject } from '@cardano-sdk/util'; import { v4 } from 'uuid'; @@ -22,11 +25,15 @@ export { WebSocket } from 'ws'; declare module 'ws' { interface WebSocket { + addresses: Cardano.PaymentAddress[]; clientId: string; heartbeat: number; logError: (error: Error, msg: string) => void; - logInfo: (msg: string) => void; + logInfo: (msg: object | string) => void; + logDebug: (msg: object | string) => void; + + sendMessage: (message: WSMessage) => void; } interface WebSocketServer { @@ -38,6 +45,9 @@ export interface WsServerConfiguration { /** The cache time to live in seconds. */ dbCacheTtl: number; + /** The heartbeat check interval in seconds. */ + heartbeatInterval?: number; + /** The heartbeat timeout in seconds. */ heartbeatTimeout?: number; @@ -62,6 +72,25 @@ export interface WsServerDependencies { // eslint-disable-next-line @typescript-eslint/no-empty-function const noop = () => {}; +interface NotificationBody { + message: WSMessage; + transactions: Cardano.HydratedTx[]; +} + +interface NotificationEvent { + notification: number; + transactions: Cardano.HydratedTx[]; +} + +/** + * Since some debug log information may require heavy computation to stringify data, + * better checking if we are interested in logging some data before actually logging it. + */ +const debugLog = process.env.LOGGER_MIN_SEVERITY === 'debug'; + +const toError = (error: unknown) => + error instanceof Error ? error : new Error(`Unknown error: ${JSON.stringify(error)}`); + export class CardanoWsServer extends WsProvider { private cardanoNode: CardanoNode; private closeNotify = noop; @@ -73,9 +102,10 @@ export class CardanoWsServer extends WsProvider { private lastSlot = Number.POSITIVE_INFINITY as Cardano.Slot; private logger: Logger; private networkInfo: NetworkInfoResponses; - private notifications = new Map(); + private notifications = new Map(); private server: Server; private stakeInterval: NodeJS.Timer | undefined; + private syncing = true; private wss: WebSocketServer; constructor(dependencies: WsServerDependencies, cfg: WsServerConfiguration) { @@ -108,6 +138,7 @@ export class CardanoWsServer extends WsProvider { this.init( cfg.port, Seconds.toMilliseconds(Seconds(cfg.dbCacheTtl || 120)), + Seconds.toMilliseconds(Seconds(cfg.heartbeatInterval || 10)), Seconds.toMilliseconds(Seconds(cfg.heartbeatTimeout || 60)) ).catch((error) => { this.logger.error(error, 'Error in init sequence'); @@ -124,10 +155,14 @@ export class CardanoWsServer extends WsProvider { if (this.stakeInterval) clearInterval(this.stakeInterval); this.wss.close((wsError) => { - if (wsError) this.logger.error(wsError, 'Error while closing the WebSocket server'); + if (wsError && (!(wsError instanceof Error) || wsError.message !== 'The server is not running')) + this.logger.error(wsError, 'Error while closing the WebSocket server'); this.server.close((httpError) => { - if (httpError) this.logger.error(httpError, 'Error while closing the HTTP server'); + // TODO required TypeScript v5.5.4 to remove casting as any + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if (httpError && (!('code' in httpError) || (httpError as any).code !== 'ERR_SERVER_NOT_RUNNING')) + this.logger.error(httpError, 'Error while closing the HTTP server'); if (callback) callback(); }); @@ -139,9 +174,18 @@ export class CardanoWsServer extends WsProvider { /** Creates a simple HTTP server which just handles the `/health` URL. Mainly used to listen the WS server. */ private createHttpServer() { return createServer(async (req, res) => { - if (req.url === '/health') return res.end(JSON.stringify(await this.healthCheck())); + const { method, url } = req; + + if (['/health', '/ready'].includes(url!)) { + const healthCheck = await this.healthCheck(); - this.logger.info(req.method, req.url); + if (url === '/health' && healthCheck.notRecoverable) res.statusCode = 500; + if (url === '/ready' && !healthCheck.ok) res.statusCode = 500; + + return res.end(JSON.stringify(healthCheck)); + } + + this.logger.info(method, url); res.statusCode = 404; res.end('Not found'); @@ -150,7 +194,8 @@ export class CardanoWsServer extends WsProvider { private createOnNotification() { // This is the entry point for a new NOTIFY event from the DB; i.e. each time a new record is inserted in the block table - return (msg: Notification) => { + // eslint-disable-next-line sonarjs/cognitive-complexity + return (msg: Notification) => (async () => { const notification = ++this.lastReceivedNotification; const { payload } = msg; @@ -158,27 +203,56 @@ export class CardanoWsServer extends WsProvider { if (!payload) throw new Error('Missing payload in NOTIFY'); // The payload of the NOTIFY event contain the tip in the correct format - const ledgerTip = JSON.parse(payload) as Cardano.Tip; + const { blockId, ...ledgerTip } = JSON.parse(payload) as Cardano.Tip & { blockId: string }; this.networkInfo.ledgerTip = ledgerTip; - let networkInfo: WSMessage['networkInfo']; + this.logger.debug(`Notification ${notification}: ${JSON.stringify(ledgerTip)}`); + + const epochRollover = async () => { + if (ledgerTip.slot <= this.lastSlot) return { ledgerTip }; + + this.logger.debug(`Epoch rollover for notification ${notification}...`); - if (ledgerTip.slot <= this.lastSlot) networkInfo = { ledgerTip }; - else { await this.onEpochRollover(); const { eraSummaries, lovelaceSupply, protocolParameters } = this.networkInfo; - networkInfo = { eraSummaries, ledgerTip, lovelaceSupply, protocolParameters }; - } + if (debugLog) + this.logger.debug( + `Epoch rollover for notification ${notification}: ${JSON.stringify( + toSerializableObject({ eraSummaries, lovelaceSupply, protocolParameters }) + )}` + ); + + return { eraSummaries, ledgerTip, lovelaceSupply, protocolParameters }; + }; + + const loadTransactions = async () => { + const addressesMap = new Map(); + for (const ws of this.wss.clients) for (const address of ws.addresses) addressesMap.set(address, true); + const addresses = [...addressesMap.keys()]; + + if (debugLog) this.logger.debug(`Transactions for notification ${notification} ${JSON.stringify(addresses)}`); - this.send({ networkInfo }, notification); + const txs = addresses.length === 0 ? [] : await transactionsByAddresses(addresses, this.db, { blockId }); + + if (debugLog) this.logger.debug(`Transactions for notification ${notification} ${JSON.stringify(txs)}`); + + return txs; + }; + + const [networkInfo, transactions] = await Promise.all([epochRollover(), loadTransactions()]); + + this.send({ networkInfo }, { notification, transactions }); })().catch((error) => { this.logger.error(error, 'Error while handling tip notification'); // Since an error while handling tip notification may be source of data inconsistencies, better to shutdown + this.emitHealth( + error instanceof Error ? error.message || 'Unknown error' : `Unknown error ${JSON.stringify(error)}`, + { notRecoverable: true, overwrite: true } + ); this.close(); }); - }; } private listenNotify() { @@ -199,7 +273,7 @@ export class CardanoWsServer extends WsProvider { db.connect((err, client, done) => { if (err) { logger.error(err, 'Error while connecting to DB to listen on sdk_tip NOTIFY'); - this.emitHealth(err.message, true); + this.emitHealth(err.message, { overwrite: true }); // In case of error opening the DB client, just retry after 1". return reAddListener(); @@ -231,7 +305,7 @@ export class CardanoWsServer extends WsProvider { client.on('error', (e) => { logger.error(e, 'Async error from sdk_tip NOTIFY'); - this.emitHealth(e.message, true); + this.emitHealth(e.message, { overwrite: true }); this.closeNotify(); }); @@ -240,11 +314,10 @@ export class CardanoWsServer extends WsProvider { // Issue the LISTEN command to get the notification event client.query('LISTEN sdk_tip', (e) => { - // If there was no errors in the statement, set the flag for the health check - if (!e) return this.emitHealth(); + if (!e) return; logger.error(e, 'Error while listening on sdk_tip NOTIFY'); - this.emitHealth(e.message, true); + this.emitHealth(e.message, { overwrite: true }); this.closeNotify(); }); }); @@ -253,7 +326,7 @@ export class CardanoWsServer extends WsProvider { addListener(); } - private async init(port: number, dbCacheTtl: number, heartbeatTimeout: number) { + private async init(port: number, dbCacheTtl: number, heartbeatInterval: number, heartbeatTimeout: number) { const { cardanoNode, db, logger, networkInfo, server, wss } = this; const refreshStake = async () => { @@ -286,34 +359,99 @@ export class CardanoWsServer extends WsProvider { ws.logInfo('Timed out, closing'); ws.close(); } - }, 10_000); + }, heartbeatInterval); // eslint-disable-next-line unicorn/consistent-destructuring this.heartbeatInterval.unref(); + const check = async () => { + const { localNode } = await cardanoNode.healthCheck(); + + if (!localNode) throw new Error('Missing node health check response'); + if (!localNode.ledgerTip) throw new Error('Missing "ledgerTip" in node health check response'); + if (!localNode.networkSync) throw new Error('Missing "networkSync" in node health check response'); + + const projectedTip = networkInfo.ledgerTip; + const tipDiff = localNode.ledgerTip.blockNo - projectedTip.blockNo; + + // Two blocks difference tolerance + if (tipDiff >= 2) return this.emitHealth({ localNode, ok: false, projectedTip }); + + // Leave untouched the status set by healthCheck in init method + if (localNode.ledgerTip.blockNo === 0) return; + + const ok = localNode.networkSync >= 0.999; + + // eslint-disable-next-line unicorn/consistent-destructuring + if (ok && this.syncing) this.syncing = false; + + this.emitHealth({ localNode, ok, projectedTip }); + }; + + const healthCheck = () => { + check() + .finally(() => setTimeout(healthCheck, 1000).unref()) + .catch((error_) => { + const error = toError(error_); + + this.emitHealth(error.message); + logger.error(error, 'While checking node health check'); + }); + }; + + // Check synchronously to be sure init is complete + await check(); + // Next schedule asynchronous checks + healthCheck(); + server.listen(port, () => logger.info('WebSocket server ready and listening')); this.listenNotify(); } private createOnConnection() { - const { logger, networkInfo } = this; - // This is the entry point for each new WebSocket connection return (ws: WebSocket) => { + const { logger, networkInfo, syncing } = this; const clientId = (ws.clientId = v4()); - const stringMessage = JSON.stringify(toSerializableObject({ clientId, networkInfo })); + + ws.addresses = []; // Create some wrappers for the logger - ws.logInfo = (msg: string) => logger.info({ clientId }, msg); ws.logError = (error: Error, msg: string) => { logger.error({ clientId }, msg); logger.error(error, msg); }; + ws.logInfo = (msg: object | string) => + logger.info(...(typeof msg === 'string' ? [{ clientId }, msg] : [{ clientId, ...msg }])); + ws.logDebug = (msg: object | string) => + logger.debug(...(typeof msg === 'string' ? [{ clientId }, msg] : [{ clientId, ...msg }])); + + ws.sendMessage = (message: WSMessage) => { + const stringMessage = JSON.stringify(toSerializableObject(message)); + + ws.logDebug(stringMessage); + ws.send(stringMessage); + }; + + const onClose = (close?: boolean) => { + if (close) ws.close(); + ws.logInfo('Connection closed'); + // eslint-disable-next-line @typescript-eslint/no-empty-function + ws.sendMessage = () => {}; + }; ws.logInfo('Connected'); + // If still syncing, actually do not accept connections + if (syncing) { + ws.sendMessage({ clientId, syncing: true }); + setTimeout(() => onClose(true), 1000); + + return; + } + // Attach the handlers to the WS connection events - ws.on('close', () => ws.logInfo('Connection closed')); + ws.on('close', onClose); ws.on('error', (error) => ws.logError(error, 'Async error from WebSocket connection')); // This is the entry point for each new WebSocket message from this connection ws.on('message', (data) => { @@ -322,15 +460,32 @@ export class CardanoWsServer extends WsProvider { // This is never expected... just in case if (!(data instanceof Buffer)) - return ws.logError( - new Error('Not a Buffer'), - `Unexpected data from WebSocket connection ${JSON.stringify(data)}` - ); + return ws.logError(new Error('Not a Buffer'), `Unexpected data from WebSocket ${JSON.stringify(data)}`); + + // DoS protection + if (data.length > 1024 * 100) { + ws.logError(new Error('Buffer too long'), 'Unexpected data length from WebSocket: closing'); + return ws.close(); + } + + let request: WSMessage; + + try { + request = JSON.parse(data.toString('utf8')); + } catch (error) { + ws.logError(error as Error, 'Error parsing message: closing'); + return ws.close(); + } + + // Heartbeat messages do not expect a response + if (Object.keys(request).length === 0) return; + + this.request(ws, request).catch((error) => ws.logError(error, 'Error while processing request')); }); // Actually set the timeout for the first time ws.heartbeat = Date.now(); - ws.send(stringMessage); + ws.sendMessage({ clientId, networkInfo }); }; } @@ -351,18 +506,70 @@ export class CardanoWsServer extends WsProvider { this.lastSlot = createSlotEpochInfoCalc(networkInfo.eraSummaries)(networkInfo.ledgerTip.slot).lastSlot.slot; } - private send(message: WSMessage, notification?: number) { + private async request(ws: WebSocket, request: WSMessage) { + const { txsByAddresses, requestId } = request; + + try { + ws.logInfo(request); + + if (txsByAddresses) { + const action = (transactions?: Cardano.HydratedTx[], utxos?: Cardano.HydratedTx[]) => { + if (transactions) ws.logInfo(`Sending ${transactions.length} transactions for request ${requestId}`); + if (utxos) ws.logInfo(`Sending ${utxos.length} partial transactions for request ${requestId}`); + + ws.sendMessage({ transactions, utxos }); + }; + + const { addresses, lower } = txsByAddresses; + + ws.addresses.push(...addresses); + await transactionsByAddresses(addresses, this.db, { action, lower }); + } + + if (requestId) { + ws.logInfo(`Responding to request ${requestId}`); + ws.sendMessage({ responseTo: requestId }); + } + } catch (error_) { + const error = toError(error_); + + ws.logError(error, 'While performing request'); + if (requestId) ws.sendMessage({ error, responseTo: requestId }); + } + } + + private send(message: WSMessage, notificationEvent?: NotificationEvent) { // If the message is not bound to a tip notification, just send it - if (!notification) return this.sendString(JSON.stringify(toSerializableObject(message))); + if (!notificationEvent) return this.sendString(JSON.stringify(toSerializableObject(message))); + + const { notification, transactions } = notificationEvent; + + if (debugLog) this.logger.debug(`Scheduling notification: ${JSON.stringify(toSerializableObject(message))}`); // Ensure messages from notifications are propagated in the same order as the notification was received - this.notifications.set(notification, message); + this.notifications.set(notification, { message, transactions }); while (this.notifications.has(this.lastSentNotification + 1)) { - const msg = this.notifications.get(++this.lastSentNotification); + this.sendNotification(this.notifications.get(++this.lastSentNotification)!); this.notifications.delete(this.lastSentNotification); + } + } + + private sendNotification(notification: NotificationBody) { + const { message, transactions } = notification; + const stringMessage = JSON.stringify(toSerializableObject(message)); + + for (const ws of this.wss.clients) { + const txs = transactions.filter((tx) => isTxRelevant(tx, ws.addresses)); + + if (debugLog) + ws.logDebug( + `Sending notification: ${ + txs.length === 0 ? stringMessage : JSON.stringify(toSerializableObject({ ...message, transactions: txs })) + }` + ); - this.sendString(JSON.stringify(toSerializableObject(msg))); + txs.length === 0 ? ws.send(stringMessage) : ws.sendMessage({ ...message, transactions: txs }); } } diff --git a/packages/cardano-services/src/WsServer/transactions.ts b/packages/cardano-services/src/WsServer/transactions.ts new file mode 100644 index 00000000000..4cfc0e52a10 --- /dev/null +++ b/packages/cardano-services/src/WsServer/transactions.ts @@ -0,0 +1,727 @@ +// cSpell:ignore cardano deleg deregistration drep unreg unregistration utxos + +import { Cardano, MetadataDbModel, jsonToNativeScript } from '@cardano-sdk/core'; +import { Ed25519PublicKeyHex, Ed25519SignatureHex, Hash28ByteBase16 } from '@cardano-sdk/crypto'; +import { Pool } from 'pg'; +import { VoterRole } from '../ChainHistory/DbSyncChainHistory/types'; +import { getGovernanceAction } from '../ChainHistory'; +import { mapAnchor } from '../ChainHistory/DbSyncChainHistory/mappers'; + +interface ActionModel { + deposit: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + description: any; + hash: string; + url: string; + view: Cardano.RewardAccount; +} + +interface OutputModel { + address: Cardano.PaymentAddress; + value: string; + datum: string | null; + hash: Cardano.DatumHash | null; + assets: [Cardano.AssetId, string][]; + script: Cardano.Script | { __type: '_native'; json: Object }; +} + +const redeemerPurposeMap = { + cert: Cardano.RedeemerPurpose.certificate, + mint: Cardano.RedeemerPurpose.mint, + propose: Cardano.RedeemerPurpose.propose, + reward: Cardano.RedeemerPurpose.withdrawal, + spend: Cardano.RedeemerPurpose.spend, + vote: Cardano.RedeemerPurpose.vote +} as const; + +interface RedeemerModel { + index: number; + purpose: keyof typeof redeemerPurposeMap; + mem: number; + steps: number; +} + +interface VoteModel { + role: VoterRole; + cVoter: Hash28ByteBase16; + cScript: boolean; + dVoter: Hash28ByteBase16; + dScript: boolean; + pVoter: Hash28ByteBase16; + vote: Cardano.Vote; + index: number; + tx: Cardano.TransactionId; + url: string; + hash: string; +} + +interface TxModel { + id: Cardano.TransactionId; + index: number; + size: number; + fee: number; + before: Cardano.Slot | null; + hereafter: Cardano.Slot | null; + valid: boolean; + block: Cardano.BlockNo; + hash: Cardano.BlockId; + slot: Cardano.Slot; + collateral: OutputModel; + actions: ActionModel[]; + commRet: [number, Hash28ByteBase16, boolean, string, string][]; + commReg: [number, Hash28ByteBase16, boolean, Hash28ByteBase16, boolean][]; + spRetire: [number, Cardano.EpochNo, Cardano.PoolId][]; + spReg: [number, Cardano.PoolId, number, Cardano.RewardAccount, number, number, number, Cardano.VrfVkHex][]; + deRep: [string | null, number, Cardano.CredentialType, Hash28ByteBase16, string, string][]; + votes: VoteModel[]; + vDele: [number, Cardano.CredentialType, Hash28ByteBase16 | null, string, Cardano.RewardAccount][]; + unReg: [number, number, Cardano.RewardAccount][]; + reg: [number, number, Cardano.RewardAccount][]; + sDele: [number, Cardano.RewardAccount, Cardano.PoolId][]; + mint: [Cardano.AssetId, string][]; + withdrawals: { quantity: string; stakeAddress: Cardano.RewardAccount }[]; + redeemers: RedeemerModel[]; + collaterals: Cardano.HydratedTxIn[]; + metadata: MetadataDbModel[]; + inputs: Cardano.HydratedTxIn[]; + outputs: OutputModel[]; +} + +/** + * Transaction Query Builder. + * + * Helper class to build the complex query; + * - avoids some code repetition + * - helps the read + * - helps in case of refactoring + */ +class TQB { + constructor(public query: string) {} + + /** Adds an indentation level to the query to nest it into an outer query. */ + indent() { + return new TQB( + this.query + .split('\n') + .map((line) => ` ${line}`) + .join('\n') + ); + } + + /** + * Nests this query into an outer query with aggregation. + * + * Certificates doesn't need to be ordered as their index is a relevant part to build compound certificates; + * they are ordered after extracting the compound certificates: that's why the `order` parameter is optional. + * + * @param attribute The attribute name of the resulting JSON + * @param element The element from the row; the ones which will be aggregated into an array + * @param joins The list of the LEFT JOINs + * @param order The value used to order the elements in the array + */ + nest(attribute: string, element: string, joins: string[], order?: string) { + const leftJoins = joins.map((join) => ` LEFT JOIN ${join}`).join(''); + const orderBy = order ? ` ORDER BY ${order}` : ''; + + return new TQB(`\ +SELECT tid, JSONB_SET(JSONB_AGG(tx)->0, '{${attribute}}', JSONB_AGG(${element}${orderBy})) AS tx FROM ( +${this.indent().query} +) t${leftJoins} GROUP BY tid`); + } +} + +const innerQuery = (collateral: boolean) => `\ +SELECT tx.id AS tid, JSON_BUILD_OBJECT( + 'id', ENCODE((ARRAY_AGG(tx.hash))[1], 'hex'), + 'index', (ARRAY_AGG(block_index))[1], + 'size', (ARRAY_AGG(tx.size))[1], + 'fee', (ARRAY_AGG(fee))[1], + 'before', (ARRAY_AGG(invalid_before))[1], + 'hereafter', (ARRAY_AGG(invalid_hereafter))[1], + 'valid', (ARRAY_AGG(valid_contract))[1], + 'block', (ARRAY_AGG(block_no))[1], + 'hash', ENCODE((ARRAY_AGG(b.hash))[1], 'hex'), + 'slot', (ARRAY_AGG(slot_no))[1]${ + collateral + ? // eslint-disable-next-line sonarjs/no-nested-template-literals + `, + 'collateral', JSONB_BUILD_OBJECT( + 'address', (ARRAY_AGG(address))[1], + 'value', (ARRAY_AGG(value))[1], + 'hash', ENCODE((ARRAY_AGG(data_hash))[1], 'hex'), + 'assets', JSONB_BUILD_ARRAY(JSONB_BUILD_ARRAY(NULL)), + 'script', CASE WHEN (ARRAY_AGG(bytes))[1] IS NULL THEN NULL ELSE JSONB_BUILD_OBJECT( + '__type', 'plutus', + 'bytes', ENCODE((ARRAY_AGG(bytes))[1], 'hex'), + 'version', CASE WHEN (ARRAY_AGG(type))[1] = 'plutusV1' THEN 0 WHEN (ARRAY_AGG(type))[1] = 'plutusV2' THEN 1 ELSE 2 END + ) END + )` + : '' + } +) AS tx +FROM tx JOIN block b ON tx.block_id = b.id${ + collateral + ? ' LEFT JOIN collateral_tx_out ON tx_id = tx.id LEFT JOIN script s ON s.id = reference_script_id AND s.bytes IS NOT NULL' + : '' +} WHERE tx.id = ANY($1) GROUP BY tx.id`; + +/* eslint-disable sonarjs/no-duplicate-string */ +const tqb = new TQB(innerQuery(true)) + .nest( + 'actions', + `\ +JSONB_BUILD_OBJECT( + 'deposit', deposit::TEXT, + 'description', description, + 'hash', ENCODE(data_hash, 'hex'), + 'url', url, + 'view', view +)`, + [ + 'gov_action_proposal g ON tx_id = tid', + 'voting_anchor v ON voting_anchor_id = v.id', + 'stake_address s ON return_address = s.id' + ], + 'g.id' + ) + .nest('commRet', "JSONB_BUILD_ARRAY(cert_index, ENCODE(raw, 'hex'), has_script, url, ENCODE(data_hash, 'hex'))", [ + 'committee_de_registration ON tx_id = tid', + 'committee_hash ON cold_key_id = committee_hash.id', + 'voting_anchor ON voting_anchor.id = voting_anchor_id' + ]) + .nest( + 'commReg', + "JSONB_BUILD_ARRAY(cert_index, ENCODE(c1.raw, 'hex'), c1.has_script, ENCODE(c2.raw, 'hex'), c2.has_script)", + [ + 'committee_registration ON tx_id = tid', + 'committee_hash AS c1 ON cold_key_id = c1.id', + 'committee_hash AS c2 ON hot_key_id = c2.id' + ] + ) + .nest('spRetire', 'JSONB_BUILD_ARRAY(cert_index, retiring_epoch, view)', [ + 'pool_retire ON announced_tx_id = tid', + 'pool_hash p ON p.id = hash_id' + ]) + .nest( + 'spReg', + `\ +JSONB_BUILD_ARRAY( + cert_index, + p.view, + CASE WHEN deposit IS NULL THEN 0 ELSE deposit::INTEGER END, + s.view, + pledge, + fixed_cost, + margin, + ENCODE(vrf_key_hash, 'hex') +)`, + [ + 'pool_update ON registered_tx_id = tid', + 'pool_hash p ON p.id = hash_id', + 'stake_address s ON s.id = reward_addr_id' + ] + ) + .nest( + 'deRep', + "JSONB_BUILD_ARRAY(deposit::TEXT, cert_index, CASE WHEN has_script THEN 1 ELSE 0 END, ENCODE(raw, 'hex'), url, ENCODE(data_hash, 'hex'))", + [ + 'drep_registration ON tx_id = tid', + 'drep_hash ON drep_hash.id = drep_hash_id', + 'voting_anchor ON voting_anchor.id = voting_anchor_id' + ] + ) + .nest( + 'votes', + `\ +JSONB_BUILD_OBJECT( + 'role', voter_role, + 'cVoter', ENCODE(c.raw, 'hex'), + 'cScript', c.has_script, + 'dVoter', ENCODE(d.raw, 'hex'), + 'dScript', d.has_script, + 'pVoter', ENCODE(p.hash_raw, 'hex'), + 'vote', CASE WHEN vote = 'No' THEN 0 WHEN vote = 'Yes' THEN 1 WHEN vote = 'Abstain' THEN 2 END, + 'index', g.index::INTEGER, + 'tx', ENCODE(hash, 'hex'), + 'url', a.url, + 'hash', ENCODE(a.data_hash, 'hex') +)`, + [ + 'voting_procedure v ON v.tx_id = tid', + 'gov_action_proposal g ON gov_action_proposal_id = g.id', + 'tx ON g.tx_id = tx.id', + 'drep_hash d ON drep_voter = d.id', + 'pool_hash p ON pool_voter = p.id', + 'voting_anchor a ON v.voting_anchor_id = a.id', + 'committee_hash c ON c.id = committee_voter' + ], + 'v.index' + ) + .nest( + 'vDele', + "JSONB_BUILD_ARRAY(cert_index, CASE WHEN has_script THEN 1 ELSE 0 END, ENCODE(raw, 'hex'), drep_hash.view, stake_address.view)", + [ + 'delegation_vote ON tx_id = tid', + 'drep_hash ON drep_hash.id = drep_hash_id', + 'stake_address ON stake_address.id = addr_id' + ] + ) + .nest( + 'unReg', + `\ +JSONB_BUILD_ARRAY(cert_index, ( + SELECT sr.deposit FROM stake_registration AS sr + WHERE sr.addr_id = sd.addr_id AND sr.tx_id < sd.tx_id + ORDER BY sr.tx_id DESC LIMIT 1 +), view)`, + ['stake_deregistration sd ON tx_id = tid', 'stake_address ON stake_address.id = addr_id'] + ) + .nest('reg', 'JSONB_BUILD_ARRAY(cert_index, deposit, view)', [ + 'stake_registration ON tx_id = tid', + 'stake_address ON stake_address.id = addr_id' + ]) + .nest('sDele', 'JSONB_BUILD_ARRAY(cert_index, addr.view, pool.view)', [ + 'delegation ON tx_id = tid', + 'pool_hash AS pool ON pool.id = pool_hash_id', + 'stake_address AS addr ON addr.id = addr_id' + ]) + .nest( + 'mint', + "JSONB_BUILD_ARRAY(ENCODE(policy || name, 'hex'), quantity::TEXT)", + ['ma_tx_mint ON tx_id = tid', 'multi_asset ON ident = multi_asset.id'], + 'ma_tx_mint.id' + ) + .nest( + 'withdrawals', + "JSONB_BUILD_OBJECT('quantity', amount::TEXT, 'stakeAddress', view)", + ['withdrawal ON tx_id = tid', 'stake_address ON stake_address.id = addr_id'], + 'withdrawal.id' + ) + .nest( + 'redeemers', + "JSONB_BUILD_OBJECT('index', index, 'purpose', purpose, 'mem', unit_mem, 'steps', unit_steps)", + ['redeemer ON tx_id = tid'], + 'id' + ) + .nest( + 'collaterals', + "JSONB_BUILD_OBJECT('address', address, 'txId', ENCODE(hash, 'hex'), 'index', index)", + [ + 'collateral_tx_in c ON tx_in_id = tid', + 'tx_out ON tx_out_id = tx_id AND tx_out_index = index', + 'tx ON tx_id = tx.id' + ], + 'c.id' + ) + .nest( + 'metadata', + "JSONB_BUILD_ARRAY(key::TEXT, ENCODE(bytes, 'hex'))", + ['tx_metadata ON tx_metadata.tx_id = tid'], + 'id' + ); +/* eslint-enable sonarjs/no-duplicate-string */ + +const buildQuery = (qb: TQB) => `\ +SELECT tid, JSONB_SET(JSONB_AGG(tx)->0, '{outputs}', JSONB_AGG(out ORDER BY oid)) AS tx FROM ( + SELECT tid, JSONB_AGG(tx)->0 AS tx, o.id AS oid, JSONB_BUILD_OBJECT( + 'address', o.address, + 'value', o.value::TEXT, + 'datum', ENCODE((ARRAY_AGG(d.bytes))[1], 'hex'), + 'hash', ENCODE(o.data_hash, 'hex'), + 'assets', JSONB_AGG(JSONB_BUILD_ARRAY(ENCODE(policy || name, 'hex'), quantity::TEXT) ORDER BY m.id), + 'script', CASE WHEN (ARRAY_AGG(s.json))[1] IS NOT NULL THEN JSONB_BUILD_OBJECT( + '__type', '_native', + 'json', (ARRAY_AGG(s.json))[1] + ) WHEN (ARRAY_AGG(s.bytes))[1] IS NOT NULL THEN JSONB_BUILD_OBJECT( + '__type', 'plutus', + 'bytes', ENCODE((ARRAY_AGG(s.bytes))[1], 'hex'), + 'version', CASE WHEN (ARRAY_AGG(type))[1] = 'plutusV1' THEN 0 WHEN (ARRAY_AGG(type))[1] = 'plutusV2' THEN 1 ELSE 2 END + ) ELSE NULL END + ) AS out FROM ( +${ + qb + .nest( + 'inputs', + "JSONB_BUILD_OBJECT('address', address, 'txId', ENCODE(hash, 'hex'), 'index', index)", + ['tx_in ON tx_in_id = tid', 'tx_out ON tx_out_id = tx_id AND tx_out_index = index', 'tx ON tx_id = tx.id'], + 'tx_in.id' + ) + .indent() + .indent().query +} + ) t LEFT JOIN tx_out o ON o.tx_id = tid LEFT JOIN ma_tx_out m ON m.tx_out_id = o.id LEFT JOIN multi_asset a ON m.ident = a.id + LEFT JOIN script s ON s.id = reference_script_id LEFT JOIN datum d ON d.id = inline_datum_id GROUP BY tid, o.id +) t GROUP BY tid`; + +const getTransactions = buildQuery(tqb); +const getUtxos = buildQuery(new TQB(innerQuery(false))); + +const mapActions = (actions: ActionModel[]) => + actions[0].description + ? actions.map(({ deposit, description, hash, url, view }) => ({ + anchor: mapAnchor(url, hash)!, + deposit: BigInt(deposit), + governanceAction: getGovernanceAction(description), + rewardAccount: view + })) + : undefined; + +const mapAsset = (asset: [Cardano.AssetId, string]) => [asset[0], BigInt(asset[1])] as const; + +const compound = < + C1 extends Cardano.HydratedCertificate, + C2 extends Cardano.HydratedCertificate, + C3 extends Cardano.HydratedCertificate +>( + certs1: (readonly [number, C1])[], + certs2: (readonly [number, C2])[], + merge: (c1: C1, c2: C2) => C3 +) => { + const result1: (readonly [number, C1])[] = []; + const result: (readonly [number, C3])[] = []; + const foundIndexes2: number[] = []; + + for (const c1 of certs1) { + const c2index = certs2.findIndex((c2) => c1[0] === c2[0]); + + if (c2index === -1) result1.push(c1); + else { + foundIndexes2.push(c2index); + result.push([c1[0], merge(c1[1], certs2[c2index][1])]); + } + } + + const result2 = certs2.filter((_, c2index) => !foundIndexes2.includes(c2index)); + + return [result1, result2, result] as const; +}; + +// eslint-disable-next-line complexity, max-statements, sonarjs/cognitive-complexity +const mapCertificates = (tx: TxModel) => { + const { deRep, reg, sDele, unReg, vDele, spReg, spRetire, commReg, commRet } = tx; + const result: Cardano.HydratedCertificate[] = []; + + let sRegDel: (readonly [number, Cardano.StakeRegistrationDelegationCertificate])[]; + let sVotRegDel: (readonly [number, Cardano.StakeVoteRegistrationDelegationCertificate])[]; + let sVotDel: (readonly [number, Cardano.StakeVoteDelegationCertificate])[]; + let vRegDel: (readonly [number, Cardano.VoteRegistrationDelegationCertificate])[]; + + const mapStakeReg = (cert: TxModel['reg'][number]) => ({ + __typename: Cardano.CertificateType.Registration as const, + deposit: BigInt(cert[1]), + stakeCredential: Cardano.Address.fromBech32(cert[2]).asReward()!.getPaymentCredential() + }); + let sReg = (reg[0][2] ? reg : []).map((cert) => [cert[0], mapStakeReg(cert)] as const); + + const mapStakeDeleg = (cert: TxModel['sDele'][number]) => ({ + __typename: Cardano.CertificateType.StakeDelegation as const, + poolId: cert[2], + stakeCredential: Cardano.Address.fromBech32(cert[1]).asReward()!.getPaymentCredential() + }); + let sDel = (sDele[0][1] ? sDele : []).map((cert) => [cert[0], mapStakeDeleg(cert)] as const); + + const mapVoteDeleg = (cert: TxModel['vDele'][number]) => ({ + __typename: Cardano.CertificateType.VoteDelegation as const, + dRep: (cert[2] + ? { hash: cert[2], type: cert[1] } + : cert[3] === 'drep_always_abstain' + ? { __typename: 'AlwaysAbstain' } + : { __typename: 'AlwaysNoConfidence' }) as Cardano.DelegateRepresentative, + stakeCredential: Cardano.Address.fromBech32(cert[4]).asReward()!.getPaymentCredential() + }); + let vDel = (vDele[0][0] !== null ? vDele : []).map((cert) => [cert[0], mapVoteDeleg(cert)] as const); + + [sDel, sReg, sRegDel] = compound(sDel, sReg, ({ poolId }, { deposit, stakeCredential }) => ({ + __typename: Cardano.CertificateType.StakeRegistrationDelegation, + deposit, + poolId, + stakeCredential + })); + + // eslint-disable-next-line prefer-const + [vDel, sRegDel, sVotRegDel] = compound(vDel, sRegDel, ({ dRep, stakeCredential }, { poolId, deposit }) => ({ + __typename: Cardano.CertificateType.StakeVoteRegistrationDelegation, + dRep, + deposit, + poolId, + stakeCredential + })); + + // eslint-disable-next-line prefer-const + [sDel, vDel, sVotDel] = compound(sDel, vDel, ({ poolId }, { dRep, stakeCredential }) => ({ + __typename: Cardano.CertificateType.StakeVoteDelegation, + dRep, + poolId, + stakeCredential + })); + + // eslint-disable-next-line prefer-const + [sReg, vDel, vRegDel] = compound(sReg, vDel, ({ deposit }, { dRep, stakeCredential }) => ({ + __typename: Cardano.CertificateType.VoteRegistrationDelegation, + dRep, + deposit, + stakeCredential + })); + + for (const cert of sReg) result[cert[0]] = cert[1]; + for (const cert of sDel) result[cert[0]] = cert[1]; + for (const cert of vDel) result[cert[0]] = cert[1]; + for (const cert of sRegDel) result[cert[0]] = cert[1]; + for (const cert of sVotDel) result[cert[0]] = cert[1]; + for (const cert of vRegDel) result[cert[0]] = cert[1]; + for (const cert of sVotRegDel) result[cert[0]] = cert[1]; + + if (unReg[0][2]) + for (const cert of unReg) + result[cert[0]] = { + __typename: Cardano.CertificateType.Unregistration as const, + deposit: BigInt(cert[1]), + stakeCredential: Cardano.Address.fromBech32(cert[2]).asReward()!.getPaymentCredential() + }; + + if (deRep[0][3]) + for (const [dep, ...cert] of deRep) { + const update = dep === null; + const unreg = !update && dep.startsWith('-'); + const deposit = BigInt(update ? 0 : unreg ? dep.slice(1) : dep); + const anchor = mapAnchor(cert[3], cert[4]); + + result[cert[0]] = { + ...(update + ? { __typename: Cardano.CertificateType.UpdateDelegateRepresentative as const, anchor } + : unreg + ? { __typename: Cardano.CertificateType.UnregisterDelegateRepresentative as const, deposit } + : { __typename: Cardano.CertificateType.RegisterDelegateRepresentative as const, anchor, deposit }), + dRepCredential: { hash: cert[2], type: cert[1] } + }; + } + + if (spReg[0][1]) + for (const cert of spReg) + result[cert[0]] = { + __typename: Cardano.CertificateType.PoolRegistration as const, + deposit: BigInt(cert[2]), + poolParameters: { + cost: BigInt(cert[5]), + id: cert[1], + margin: Cardano.FractionUtils.toFraction(cert[6]), + owners: [], + pledge: BigInt(cert[4]), + relays: [], + rewardAccount: cert[3], + vrf: cert[7] + } + }; + + if (spRetire[0][2]) + for (const cert of spRetire) + result[cert[0]] = { + __typename: Cardano.CertificateType.PoolRetirement as const, + epoch: cert[1], + poolId: cert[2] + }; + + const getCredentialType = (hasScript: boolean) => + hasScript ? Cardano.CredentialType.ScriptHash : Cardano.CredentialType.KeyHash; + + if (commReg[0][1]) + for (const cert of commReg) + result[cert[0]] = { + __typename: Cardano.CertificateType.AuthorizeCommitteeHot, + coldCredential: { hash: cert[1], type: getCredentialType(cert[2]) }, + hotCredential: { hash: cert[3], type: getCredentialType(cert[4]) } + }; + + if (commRet[0][1]) + for (const cert of commRet) + result[cert[0]] = { + __typename: Cardano.CertificateType.ResignCommitteeCold, + anchor: mapAnchor(cert[3], cert[4]), + coldCredential: { hash: cert[1], type: getCredentialType(cert[2]) } + }; + + return result.length > 0 ? result : undefined; +}; + +const mapOutput = (output: OutputModel) => { + const assets = output.assets[0][0] ? new Map(output.assets.map(mapAsset)) : undefined; + const result: Cardano.TxOut = { + address: output.address, + value: { coins: BigInt(output.value) } + }; + + if (assets) result.value.assets = assets; + // Let's do this forcing to perform the deserialization on client side + if (output.datum) result.datum = output.datum as unknown as Cardano.PlutusData; + if (output.hash) result.datumHash = output.hash; + if (output.script) + result.scriptReference = + output.script.__type === '_native' ? jsonToNativeScript(output.script.json) : output.script; + + return result; +}; + +// TODO: unfortunately this is not nullable and not implemented. +// Remove this and select the actual redeemer data from `redeemer_data` table. +const stubRedeemerData = Buffer.from('not implemented'); + +const mapRedeemer = (redeemer: RedeemerModel): Cardano.Redeemer => ({ + data: stubRedeemerData, + executionUnits: { memory: redeemer.mem, steps: redeemer.steps }, + index: redeemer.index, + purpose: redeemerPurposeMap[redeemer.purpose] +}); + +export const mapVoter = ({ role, cVoter, cScript, dVoter, dScript, pVoter }: VoteModel): Cardano.Voter => { + switch (role) { + case 'ConstitutionalCommittee': + return cScript + ? { + __typename: Cardano.VoterType.ccHotScriptHash, + credential: { hash: cVoter, type: Cardano.CredentialType.ScriptHash } + } + : { + __typename: Cardano.VoterType.ccHotKeyHash, + credential: { hash: cVoter, type: Cardano.CredentialType.KeyHash } + }; + + case 'DRep': + return dScript + ? { + __typename: Cardano.VoterType.dRepScriptHash, + credential: { hash: dVoter, type: Cardano.CredentialType.ScriptHash } + } + : { + __typename: Cardano.VoterType.dRepKeyHash, + credential: { hash: dVoter, type: Cardano.CredentialType.KeyHash } + }; + + case 'SPO': + return { + __typename: Cardano.VoterType.stakePoolKeyHash, + credential: { hash: pVoter, type: Cardano.CredentialType.KeyHash } + }; + } +}; + +const mapVotes = (votes: VoteModel[]): Cardano.VotingProcedures => { + const procedures: Cardano.VotingProcedures = []; + let lastStringified = ''; + let lastVotes: Cardano.VotingProcedureVote[] = []; + + for (const vote of votes) { + const voter = mapVoter(vote); + const stringified = JSON.stringify(voter); + const procedure: Cardano.VotingProcedureVote = { + actionId: { actionIndex: vote.index, id: vote.tx }, + votingProcedure: { anchor: vote.url ? mapAnchor(vote.url, vote.hash) : null, vote: vote.vote } + }; + + if (stringified !== lastStringified) { + lastStringified = stringified; + procedures.push({ voter, votes: (lastVotes = [procedure]) }); + } else lastVotes.push(procedure); + } + + return procedures; +}; + +const mapWithdrawals = (withdrawals: TxModel['withdrawals']) => + withdrawals[0].stakeAddress + ? (withdrawals || []).map(({ quantity, stakeAddress }) => ({ + quantity: BigInt(quantity), + stakeAddress + })) + : undefined; + +const signatures = new Map(); + +// eslint-disable-next-line complexity +const mapTx = (tx: TxModel): Cardano.HydratedTx => { + const collateralReturn = tx.collateral.address ? mapOutput(tx.collateral) : undefined; + const collaterals = tx.collaterals[0].txId ? tx.collaterals : []; + const fee = BigInt(tx.fee); + const inputs = tx.inputs[0].address ? tx.inputs : []; + const outputs = tx.outputs[0].address ? tx.outputs.map(mapOutput) : []; + + return { + // Let's do this forcing to perform the deserialization on client side + auxiliaryData: tx.metadata[0][1] ? (tx.metadata as unknown as Cardano.AuxiliaryData) : undefined, + blockHeader: { blockNo: tx.block, hash: tx.hash, slot: tx.slot }, + body: { + ...(tx.valid + ? { collateralReturn, collaterals, fee, inputs, outputs } + : { + collateralReturn: outputs[0], + collaterals: inputs, + fee: 0n, + inputs: [], + outputs: [], + totalCollateral: fee + }), + certificates: mapCertificates(tx), + mint: tx.mint[0][0] ? new Map(tx.mint.map(mapAsset)) : undefined, + proposalProcedures: mapActions(tx.actions), + validityInterval: { invalidBefore: tx.before || undefined, invalidHereafter: tx.hereafter || undefined }, + votingProcedures: tx.votes[0].role ? mapVotes(tx.votes) : undefined, + withdrawals: mapWithdrawals(tx.withdrawals) + }, + id: tx.id, + index: tx.index, + inputSource: tx.valid ? Cardano.InputSource.inputs : Cardano.InputSource.collaterals, + txSize: tx.size, + witness: { redeemers: tx.redeemers[0].purpose ? tx.redeemers.map(mapRedeemer) : undefined, signatures } + }; +}; + +const actions = [{}] as ActionModel[]; +const collateral = {} as OutputModel; +const collaterals = [{}] as Cardano.HydratedTxIn[]; +const commReg = [[]] as unknown as TxModel['commReg']; +const commRet = [[]] as unknown as TxModel['commRet']; +const deRep = [[]] as unknown as TxModel['deRep']; +const metadata = [[]] as unknown as MetadataDbModel[]; +const mint = [[]] as unknown as TxModel['mint']; +const redeemers = [{}] as RedeemerModel[]; +const reg = [[]] as unknown as TxModel['reg']; +const sDele = [[]] as unknown as TxModel['sDele']; +const spReg = [[]] as unknown as TxModel['spReg']; +const spRetire = [[]] as unknown as TxModel['spRetire']; +const unReg = [[]] as unknown as TxModel['unReg']; +const vDele = [[null]] as unknown as TxModel['vDele']; +const votes = [{}] as VoteModel[]; +const withdrawals = [{}] as TxModel['withdrawals']; + +export const transactionsByIds = async (ids: string[], db: Pool, onlyUtxos?: boolean) => { + const result = await db.query<{ tx: TxModel }>({ + ...(onlyUtxos ? { name: 'get_utxos', text: getUtxos } : { name: 'get_txs', text: getTransactions }), + values: [ids] + }); + + const rows = onlyUtxos + ? result.rows.map(({ tx }) => ({ + tx: { + ...tx, + actions, + collateral, + collaterals, + commReg, + commRet, + deRep, + metadata, + mint, + redeemers, + reg, + sDele, + spReg, + spRetire, + unReg, + vDele, + votes, + withdrawals + } + })) + : result.rows; + + return rows.map(({ tx }) => mapTx(tx)); +}; diff --git a/packages/core/src/WebSocket.ts b/packages/core/src/WebSocket.ts index 8b4be83e6f8..8d6e8d26067 100644 --- a/packages/core/src/WebSocket.ts +++ b/packages/core/src/WebSocket.ts @@ -1,41 +1,90 @@ +// cSpell:ignore utxos + +import { BlockNo } from './Cardano/types/Block'; import { HealthCheckResponse, NetworkInfoProvider, Provider } from './Provider'; +import { HexBlob } from '@cardano-sdk/util'; +import { HydratedTx } from './Cardano/types/Transaction'; import { Observable, ReplaySubject, firstValueFrom } from 'rxjs'; +import { PaymentAddress } from './Cardano/Address/PaymentAddress'; export type AsyncReturnType unknown> = F extends () => Promise ? R : never; +export type MetadataDbModel = [string, HexBlob]; + export type NetworkInfoMethods = Exclude; export type NetworkInfoResponses = { [m in NetworkInfoMethods]: AsyncReturnType }; export interface WSMessage { + /** The addresses the client subscribes. */ + txsByAddresses?: { addresses: PaymentAddress[]; lower: BlockNo }; + /** The client id assigned by the server. */ clientId?: string; + /** The error on server while performing a request. */ + error?: Error; + /** Latest value(s) for the `NetworkInfoProvider` methods.*/ networkInfo?: Partial; + + /** The request id from client to server. */ + requestId?: number; + + /** This message is the response to the message with this id. */ + responseTo?: number; + + /** The server is still syncing. */ + syncing?: boolean; + + /** The transactions. */ + transactions?: HydratedTx[]; + + /** The partial transactions for UTXOs. */ + utxos?: HydratedTx[]; +} + +interface EmitHealthOptions { + notRecoverable?: boolean; + overwrite?: boolean; +} + +interface WSHealthCheckResponse extends HealthCheckResponse { + notRecoverable?: boolean; } export class WsProvider implements Provider { /** Emits the health state. */ - public health$: Observable; + public health$: Observable; - private healthSubject$: ReplaySubject; + private healthSubject$: ReplaySubject; + private notRecoverable?: boolean; private reason?: string; constructor() { - this.health$ = this.healthSubject$ = new ReplaySubject(1); + this.health$ = this.healthSubject$ = new ReplaySubject(1); this.healthSubject$.next({ ok: false, reason: 'starting' }); } - protected emitHealth(reason?: string, overwrite?: boolean) { + protected emitHealth(reason?: string | HealthCheckResponse, { notRecoverable, overwrite }: EmitHealthOptions = {}) { + if (this.notRecoverable) return; + if (!reason) { this.reason = undefined; return this.healthSubject$.next({ ok: true }); } - if (overwrite || !this.reason) this.reason = reason; + if (notRecoverable) this.notRecoverable = true; + + let result: WSHealthCheckResponse; + + if (typeof reason === 'string') { + if (overwrite || !this.reason) this.reason = reason; + + result = { notRecoverable: this.notRecoverable, ok: false, reason: this.reason }; + } else result = reason; - this.healthSubject$.next({ ok: false, reason: this.reason }); + this.healthSubject$.next(result); } public healthCheck() { diff --git a/packages/e2e/.env.example b/packages/e2e/.env.example index f4be88f767a..7a9bd4986e1 100644 --- a/packages/e2e/.env.example +++ b/packages/e2e/.env.example @@ -12,7 +12,7 @@ KEY_MANAGEMENT_PARAMS='{"bip32Ed25519": "Sodium", "accountIndex": 0, "chainId":{ # Providers setup - required by getWallet TEST_CLIENT_ASSET_PROVIDER=http TEST_CLIENT_ASSET_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4014/"}' -TEST_CLIENT_CHAIN_HISTORY_PROVIDER=http +TEST_CLIENT_CHAIN_HISTORY_PROVIDER=ws TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4000/"}' TEST_CLIENT_HANDLE_PROVIDER=http TEST_CLIENT_HANDLE_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4011/"}' @@ -22,7 +22,7 @@ TEST_CLIENT_REWARDS_PROVIDER=http TEST_CLIENT_REWARDS_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4000/"}' TEST_CLIENT_TX_SUBMIT_PROVIDER=http TEST_CLIENT_TX_SUBMIT_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4000/"}' -TEST_CLIENT_UTXO_PROVIDER=http +TEST_CLIENT_UTXO_PROVIDER=ws TEST_CLIENT_UTXO_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4000/"}' TEST_CLIENT_STAKE_POOL_PROVIDER=http TEST_CLIENT_STAKE_POOL_PROVIDER_PARAMS='{"baseUrl":"http://localhost:4000/"}' diff --git a/packages/e2e/package.json b/packages/e2e/package.json index b3b3688a0f5..af9fc8d7959 100644 --- a/packages/e2e/package.json +++ b/packages/e2e/package.json @@ -30,7 +30,7 @@ "test:ogmios": "jest -c jest.config.js --forceExit --selectProjects ogmios --runInBand --verbose", "test:pg-boss": "jest -c jest.config.js --forceExit --selectProjects pg-boss --runInBand --verbose", "test:providers": "jest -c jest.config.js --forceExit --selectProjects providers --runInBand --verbose", - "test:wallet": "yarn wait-for-network-init ; yarn test:wallet:epoch0 && yarn test:ws && { yarn wait-for-network-epoch-3 ; yarn test:wallet:epoch3 ; }", + "test:wallet": "yarn wait-for-network-init ; yarn test:wallet:epoch0 && { yarn wait-for-network-epoch-3 ; yarn test:wallet:epoch3 && yarn test:ws ; }", "test:wallet:epoch0": "jest -c jest.config.js --forceExit --selectProjects wallet_epoch_0 --runInBand --verbose", "test:wallet:epoch3": "jest -c jest.config.js --forceExit --selectProjects wallet_epoch_3 --runInBand --verbose", "test:wallet-real-ada": "NETWORK_SPEED=slow jest -c jest.config.js --forceExit --selectProjects wallet-real-ada --runInBand --verbose", diff --git a/packages/e2e/src/factories.ts b/packages/e2e/src/factories.ts index 061a95e6999..1663a53cf24 100644 --- a/packages/e2e/src/factories.ts +++ b/packages/e2e/src/factories.ts @@ -1,4 +1,5 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ +// cSpell:ignore dcspark ledgerjs multiplatform shiroyasha vespaiach import * as CML from '@dcspark/cardano-multiplatform-lib-nodejs'; import * as Crypto from '@cardano-sdk/crypto'; import { @@ -64,9 +65,10 @@ const customHttpFetchAdapter = isNodeJs ? undefined : require('@shiroyasha9/axio const HTTP_PROVIDER = 'http'; const OGMIOS_PROVIDER = 'ogmios'; const STUB_PROVIDER = 'stub'; -const MISSING_URL_PARAM = 'Missing URL'; const WS_PROVIDER = 'ws'; +const MISSING_URL_PARAM = 'Missing URL'; + export type CreateKeyAgent = (dependencies: KeyAgentDependencies) => Promise; export const keyManagementFactory = new ProviderFactory(); export const assetProviderFactory = new ProviderFactory(); @@ -93,6 +95,25 @@ addressDiscoveryFactory.register( bip32Ed25519Factory.register('CML', async () => new Crypto.CmlBip32Ed25519(CML)); bip32Ed25519Factory.register('Sodium', async () => new Crypto.SodiumBip32Ed25519()); +// Web Socket + +let wsClient: CardanoWsClient; + +const getWsClient = async (logger: Logger) => { + if (wsClient) return wsClient; + + const env = getEnv(walletVariables); + const chainHistoryProvider = await chainHistoryProviderFactory.create( + HTTP_PROVIDER, + env.TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS, + logger + ); + + if (env.WS_PROVIDER_URL === undefined) throw new Error(`${networkInfoHttpProvider.name}: ${MISSING_URL_PARAM}`); + + return (wsClient = new CardanoWsClient({ chainHistoryProvider, logger }, { url: new URL(env.WS_PROVIDER_URL) })); +}; + // Asset providers assetProviderFactory.register(HTTP_PROVIDER, async (params: any, logger: Logger): Promise => { @@ -120,6 +141,11 @@ chainHistoryProviderFactory.register( } ); +chainHistoryProviderFactory.register( + WS_PROVIDER, + async (_params: any, logger: Logger) => (await getWsClient(logger)).chainHistoryProvider +); + networkInfoProviderFactory.register( HTTP_PROVIDER, async (params: any, logger: Logger): Promise => { @@ -131,15 +157,10 @@ networkInfoProviderFactory.register( } ); -networkInfoProviderFactory.register(WS_PROVIDER, (_params: any, logger: Logger): Promise => { - const env = getEnv(walletVariables); - - if (env.WS_PROVIDER_URL === undefined) throw new Error(`${networkInfoHttpProvider.name}: ${MISSING_URL_PARAM}`); - - const wsClient = new CardanoWsClient({ logger }, { url: new URL(env.WS_PROVIDER_URL) }); - - return Promise.resolve(wsClient.networkInfoProvider); -}); +networkInfoProviderFactory.register( + WS_PROVIDER, + async (_params: any, logger: Logger) => (await getWsClient(logger)).networkInfoProvider +); rewardsProviderFactory.register(HTTP_PROVIDER, async (params: any, logger: Logger): Promise => { if (params.baseUrl === undefined) throw new Error(`${rewardsHttpProvider.name}: ${MISSING_URL_PARAM}`); @@ -191,6 +212,11 @@ utxoProviderFactory.register(HTTP_PROVIDER, async (params: any, logger: Logger): }); }); +utxoProviderFactory.register( + WS_PROVIDER, + async (_params: any, logger: Logger) => (await getWsClient(logger)).utxoProvider +); + handleProviderFactory.register(HTTP_PROVIDER, async (params: any, logger: Logger): Promise => { if (params.baseUrl === undefined) throw new Error(`${handleHttpProvider.name}: ${MISSING_URL_PARAM}`); diff --git a/packages/e2e/test/k6/scenarios/web-socket.test.js b/packages/e2e/test/k6/scenarios/web-socket.test.js index 593a89bf633..57eca3e2682 100644 --- a/packages/e2e/test/k6/scenarios/web-socket.test.js +++ b/packages/e2e/test/k6/scenarios/web-socket.test.js @@ -1,15 +1,27 @@ +// cSpell:ignore loadimpact + import * as k6Utils from '../../../../util-dev/dist/cjs/k6-utils.js'; import { Counter, Trend } from 'k6/metrics'; +import { SharedArray } from 'k6/data'; import { check } from 'k6'; import ws from 'k6/ws'; -// eslint-disable-next-line no-undef -const { WALLETS } = Object.assign({ WALLETS: '10000' }, __ENV); -// eslint-disable-next-line no-undef -const dut = k6Utils.getDut(__ENV); +const { HD_ACTIVE_ADDR_COUNT, HD_MAX_TX_HISTORY, MAX_VU, TARGET_NET, WALLETS } = Object.assign( + { HD_ACTIVE_ADDR_COUNT: '10', HD_MAX_TX_HISTORY: '1000', MAX_VU: '100', TARGET_NET: 'mainnet', WALLETS: '10000' }, + // eslint-disable-next-line no-undef + __ENV +); +// eslint-disable-next-line no-undef +const dut = k6Utils.getDut(__ENV, { networks: ['mainnet', 'preprod'] }); const url = `wss://${dut}/ws`; +const activeAddrCount = Number.parseInt(HD_ACTIVE_ADDR_COUNT, 10); +const maxTxHistory = Number.parseInt(HD_MAX_TX_HISTORY, 10); +const maxVu = Number.parseInt(MAX_VU, 10); +const numWallets = Number.parseInt(WALLETS, 10); +const testDuration = Math.floor((numWallets * 3) / maxVu); + export const options = { ext: { loadimpact: { @@ -21,28 +33,73 @@ export const options = { connections: { executor: 'ramping-vus', gracefulRampDown: '0s', - gracefulStop: '120s', - stages: [{ duration: '3s', target: Number.parseInt(WALLETS, 10) }], + gracefulStop: `${testDuration + 60}s`, + stages: [{ duration: `${testDuration}s`, target: maxVu }], startVUs: 10 } } }; +/** Wallet addresses extracted from the JSON dump file */ +const fileName = `../../dump/addresses/${TARGET_NET}.json`; +// eslint-disable-next-line no-undef +const walletsOrig = new SharedArray('walletsData', () => JSON.parse(open(fileName))); + +export const setup = () => { + // Remove "big transaction history wallets" + const filteredWallets = walletsOrig.filter(({ tx_count }) => tx_count < maxTxHistory); + // Create chunks of `activeAddrCount` addresses per HD wallet + const wallets = k6Utils.chunkArray(filteredWallets, activeAddrCount); + + return { wallets: wallets.slice(0, numWallets) }; +}; + const operationalTrend = new Trend('_operational', true); +const syncTrend = new Trend('_sync', true); +const syncCount = new Counter('_sync_count'); const unexpectedCloseCounter = new Counter('_unexpected_close'); -export const run = () => { +const getDummyAddr = (addr, idx, suffix = 'mh') => { + const last3Chars = addr.slice(-3); + const updateChars = last3Chars !== `${suffix}${idx}` ? `${suffix}${idx}` : `${idx}${suffix}`; + return addr.slice(0, -3) + updateChars; +}; + +export const run = ({ wallets }) => { const begin = Date.now(); + // eslint-disable-next-line no-undef + const vu = __VU; + const wallet = wallets[vu % wallets.length]; // each wallet is a collection of addresses const res = ws.connect(url, null, (socket) => { let closed = false; - let firstMessage = true; + let requestId = 0; + + const nextAddress = () => { + // Simplified address discovery with 50 fake addresses + if (++requestId > wallet.length + 50) { + closed = true; + socket.close(); - socket.on('message', () => { - if (firstMessage) { - operationalTrend.add(Date.now() - begin); - firstMessage = false; + syncTrend.add(Date.now() - begin); + syncCount.add(1); + + return; } + + const address = + requestId <= wallet.length + ? wallet[requestId - 1].address + : getDummyAddr(wallet[0].address, requestId - wallet.length); + + socket.send(JSON.stringify({ requestId, txsByAddresses: { addresses: [address], lower: 0 } })); + }; + + socket.on('message', (message) => { + const { clientId, responseTo } = JSON.parse(message); + + if (clientId) operationalTrend.add(Date.now() - begin); + if (clientId || responseTo) nextAddress(); }); // Count unexpected close @@ -52,12 +109,6 @@ export const run = () => { // Heartbeat socket.setTimeout(() => socket.send('{}'), 30 * 1000); - - // End the test after 80" - socket.setTimeout(() => { - closed = true; - socket.close(); - }, 80 * 1000); }); check(res, { 'status is 101': (r) => r && r.status === 101 }); diff --git a/packages/e2e/test/long-running/webSocket.test.ts b/packages/e2e/test/long-running/webSocket.test.ts index e1e638906c2..934617da9d2 100644 --- a/packages/e2e/test/long-running/webSocket.test.ts +++ b/packages/e2e/test/long-running/webSocket.test.ts @@ -1,13 +1,15 @@ -import { CardanoWsClient } from '@cardano-sdk/cardano-services-client'; +import { CardanoWsClient, chainHistoryHttpProvider } from '@cardano-sdk/cardano-services-client'; import { getEnv, walletVariables } from '../../src'; import { logger } from '@cardano-sdk/util-dev'; const env = getEnv([...walletVariables]); describe('Web Socket', () => { + const chainHistoryProvider = chainHistoryHttpProvider({ logger, ...env.TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS }); let client: CardanoWsClient; - const openClient = () => (client = new CardanoWsClient({ logger }, { url: new URL(env.WS_PROVIDER_URL) })); + const openClient = () => + (client = new CardanoWsClient({ chainHistoryProvider, logger }, { url: new URL(env.WS_PROVIDER_URL) })); const closeClient = () => (client ? client.close() : Promise.resolve()); diff --git a/packages/e2e/test/wallet_epoch_3/PersonalWallet/conwayTransactions.test.ts b/packages/e2e/test/wallet_epoch_3/PersonalWallet/conwayTransactions.test.ts index 9efbe6a57c6..01f6952f8d8 100644 --- a/packages/e2e/test/wallet_epoch_3/PersonalWallet/conwayTransactions.test.ts +++ b/packages/e2e/test/wallet_epoch_3/PersonalWallet/conwayTransactions.test.ts @@ -451,8 +451,8 @@ describe('PersonalWallet/conwayTransactions', () => { }); describe('with proposal procedure', () => { - let actionId: Cardano.GovernanceActionId; let confirmedTx: Cardano.HydratedTx; + let id: Cardano.TransactionId; let proposalProcedures: Cardano.ProposalProcedure[]; beforeAll(async () => { @@ -519,10 +519,7 @@ describe('PersonalWallet/conwayTransactions', () => { .customize(({ txBody }) => ({ ...txBody, proposalProcedures })) .build() .sign(); - const [id, tx] = await submitAndConfirm(wallet, signedTx.tx, 1); - confirmedTx = tx; - - actionId = { actionIndex: 0, id }; + [id, confirmedTx] = await submitAndConfirm(wallet, signedTx.tx, 1); }); it('parameter_change_action correctly submits protocol parameters update', () => { @@ -533,7 +530,10 @@ describe('PersonalWallet/conwayTransactions', () => { const votingProcedures: Cardano.VotingProcedures = [ { voter: { __typename: VoterType.dRepKeyHash, credential: dRepCredential }, - votes: [{ actionId, votingProcedure: { anchor, vote: Vote.abstain } }] + votes: [ + { actionId: { actionIndex: 0, id }, votingProcedure: { anchor, vote: Vote.abstain } }, + { actionId: { actionIndex: 1, id }, votingProcedure: { anchor: null, vote: Vote.yes } } + ] } ]; const signedTx = await dRepWallet diff --git a/packages/e2e/test/ws-server/webSocket.test.ts b/packages/e2e/test/ws-server/webSocket.test.ts index 52c8ee2fda4..bd2b97a132b 100644 --- a/packages/e2e/test/ws-server/webSocket.test.ts +++ b/packages/e2e/test/ws-server/webSocket.test.ts @@ -1,4 +1,7 @@ -import { CardanoWsClient } from '@cardano-sdk/cardano-services-client'; +// cSpell:ignore cardano utxos + +import { Cardano, HealthCheckResponse, WsProvider } from '@cardano-sdk/core'; +import { CardanoWsClient, chainHistoryHttpProvider, utxoHttpProvider } from '@cardano-sdk/cardano-services-client'; import { CardanoWsServer, GenesisData, @@ -6,16 +9,18 @@ import { getOgmiosCardanoNode, util } from '@cardano-sdk/cardano-services'; -import { HealthCheckResponse, WsProvider } from '@cardano-sdk/core'; import { OgmiosCardanoNode } from '@cardano-sdk/ogmios'; import { Pool } from 'pg'; import { filter, firstValueFrom } from 'rxjs'; import { getEnv, walletVariables } from '../../src'; import { getPort } from 'get-port-please'; import { logger } from '@cardano-sdk/util-dev'; +import { toSerializableObject } from '@cardano-sdk/util'; const env = getEnv([...walletVariables, 'DB_SYNC_CONNECTION_STRING', 'OGMIOS_URL']); +const pagination = { limit: 25, startAt: 0 }; + const wsProviderReady = (provider: WsProvider) => new Promise((resolve, reject) => { // eslint-disable-next-line prefer-const @@ -30,6 +35,7 @@ const wsProviderReady = (provider: WsProvider) => }); timeout = setTimeout(() => { + timeout = undefined; subscription.unsubscribe(); reject(new Error('WsProvider timeout')); }, 10_000); @@ -52,15 +58,13 @@ const wsProviderReadyAgain = (provider: WsProvider, close: () => Promise result)); + // Next we expect at least one not ok event i.e. the close function had the desired effect + const firstFalsy = oks.findIndex((element) => !element); + expect(firstFalsy).toBeGreaterThan(0); + // Last we expect one more ok event when provider is operational once again + expect(oks.findIndex((element, index) => element && index > firstFalsy)).toBeGreaterThan(firstFalsy); resolve(); } catch (error) { @@ -72,12 +76,14 @@ const wsProviderReadyAgain = (provider: WsProvider, close: () => Promise { + timeout = undefined; subscription.unsubscribe(); reject(new Error('WsProvider timeout')); }, 10_000); @@ -86,6 +92,9 @@ const wsProviderReadyAgain = (provider: WsProvider, close: () => Promise { + const chainHistoryProvider = chainHistoryHttpProvider({ logger, ...env.TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS }); + const utxoProvider = utxoHttpProvider({ logger, ...env.TEST_CLIENT_CHAIN_HISTORY_PROVIDER_PARAMS }); + let db: Pool; let cardanoNode: OgmiosCardanoNode; let genesisData: GenesisData; @@ -94,35 +103,43 @@ describe('Web Socket', () => { let client: CardanoWsClient; let server: CardanoWsServer; - const openClient = (heartbeatInterval = 55) => - (client = new CardanoWsClient({ logger }, { heartbeatInterval, url: new URL(`ws://localhost:${port}/ws`) })); + const openClient = (options: { heartbeatInterval?: number; url?: string } = {}) => { + const { heartbeatInterval, url } = { heartbeatInterval: 55, url: `ws://localhost:${port}/ws`, ...options }; + + return (client = new CardanoWsClient({ chainHistoryProvider, logger }, { heartbeatInterval, url: new URL(url) })); + }; const openServer = (heartbeatTimeout = 60) => (server = new CardanoWsServer( { cardanoNode, db, genesisData, logger }, - { dbCacheTtl: 120, heartbeatTimeout, port } + { dbCacheTtl: 120, heartbeatInterval: 1, heartbeatTimeout, port } )); const closeClient = () => (client ? client.close() : Promise.resolve()); const closeServer = () => (server ? new Promise((resolve) => server.close(resolve)) : Promise.resolve()); - const listenToClientHealthFor15Seconds = async () => { + const listenToClientHealthFor5Seconds = async () => { const health: HealthCheckResponse[] = []; const subscription = client.health$.subscribe((value) => health.push(value)); - // Listen on client.health$ for 15" - await new Promise((resolve) => setTimeout(resolve, 15_000)); + // Listen on client.health$ for 5" + await new Promise((resolve) => setTimeout(resolve, 5000)); subscription.unsubscribe(); return health; }; + const transactionsByAddresses = () => + client.chainHistoryProvider.transactionsByAddresses({ + addresses: ['fake_address' as Cardano.PaymentAddress], + pagination + }); + beforeAll(async () => { const dnsResolver = createDnsResolver({ factor: 1.1, maxRetryTime: 1000 }, logger); cardanoNode = await getOgmiosCardanoNode(dnsResolver, logger, { ogmiosUrl: new URL(env.OGMIOS_URL) }); - db = new Pool({ connectionString: env.DB_SYNC_CONNECTION_STRING }); genesisData = await util.loadGenesisData('local-network/config/network/cardano-node/config.json'); port = await getPort(); @@ -130,9 +147,11 @@ describe('Web Socket', () => { await cardanoNode.start(); }); - afterAll(() => Promise.all([cardanoNode.shutdown(), db.end()])); + beforeEach(() => (db = new Pool({ connectionString: env.DB_SYNC_CONNECTION_STRING }))); + + afterAll(() => cardanoNode.shutdown()); - afterEach(() => Promise.all([closeClient(), closeServer()])); + afterEach(() => Promise.all([db.end(), closeClient(), closeServer()])); it('Server can re-connect to DB if NOTIFY connection drops', async () => { // Close server db connection from DB server side @@ -160,17 +179,17 @@ describe('Web Socket', () => { }); it('Server disconnects clients on heartbeat timeout', async () => { - // Open a server with 3" heartbeat timeout - openServer(3); + // Open a server with 2" heartbeat timeout + openServer(2); await wsProviderReady(server); openClient(); await wsProviderReady(client); - const health = await listenToClientHealthFor15Seconds(); + const health = await listenToClientHealthFor5Seconds(); - // Considering the server performs timeouts check every 10" - // We expect the heath state of the client goes up and down more time + // Considering the server performs timeouts check every second + // We expect the heath state of the client goes up and down more times expect(health.length).toBeGreaterThanOrEqual(3); // We expect the heath state of the client goes up at least twice expect(health.filter(({ ok }) => ok).length).toBeGreaterThanOrEqual(2); @@ -179,15 +198,15 @@ describe('Web Socket', () => { }); it("Server doesn't disconnects clients without heartbeat timeouts", async () => { - // Open a server with 3" heartbeat timeout - openServer(3); + // Open a server with 2" heartbeat timeout + openServer(2); await wsProviderReady(server); // Open a client with 2" heartbeat interval - openClient(2); + openClient({ heartbeatInterval: 1 }); await wsProviderReady(client); - const health = await listenToClientHealthFor15Seconds(); + const health = await listenToClientHealthFor5Seconds(); // We expect only the buffered ok heath state expect(health.length).toBe(1); @@ -217,4 +236,149 @@ describe('Web Socket', () => { await expect(client.networkInfoProvider.ledgerTip()).rejects.toThrowError('CONNECTION_FAILURE'); }); }); + + describe('CardanoWsClient.chainHistoryProvider.transactionsByAddresses', () => { + // The first two tests are identical to CardanoWsClient.networkInfoProvider ones, + // they are anyway required because the code behind the two providers is completely different + it('It throws when disconnected but when starting', async () => { + openServer(); + await wsProviderReady(server); + + openClient(); + + await expect(transactionsByAddresses()).resolves.toHaveProperty('pageResults'); + + await closeServer(); + await firstValueFrom(client.health$.pipe(filter(({ ok }) => !ok))); + + await expect(transactionsByAddresses()).rejects.toThrowError('CONNECTION_FAILURE'); + }); + + it('If called when still starting, it throws on connect error', async () => { + openClient(); + + await expect(transactionsByAddresses()).rejects.toThrowError('CONNECTION_FAILURE'); + }); + + it('More calls while syncing address throw', async () => { + openServer(); + await wsProviderReady(server); + + openClient(); + await wsProviderReady(client); + + const deferred = async () => { + await new Promise((resolve) => setTimeout(resolve, 1)); + await expect(transactionsByAddresses()).rejects.toThrowError('CONFLICT'); + }; + + await Promise.all([ + expect(transactionsByAddresses()).resolves.toHaveProperty('pageResults'), + deferred(), + deferred() + ]); + }); + + it('More calls after address is synced, never throw', async () => { + openServer(); + await wsProviderReady(server); + + openClient(); + await wsProviderReady(client); + + await expect(transactionsByAddresses()).resolves.toHaveProperty('pageResults'); + + await Promise.all([ + expect(transactionsByAddresses()).resolves.toHaveProperty('pageResults'), + expect(transactionsByAddresses()).resolves.toHaveProperty('pageResults') + ]); + }); + }); + + describe('transactions & utxos', () => { + const tests: string[][] = [ + ['collaterals', 'SELECT tx_in_id AS tx_id FROM collateral_tx_in'], + ['collateralReturn', 'SELECT tx_id FROM collateral_tx_out'], + [ + 'datum', + 'SELECT tx_id FROM tx_out LEFT JOIN tx_in ON tx_out_id = tx_id AND tx_out_index = index WHERE inline_datum_id IS NOT NULL AND tx_out_id IS NULL AND stake_address_id IS NOT NULL' + ], + ['failed phase 2 validation', 'SELECT id AS tx_id FROM tx WHERE valid_contract = false'], + ['mint', 'SELECT tx_id FROM ma_tx_mint'], + ['metadata', 'SELECT tx_id FROM tx_metadata'], + ['withdrawals', 'SELECT tx_id FROM withdrawal'], + ['redeemers', 'SELECT tx_id FROM redeemer'], + ['governance action proposals', 'SELECT tx_id FROM gov_action_proposal'], + ['voting procedures', 'SELECT tx_id FROM voting_procedure'], + ['certificate: stake pool registration', 'SELECT registered_tx_id AS tx_id FROM pool_update ORDER BY id DESC'], + ['certificate: stake pool retire', 'SELECT announced_tx_id AS tx_id FROM pool_retire'], + ['certificate: stake credential registration', 'SELECT tx_id FROM stake_registration ORDER BY id DESC'], + ['certificate: stake credential deregistration', 'SELECT tx_id FROM stake_deregistration'], + ['certificate: stake delegation', 'SELECT tx_id FROM delegation ORDER BY id DESC'], + ['certificate: vote delegation', 'SELECT tx_id FROM delegation_vote'], + ['certificate: delegation representative registration', 'SELECT tx_id FROM drep_registration WHERE deposit > 0'], + ['certificate: delegation representative update', 'SELECT tx_id FROM drep_registration WHERE deposit IS NULL'], + [ + 'certificate: delegation representative deregistration', + 'SELECT tx_id FROM drep_registration WHERE deposit < 0' + ], + ['certificate: committee registration', 'SELECT tx_id FROM committee_registration'], + ['certificate: committee deregistration', 'SELECT tx_id FROM committee_de_registration'] + ]; + + test.each(tests)('transactions with %s', async (name, subQuery) => { + // cSpell:disable + const query = `\ +SELECT address, block_no::INTEGER AS "lowerBound" FROM (${subQuery} LIMIT 1) t, tx, tx_out o, block +WHERE tx.id = o.tx_id AND t.tx_id = o.tx_id AND block_id = block.id AND address NOT IN ( + 'addr_test1wz3937ykmlcaqxkf4z7stxpsfwfn4re7ncy48yu8vutcpxgnj28k0', 'addr_test1wqmpwrh2mlqa04e2mf3vr8w9rjt9du0dpnync8dzc85spgsya8emz')`; + // cSpell:enable + const result = await db.query<{ address: Cardano.PaymentAddress; lowerBound: Cardano.BlockNo }>(query); + let step = ''; + + if (!result.rowCount) return logger.fatal(`Test 'transactions with ${name}': not valid transactions found`); + + const { address, lowerBound } = result.rows[0]; + const request = { addresses: [address], blockRange: { lowerBound }, pagination }; + + openClient({ url: env.WS_PROVIDER_URL }); + + try { + step = 'txs ws'; + const wsTxs = await client.chainHistoryProvider.transactionsByAddresses(request); + step = 'txs http'; + const httpTxs = await chainHistoryProvider.transactionsByAddresses(request); + step = 'txs test'; + expect(toSerializableObject(wsTxs)).toEqual(toSerializableObject(httpTxs)); + + step = 'utxos ws'; + const wsUtxos = await client.utxoProvider.utxoByAddresses(request); + step = 'utxos http'; + const httpUtxos = await utxoProvider.utxoByAddresses(request); + step = 'utxos test'; + expect(toSerializableObject(wsUtxos)).toEqual(toSerializableObject(httpUtxos)); + } catch (error) { + logger.fatal(name, step, JSON.stringify(request)); + throw error; + } + }); + + test('utxos from more addresses', async () => { + openClient({ url: env.WS_PROVIDER_URL }); + + const { rows } = await db.query<{ address: Cardano.PaymentAddress }>(`\ +SELECT COUNT(DISTINCT tx_id), address FROM tx_out LEFT JOIN tx_in ON tx_out_id = tx_id AND tx_out_index = index +WHERE tx_out_id IS NULL GROUP BY address HAVING COUNT(DISTINCT tx_id) < 1000 ORDER BY COUNT(DISTINCT tx_id) DESC LIMIT 5`); + + const ledgerTip = await firstValueFrom(client.networkInfo.ledgerTip$); + const lowerBound = Math.floor(ledgerTip.blockNo * 0.8) as Cardano.BlockNo; + const request = { addresses: rows.flatMap(({ address }) => address), blockRange: { lowerBound }, pagination }; + + await client.chainHistoryProvider.transactionsByAddresses(request); + + const wsUtxos = await client.utxoProvider.utxoByAddresses(request); + const httpUtxos = await utxoProvider.utxoByAddresses(request); + expect(toSerializableObject(wsUtxos)).toEqual(toSerializableObject(httpUtxos)); + }); + }); });