From 17207810f44e691559c07ef92493cab7ae02e677 Mon Sep 17 00:00:00 2001 From: Devin Moss Date: Mon, 7 Aug 2023 19:48:29 -0500 Subject: [PATCH] Add candle subscription support to MarketDataStreamer --- lib/market-data-streamer.ts | 128 +++++++++++++++++++++++++++++++----- lib/tastytrade-api.ts | 4 +- package-lock.json | 4 +- package.json | 2 +- 4 files changed, 117 insertions(+), 21 deletions(-) diff --git a/lib/market-data-streamer.ts b/lib/market-data-streamer.ts index 404cabf..855953f 100644 --- a/lib/market-data-streamer.ts +++ b/lib/market-data-streamer.ts @@ -3,6 +3,7 @@ import _ from 'lodash' import { v4 as uuidv4 } from 'uuid' export enum MarketDataSubscriptionType { + Candle = 'Candle', Quote = 'Quote', Trade = 'Trade', Summary = 'Summary', @@ -11,11 +12,30 @@ export enum MarketDataSubscriptionType { Underlying = 'Underlying' } +export enum CandleType { + Tick = 't', + Second = 's', + Minute = 'm', + Hour = 'h', + Day = 'd', + Week = 'w', + Month = 'mo', + ThirdFriday = 'o', + Year = 'y', + Volume = 'v', + Price = 'p' +} + export type MarketDataListener = (data: any) => void +export type ErrorListener = (error: any) => void +export type AuthStateListener = (isAuthorized: boolean) => void -type QueuedSubscription = { symbol: string, subscriptionTypes: MarketDataSubscriptionType[] } -type SubscriptionOptions = { subscriptionTypes: MarketDataSubscriptionType[], channelId: number } +type QueuedSubscription = { symbol: string, subscriptionTypes: MarketDataSubscriptionType[], subscriptionArgs?: any } +type SubscriptionOptions = { subscriptionTypes: MarketDataSubscriptionType[], channelId: number, subscriptionArgs?: any } +export type CandleSubscriptionOptions = { period: number, type: CandleType, channelId: number } +type Remover = () => void +// List of all subscription types except for Candle const AllSubscriptionTypes = Object.values(MarketDataSubscriptionType) const KeepaliveInterval = 30000 // 30 seconds @@ -30,17 +50,39 @@ export default class MarketDataStreamer { private openChannels = new Set() private subscriptionsQueue: Map = new Map() private authState = '' + private errorListeners = new Map() + private authStateListeners = new Map() - addDataListener(dataListener: MarketDataListener) { + addDataListener(dataListener: MarketDataListener, channelId: number | null = null): Remover { if (_.isNil(dataListener)) { return _.noop } const guid = uuidv4() - this.dataListeners.set(guid, dataListener) + this.dataListeners.set(guid, { listener: dataListener, channelId }) return () => this.dataListeners.delete(guid) } + addErrorListener(errorListener: ErrorListener): Remover { + if (_.isNil(errorListener)) { + return _.noop + } + const guid = uuidv4() + this.errorListeners.set(guid, errorListener) + + return () => this.errorListeners.delete(guid) + } + + addAuthStateChangeListener(authStateListener: AuthStateListener): Remover { + if (_.isNil(authStateListener)) { + return _.noop + } + const guid = uuidv4() + this.authStateListeners.set(guid, authStateListener) + + return () => this.authStateListeners.delete(guid) + } + connect(url: string, token: string) { if (this.isConnected) { throw new Error('MarketDataStreamer is attempting to connect when an existing websocket is already connected') @@ -61,6 +103,11 @@ export default class MarketDataStreamer { this.clearKeepalive() + this.webSocket.onopen = null + this.webSocket.onerror = null + this.webSocket.onmessage = null + this.webSocket.onclose = null + this.webSocket.close() this.webSocket = null @@ -69,14 +116,48 @@ export default class MarketDataStreamer { this.authState = '' } - // TODO: add listener to options, return unsubscriber - addSubscription(symbol: string, options = { subscriptionTypes: AllSubscriptionTypes, channelId: DefaultChannelId }) { - const { subscriptionTypes, channelId } = options + addSubscription(symbol: string, options = { subscriptionTypes: AllSubscriptionTypes, channelId: DefaultChannelId }): Remover { + let { subscriptionTypes } = options + // Don't allow candle subscriptions in this method. Use addCandleSubscription instead + subscriptionTypes = _.without(subscriptionTypes, MarketDataSubscriptionType.Candle) + + const isOpen = this.isChannelOpened(options.channelId) + if (isOpen) { + this.sendSubscriptionMessage(symbol, subscriptionTypes, options.channelId, 'add') + } else { + this.queueSubscription(symbol, { subscriptionTypes, channelId: options.channelId }) + } + + return () => { + this.removeSubscription(symbol, options) + } + } + + /** + * Adds a candle subscription (historical data) + * @param streamerSymbol Get this from an instrument's streamer-symbol json response field + * @param fromTime Epoch timestamp from where you want to start + * @param options Period and Type are the grouping you want to apply to the candle data + * For example, a period/type of 5/m means you want each candle to represent 5 minutes of data + * From there, setting fromTime to 24 hours ago would give you 24 hours of data grouped in 5 minute intervals + * @returns + */ + addCandleSubscription(streamerSymbol: string, fromTime: number, options: CandleSubscriptionOptions) { + const subscriptionTypes = [MarketDataSubscriptionType.Candle] + const channelId = options.channelId ?? DefaultChannelId + + // Example: AAPL{=5m} where each candle represents 5 minutes of data + const candleSymbol = `${streamerSymbol}{=${options.period}${options.type}}` const isOpen = this.isChannelOpened(channelId) + const subscriptionArgs = { fromTime } if (isOpen) { - this.sendSubscriptionMessage(symbol, subscriptionTypes, channelId, 'add') + this.sendSubscriptionMessage(candleSymbol, subscriptionTypes, channelId, 'add', subscriptionArgs) } else { - this.queueSubscription(symbol, options) + this.queueSubscription(candleSymbol, { subscriptionTypes, channelId, subscriptionArgs }) + } + + return () => { + this.removeSubscription(candleSymbol, { subscriptionTypes, channelId }) } } @@ -100,7 +181,7 @@ export default class MarketDataStreamer { } openFeedChannel(channelId: number) { - if (!this.isDxLinkAuthorized) { + if (!this.isReadyToOpenChannels) { throw new Error(`Unable to open channel ${channelId} due to DxLink authorization state: ${this.authState}`) } @@ -122,6 +203,10 @@ export default class MarketDataStreamer { return this.isConnected && this.openChannels.has(channelId) } + get isReadyToOpenChannels() { + return this.isConnected && this.isDxLinkAuthorized + } + get isConnected() { return !_.isNil(this.webSocket) } @@ -142,14 +227,14 @@ export default class MarketDataStreamer { } private queueSubscription(symbol: string, options: SubscriptionOptions) { - const { subscriptionTypes, channelId } = options + const { subscriptionTypes, channelId, subscriptionArgs } = options let queue = this.subscriptionsQueue.get(options.channelId) if (_.isNil(queue)) { queue = [] this.subscriptionsQueue.set(channelId, queue) } - queue.push({ symbol, subscriptionTypes }) + queue.push({ symbol, subscriptionTypes, subscriptionArgs }) } private dequeueSubscription(symbol: string, options: SubscriptionOptions) { @@ -170,7 +255,7 @@ export default class MarketDataStreamer { // Clear out queue immediately this.subscriptionsQueue.set(channelId, []) queuedSubscriptions.forEach(subscription => { - this.sendSubscriptionMessage(subscription.symbol, subscription.subscriptionTypes, channelId, 'add') + this.sendSubscriptionMessage(subscription.symbol, subscription.subscriptionTypes, channelId, 'add', subscription.subscriptionArgs) }) } @@ -181,8 +266,8 @@ export default class MarketDataStreamer { * @param {*} channelId * @param {*} direction add or remove */ - private sendSubscriptionMessage(symbol: string, subscriptionTypes: MarketDataSubscriptionType[], channelId: number, direction: string) { - const subscriptions = subscriptionTypes.map(type => ({ "symbol": symbol, "type": type })) + private sendSubscriptionMessage(symbol: string, subscriptionTypes: MarketDataSubscriptionType[], channelId: number, direction: string, subscriptionArgs: any = {}) { + const subscriptions = subscriptionTypes.map(type => (Object.assign({}, { "symbol": symbol, "type": type }, subscriptionArgs ?? {}))) this.sendMessage({ "type": "FEED_SUBSCRIPTION", "channel": channelId, @@ -192,6 +277,7 @@ export default class MarketDataStreamer { private onError(error: any) { console.error('Error received: ', error) + this.notifyErrorListeners(error) } private onOpen() { @@ -227,6 +313,8 @@ export default class MarketDataStreamer { private handleAuthStateMessage(data: any) { this.authState = data.state + this.authStateListeners.forEach(listener => listener(this.isDxLinkAuthorized)) + if (this.isDxLinkAuthorized) { this.openFeedChannel(DefaultChannelId) } else { @@ -244,7 +332,15 @@ export default class MarketDataStreamer { } private notifyListeners(jsonData: any) { - this.dataListeners.forEach(listener => listener(jsonData)) + this.dataListeners.forEach(listenerData => { + if (listenerData.channelId === jsonData.channel || _.isNil(listenerData.channelId)) { + listenerData.listener(jsonData) + } + }) + } + + private notifyErrorListeners(error: any) { + this.errorListeners.forEach(listener => listener(error)) } private handleMessageReceived(data: string) { diff --git a/lib/tastytrade-api.ts b/lib/tastytrade-api.ts index f68664d..5867465 100644 --- a/lib/tastytrade-api.ts +++ b/lib/tastytrade-api.ts @@ -1,6 +1,6 @@ import TastytradeHttpClient from "./services/tastytrade-http-client" import { AccountStreamer, STREAMER_STATE, Disposer, StreamerStateObserver } from './account-streamer' -import MarketDataStreamer, { MarketDataSubscriptionType, MarketDataListener } from "./market-data-streamer" +import MarketDataStreamer, { CandleSubscriptionOptions, CandleType, MarketDataSubscriptionType, MarketDataListener } from "./market-data-streamer" //Services: import SessionService from "./services/session-service" @@ -61,5 +61,5 @@ export default class TastytradeClient { } } -export { MarketDataStreamer, MarketDataSubscriptionType, MarketDataListener } +export { MarketDataStreamer, MarketDataSubscriptionType, MarketDataListener, CandleSubscriptionOptions, CandleType } export { AccountStreamer, STREAMER_STATE, Disposer, StreamerStateObserver } diff --git a/package-lock.json b/package-lock.json index 221015a..e11dfe2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@tastytrade/api", - "version": "2.0.0", + "version": "2.1.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@tastytrade/api", - "version": "2.0.0", + "version": "2.1.0", "license": "MIT", "dependencies": { "@types/lodash": "^4.14.182", diff --git a/package.json b/package.json index 47825a5..42c8c81 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tastytrade/api", - "version": "2.0.0", + "version": "2.1.0", "main": "dist/tastytrade-api.js", "typings": "dist/tastytrade-api.d.ts", "repository": "https://github.com/tastytrade/tastytrade-api-js",