diff --git a/client/src/socket-io/index.ts b/client/src/socket-io/index.ts index 48abfa7c82..de10130a2d 100644 --- a/client/src/socket-io/index.ts +++ b/client/src/socket-io/index.ts @@ -1,5 +1,5 @@ -import { io } from 'socket.io-client'; -import type { Socket } from 'socket.io-client'; +import { io, Socket } from 'socket.io-client'; +import type { ManagerOptions, SocketOptions } from 'socket.io-client'; import { ClientToServerMessages, Topic, @@ -30,30 +30,41 @@ function getWsUrl(url: string): URL { return urlObj; } -export interface StacksApiSocketConnectionOptions { +export type StacksApiSocketConnectionOptions = { url?: string; /** Initial topics to subscribe to. */ subscriptions?: Topic[]; + socketOpts?: Partial; +}; + +function createStacksApiSocket(opts?: StacksApiSocketConnectionOptions) { + const socketOpts = { + ...opts?.socketOpts, + query: { + ...opts?.socketOpts?.query, + // Subscriptions can be specified on init using this handshake query param. + subscriptions: Array.from(new Set(opts?.subscriptions)).join(','), + } + }; + const socket: StacksApiSocket = io(getWsUrl(opts?.url ?? BASE_PATH).href, socketOpts); + return socket; } export class StacksApiSocketClient { readonly socket: StacksApiSocket; - constructor(socket: StacksApiSocket) { - this.socket = socket; + constructor(socket: StacksApiSocket) + constructor(opts?: StacksApiSocketConnectionOptions) + constructor(args?: StacksApiSocket | StacksApiSocketConnectionOptions) { + if (args instanceof Socket) { + this.socket = args; + } else { + this.socket = createStacksApiSocket(args); + } } - public static connect({ - url = BASE_PATH, - subscriptions = [], - }: StacksApiSocketConnectionOptions = {}) { - const socket: StacksApiSocket = io(getWsUrl(url).href, { - query: { - // Subscriptions can be specified on init using this handshake query param. - subscriptions: Array.from(new Set(subscriptions)).join(','), - }, - }); - return new StacksApiSocketClient(socket); + public static connect(opts?: StacksApiSocketConnectionOptions) { + return new StacksApiSocketClient(opts); } handleSubscription(topic: Topic, subscribe = false, listener?: (...args: any[]) => void) { diff --git a/src/tests/socket-io-tests.ts b/src/tests/socket-io-tests.ts index 770c65cd8a..993c9c57b8 100644 --- a/src/tests/socket-io-tests.ts +++ b/src/tests/socket-io-tests.ts @@ -19,6 +19,7 @@ import { } from '../test-utils/test-builders'; import { PgWriteStore } from '../datastore/pg-write-store'; import { cycleMigrations, runMigrations } from '../datastore/migrations'; +import { StacksApiSocketClient } from '../../client/src/socket-io'; describe('socket-io', () => { let apiServer: ApiServer; @@ -34,6 +35,78 @@ describe('socket-io', () => { }); }); + test('socket-io-client > block updates', async () => { + const client = new StacksApiSocketClient({ + url: `http://${apiServer.address}`, + socketOpts: { reconnection: false }, + }); + + const updateWaiter: Waiter = waiter(); + const subResult = client.subscribeBlocks(block => updateWaiter.finish(block)); + + const block = new TestBlockBuilder({ block_hash: '0x1234', burn_block_hash: '0x5454' }) + .addTx({ tx_id: '0x4321' }) + .build(); + await db.update(block); + + const result = await updateWaiter; + try { + expect(result.hash).toEqual('0x1234'); + expect(result.burn_block_hash).toEqual('0x5454'); + expect(result.txs[0]).toEqual('0x4321'); + } finally { + subResult.unsubscribe(); + client.socket.close(); + } + }); + + test('socket-io-client > tx updates', async () => { + const client = new StacksApiSocketClient({ + url: `http://${apiServer.address}`, + socketOpts: { reconnection: false }, + }); + + const mempoolWaiter: Waiter = waiter(); + const txWaiters: Waiter[] = [waiter(), waiter()]; + + const mempoolSub = client.subscribeMempool(tx => mempoolWaiter.finish(tx)); + const txSub = client.subscribeTransaction('0x01', tx => { + if (tx.tx_status === 'pending') { + txWaiters[0].finish(tx); + } else { + txWaiters[1].finish(tx); + } + }); + + const block = new TestBlockBuilder().addTx().build(); + await db.update(block); + + const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending }); + await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] }); + const mempoolResult = await mempoolWaiter; + const txResult = await txWaiters[0]; + + const microblock = new TestMicroblockStreamBuilder() + .addMicroblock() + .addTx({ tx_id: '0x01' }) + .build(); + await db.updateMicroblocks(microblock); + const txMicroblockResult = await txWaiters[1]; + + try { + expect(mempoolResult.tx_status).toEqual('pending'); + expect(mempoolResult.tx_id).toEqual('0x01'); + expect(txResult.tx_status).toEqual('pending'); + expect(txResult.tx_id).toEqual('0x01'); + expect(txMicroblockResult.tx_id).toEqual('0x01'); + expect(txMicroblockResult.tx_status).toEqual('success'); + } finally { + mempoolSub.unsubscribe(); + txSub.unsubscribe(); + client.socket.close(); + } + }); + test('socket-io > block updates', async () => { const address = apiServer.address; const socket = io(`http://${address}`, {