Skip to content

Commit

Permalink
test: add unit tests for socket-io-client
Browse files Browse the repository at this point in the history
  • Loading branch information
zone117x committed Jan 5, 2024
1 parent 8611bae commit 5fd077c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 16 deletions.
43 changes: 27 additions & 16 deletions client/src/socket-io/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { io } from 'socket.io-client';
import type { Socket } from 'socket.io-client';
import { io, Socket } from 'socket.io-client';
import type { ManagerOptions, SocketOptions } from 'socket.io-client';
import {
ClientToServerMessages,
Topic,
Expand Down Expand Up @@ -30,30 +30,41 @@ function getWsUrl(url: string): URL {
return urlObj;
}

export interface StacksApiSocketConnectionOptions {
export type StacksApiSocketConnectionOptions = {
url?: string;
/** Initial topics to subscribe to. */
subscriptions?: Topic[];
socketOpts?: Partial<ManagerOptions & SocketOptions>;
};

function createStacksApiSocket(opts?: StacksApiSocketConnectionOptions) {
const socketOpts = {
...opts?.socketOpts,
query: {
...opts?.socketOpts?.query,
// Subscriptions can be specified on init using this handshake query param.
subscriptions: Array.from(new Set(opts?.subscriptions)).join(','),
}
};
const socket: StacksApiSocket = io(getWsUrl(opts?.url ?? BASE_PATH).href, socketOpts);
return socket;
}

export class StacksApiSocketClient {
readonly socket: StacksApiSocket;

constructor(socket: StacksApiSocket) {
this.socket = socket;
constructor(socket: StacksApiSocket)
constructor(opts?: StacksApiSocketConnectionOptions)
constructor(args?: StacksApiSocket | StacksApiSocketConnectionOptions) {
if (args instanceof Socket) {
this.socket = args;
} else {
this.socket = createStacksApiSocket(args);
}
}

public static connect({
url = BASE_PATH,
subscriptions = [],
}: StacksApiSocketConnectionOptions = {}) {
const socket: StacksApiSocket = io(getWsUrl(url).href, {
query: {
// Subscriptions can be specified on init using this handshake query param.
subscriptions: Array.from(new Set(subscriptions)).join(','),
},
});
return new StacksApiSocketClient(socket);
public static connect(opts?: StacksApiSocketConnectionOptions) {
return new StacksApiSocketClient(opts);
}

handleSubscription(topic: Topic, subscribe = false, listener?: (...args: any[]) => void) {
Expand Down
73 changes: 73 additions & 0 deletions src/tests/socket-io-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from '../test-utils/test-builders';
import { PgWriteStore } from '../datastore/pg-write-store';
import { cycleMigrations, runMigrations } from '../datastore/migrations';
import { StacksApiSocketClient } from '../../client/src/socket-io';

describe('socket-io', () => {
let apiServer: ApiServer;
Expand All @@ -34,6 +35,78 @@ describe('socket-io', () => {
});
});

test('socket-io-client > block updates', async () => {
const client = new StacksApiSocketClient({
url: `http://${apiServer.address}`,
socketOpts: { reconnection: false },
});

const updateWaiter: Waiter<Block> = waiter();
const subResult = client.subscribeBlocks(block => updateWaiter.finish(block));

const block = new TestBlockBuilder({ block_hash: '0x1234', burn_block_hash: '0x5454' })
.addTx({ tx_id: '0x4321' })
.build();
await db.update(block);

const result = await updateWaiter;
try {
expect(result.hash).toEqual('0x1234');
expect(result.burn_block_hash).toEqual('0x5454');
expect(result.txs[0]).toEqual('0x4321');
} finally {
subResult.unsubscribe();
client.socket.close();
}
});

test('socket-io-client > tx updates', async () => {
const client = new StacksApiSocketClient({
url: `http://${apiServer.address}`,
socketOpts: { reconnection: false },
});

const mempoolWaiter: Waiter<MempoolTransaction> = waiter();
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];

const mempoolSub = client.subscribeMempool(tx => mempoolWaiter.finish(tx));
const txSub = client.subscribeTransaction('0x01', tx => {
if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx);
} else {
txWaiters[1].finish(tx);
}
});

const block = new TestBlockBuilder().addTx().build();
await db.update(block);

const mempoolTx = testMempoolTx({ tx_id: '0x01', status: DbTxStatus.Pending });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
const mempoolResult = await mempoolWaiter;
const txResult = await txWaiters[0];

const microblock = new TestMicroblockStreamBuilder()
.addMicroblock()
.addTx({ tx_id: '0x01' })
.build();
await db.updateMicroblocks(microblock);
const txMicroblockResult = await txWaiters[1];

try {
expect(mempoolResult.tx_status).toEqual('pending');
expect(mempoolResult.tx_id).toEqual('0x01');
expect(txResult.tx_status).toEqual('pending');
expect(txResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_id).toEqual('0x01');
expect(txMicroblockResult.tx_status).toEqual('success');
} finally {
mempoolSub.unsubscribe();
txSub.unsubscribe();
client.socket.close();
}
});

test('socket-io > block updates', async () => {
const address = apiServer.address;
const socket = io(`http://${address}`, {
Expand Down

0 comments on commit 5fd077c

Please sign in to comment.