Skip to content

Commit

Permalink
Add candle subscription support to MarketDataStreamer
Browse files Browse the repository at this point in the history
  • Loading branch information
dmoss18 committed Aug 8, 2023
1 parent 89023bf commit 60f9b34
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 21 deletions.
128 changes: 112 additions & 16 deletions lib/market-data-streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import _ from 'lodash'
import { v4 as uuidv4 } from 'uuid'

export enum MarketDataSubscriptionType {
Candle = 'Candle',
Quote = 'Quote',
Trade = 'Trade',
Summary = 'Summary',
Expand All @@ -11,11 +12,30 @@ export enum MarketDataSubscriptionType {
Underlying = 'Underlying'
}

export enum CandleType {
Tick = 't',
Second = 's',
Minute = 'm',
Hour = 'h',
Day = 'd',
Week = 'w',
Month = 'mo',
ThirdFriday = 'o',
Year = 'y',
Volume = 'v',
Price = 'p'
}

export type MarketDataListener = (data: any) => void

Check warning on line 29 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 29 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type
export type ErrorListener = (error: any) => void

Check warning on line 30 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 30 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type
export type AuthStateListener = (isAuthorized: boolean) => void

type QueuedSubscription = { symbol: string, subscriptionTypes: MarketDataSubscriptionType[] }
type SubscriptionOptions = { subscriptionTypes: MarketDataSubscriptionType[], channelId: number }
type QueuedSubscription = { symbol: string, subscriptionTypes: MarketDataSubscriptionType[], subscriptionArgs?: any }

Check warning on line 33 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 33 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type
type SubscriptionOptions = { subscriptionTypes: MarketDataSubscriptionType[], channelId: number, subscriptionArgs?: any }

Check warning on line 34 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 34 in lib/market-data-streamer.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type
export type CandleSubscriptionOptions = { period: number, type: CandleType, channelId: number }
type Remover = () => void

// List of all subscription types except for Candle
const AllSubscriptionTypes = Object.values(MarketDataSubscriptionType)

const KeepaliveInterval = 30000 // 30 seconds
Expand All @@ -30,17 +50,39 @@ export default class MarketDataStreamer {
private openChannels = new Set()
private subscriptionsQueue: Map<number, QueuedSubscription[]> = new Map()
private authState = ''
private errorListeners = new Map()
private authStateListeners = new Map()

addDataListener(dataListener: MarketDataListener) {
addDataListener(dataListener: MarketDataListener, channelId: number | null = null): Remover {
if (_.isNil(dataListener)) {
return _.noop
}
const guid = uuidv4()
this.dataListeners.set(guid, dataListener)
this.dataListeners.set(guid, { listener: dataListener, channelId })

return () => this.dataListeners.delete(guid)
}

addErrorListener(errorListener: ErrorListener): Remover {
if (_.isNil(errorListener)) {
return _.noop
}
const guid = uuidv4()
this.errorListeners.set(guid, errorListener)

return () => this.errorListeners.delete(guid)
}

addAuthStateChangeListener(authStateListener: AuthStateListener): Remover {
if (_.isNil(authStateListener)) {
return _.noop
}
const guid = uuidv4()
this.authStateListeners.set(guid, authStateListener)

return () => this.authStateListeners.delete(guid)
}

connect(url: string, token: string) {
if (this.isConnected) {
throw new Error('MarketDataStreamer is attempting to connect when an existing websocket is already connected')
Expand All @@ -61,6 +103,11 @@ export default class MarketDataStreamer {

this.clearKeepalive()

this.webSocket.onopen = null
this.webSocket.onerror = null
this.webSocket.onmessage = null
this.webSocket.onclose = null

this.webSocket.close()
this.webSocket = null

Expand All @@ -69,14 +116,48 @@ export default class MarketDataStreamer {
this.authState = ''
}

// TODO: add listener to options, return unsubscriber
addSubscription(symbol: string, options = { subscriptionTypes: AllSubscriptionTypes, channelId: DefaultChannelId }) {
const { subscriptionTypes, channelId } = options
addSubscription(symbol: string, options = { subscriptionTypes: AllSubscriptionTypes, channelId: DefaultChannelId }): Remover {
let { subscriptionTypes } = options
// Don't allow candle subscriptions in this method. Use addCandleSubscription instead
subscriptionTypes = _.without(subscriptionTypes, MarketDataSubscriptionType.Candle)

const isOpen = this.isChannelOpened(options.channelId)
if (isOpen) {
this.sendSubscriptionMessage(symbol, subscriptionTypes, options.channelId, 'add')
} else {
this.queueSubscription(symbol, { subscriptionTypes, channelId: options.channelId })
}

return () => {
this.removeSubscription(symbol, options)
}
}

/**
* Adds a candle subscription (historical data)
* @param streamerSymbol Get this from an instrument's streamer-symbol json response field
* @param fromTime Epoch timestamp from where you want to start
* @param options Period and Type are the grouping you want to apply to the candle data
* For example, a period/type of 5/m means you want each candle to represent 5 minutes of data
* From there, setting fromTime to 24 hours ago would give you 24 hours of data grouped in 5 minute intervals
* @returns
*/
addCandleSubscription(streamerSymbol: string, fromTime: number, options: CandleSubscriptionOptions) {
const subscriptionTypes = [MarketDataSubscriptionType.Candle]
const channelId = options.channelId ?? DefaultChannelId

// Example: AAPL{=5m} where each candle represents 5 minutes of data
const candleSymbol = `${streamerSymbol}{=${options.period}${options.type}}`
const isOpen = this.isChannelOpened(channelId)
const subscriptionArgs = { fromTime }
if (isOpen) {
this.sendSubscriptionMessage(symbol, subscriptionTypes, channelId, 'add')
this.sendSubscriptionMessage(candleSymbol, subscriptionTypes, channelId, 'add', subscriptionArgs)
} else {
this.queueSubscription(symbol, options)
this.queueSubscription(candleSymbol, { subscriptionTypes, channelId, subscriptionArgs })
}

return () => {
this.removeSubscription(candleSymbol, { subscriptionTypes, channelId })
}
}

Expand All @@ -100,7 +181,7 @@ export default class MarketDataStreamer {
}

openFeedChannel(channelId: number) {
if (!this.isDxLinkAuthorized) {
if (!this.isReadyToOpenChannels) {
throw new Error(`Unable to open channel ${channelId} due to DxLink authorization state: ${this.authState}`)
}

Expand All @@ -122,6 +203,10 @@ export default class MarketDataStreamer {
return this.isConnected && this.openChannels.has(channelId)
}

get isReadyToOpenChannels() {
return this.isConnected && this.isDxLinkAuthorized
}

get isConnected() {
return !_.isNil(this.webSocket)
}
Expand All @@ -142,14 +227,14 @@ export default class MarketDataStreamer {
}

private queueSubscription(symbol: string, options: SubscriptionOptions) {
const { subscriptionTypes, channelId } = options
const { subscriptionTypes, channelId, subscriptionArgs } = options
let queue = this.subscriptionsQueue.get(options.channelId)
if (_.isNil(queue)) {
queue = []
this.subscriptionsQueue.set(channelId, queue)
}

queue.push({ symbol, subscriptionTypes })
queue.push({ symbol, subscriptionTypes, subscriptionArgs })
}

private dequeueSubscription(symbol: string, options: SubscriptionOptions) {
Expand All @@ -170,7 +255,7 @@ export default class MarketDataStreamer {
// Clear out queue immediately
this.subscriptionsQueue.set(channelId, [])
queuedSubscriptions.forEach(subscription => {
this.sendSubscriptionMessage(subscription.symbol, subscription.subscriptionTypes, channelId, 'add')
this.sendSubscriptionMessage(subscription.symbol, subscription.subscriptionTypes, channelId, 'add', subscription.subscriptionArgs)
})
}

Expand All @@ -181,8 +266,8 @@ export default class MarketDataStreamer {
* @param {*} channelId
* @param {*} direction add or remove
*/
private sendSubscriptionMessage(symbol: string, subscriptionTypes: MarketDataSubscriptionType[], channelId: number, direction: string) {
const subscriptions = subscriptionTypes.map(type => ({ "symbol": symbol, "type": type }))
private sendSubscriptionMessage(symbol: string, subscriptionTypes: MarketDataSubscriptionType[], channelId: number, direction: string, subscriptionArgs: any = {}) {
const subscriptions = subscriptionTypes.map(type => (Object.assign({}, { "symbol": symbol, "type": type }, subscriptionArgs ?? {})))
this.sendMessage({
"type": "FEED_SUBSCRIPTION",
"channel": channelId,
Expand All @@ -192,6 +277,7 @@ export default class MarketDataStreamer {

private onError(error: any) {
console.error('Error received: ', error)
this.notifyErrorListeners(error)
}

private onOpen() {
Expand Down Expand Up @@ -227,6 +313,8 @@ export default class MarketDataStreamer {

private handleAuthStateMessage(data: any) {
this.authState = data.state
this.authStateListeners.forEach(listener => listener(this.isDxLinkAuthorized))

if (this.isDxLinkAuthorized) {
this.openFeedChannel(DefaultChannelId)
} else {
Expand All @@ -244,7 +332,15 @@ export default class MarketDataStreamer {
}

private notifyListeners(jsonData: any) {
this.dataListeners.forEach(listener => listener(jsonData))
this.dataListeners.forEach(listenerData => {
if (listenerData.channelId === jsonData.channel || _.isNil(listenerData.channelId)) {
listenerData.listener(jsonData)
}
})
}

private notifyErrorListeners(error: any) {
this.errorListeners.forEach(listener => listener(error))
}

private handleMessageReceived(data: string) {
Expand Down
4 changes: 2 additions & 2 deletions lib/tastytrade-api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import TastytradeHttpClient from "./services/tastytrade-http-client"
import { AccountStreamer, STREAMER_STATE, Disposer, StreamerStateObserver } from './account-streamer'
import MarketDataStreamer, { MarketDataSubscriptionType, MarketDataListener } from "./market-data-streamer"
import MarketDataStreamer, { CandleSubscriptionOptions, CandleType, MarketDataSubscriptionType, MarketDataListener } from "./market-data-streamer"

//Services:
import SessionService from "./services/session-service"
Expand Down Expand Up @@ -61,5 +61,5 @@ export default class TastytradeClient {
}
}

export { MarketDataStreamer, MarketDataSubscriptionType, MarketDataListener }
export { MarketDataStreamer, MarketDataSubscriptionType, MarketDataListener, CandleSubscriptionOptions, CandleType }
export { AccountStreamer, STREAMER_STATE, Disposer, StreamerStateObserver }
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tastytrade/api",
"version": "2.0.0",
"version": "2.1.0",
"main": "dist/tastytrade-api.js",
"typings": "dist/tastytrade-api.d.ts",
"repository": "https://github.com/tastytrade/tastytrade-api-js",
Expand Down

0 comments on commit 60f9b34

Please sign in to comment.