diff --git a/examples/live/interactive_brokers/historic_download.py b/examples/live/interactive_brokers/historic_download.py index 79b401aa6ce9..afcd2980ad0f 100644 --- a/examples/live/interactive_brokers/historic_download.py +++ b/examples/live/interactive_brokers/historic_download.py @@ -13,78 +13,65 @@ # See the License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------------------------------------- +import asyncio +import datetime -import pandas as pd +from nautilus_trader.adapters.interactive_brokers.common import IBContract +from nautilus_trader.adapters.interactive_brokers.historic import HistoricInteractiveBrokersClient +from nautilus_trader.persistence.catalog import ParquetDataCatalog -# fmt: off -from nautilus_trader.adapters.interactive_brokers.config import InteractiveBrokersDataClientConfig -from nautilus_trader.adapters.interactive_brokers.factories import InteractiveBrokersLiveDataClientFactory -from nautilus_trader.adapters.interactive_brokers.factories import InteractiveBrokersLiveExecClientFactory -from nautilus_trader.adapters.interactive_brokers.historic.bar_data import BarDataDownloader -from nautilus_trader.adapters.interactive_brokers.historic.bar_data import BarDataDownloaderConfig -from nautilus_trader.config import LoggingConfig -from nautilus_trader.config import TradingNodeConfig -from nautilus_trader.live.node import TradingNode -from nautilus_trader.model.data import Bar +async def main(): + contract = IBContract( + secType="STK", + symbol="AAPL", + exchange="SMART", + primaryExchange="NASDAQ", + ) + instrument_id = "TSLA.NASDAQ" -# fmt: on + client = HistoricInteractiveBrokersClient(port=4002, client_id=5) + await client._connect() + await asyncio.sleep(2) -# *** MAKE SURE YOU HAVE REQUIRED DATA SUBSCRIPTION FOR THIS WORK WORK AS INTENDED. *** + instruments = await client.request_instruments( + contracts=[contract], + instrument_ids=[instrument_id], + ) -df = pd.DataFrame() + bars = await client.request_bars( + bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], + end_date_time=datetime.datetime(2023, 11, 6, 16, 30), + tz_name="America/New_York", + duration="1 D", + contracts=[contract], + instrument_ids=[instrument_id], + ) + trade_ticks = await client.request_ticks( + "TRADES", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) -# Data Handler for BarDataDownloader -def do_something_with_bars(bars: list): - global df - bars_dict = [Bar.to_dict(bar) for bar in bars] - df = pd.concat([df, pd.DataFrame(bars_dict)]) - df = df.sort_values(by="ts_init") + quote_ticks = await client.request_ticks( + "BID_ASK", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + catalog = ParquetDataCatalog("./catalog") + catalog.write_data(instruments) + catalog.write_data(bars) + catalog.write_data(trade_ticks) + catalog.write_data(quote_ticks) -# Configure the trading node -config_node = TradingNodeConfig( - trader_id="TESTER-001", - logging=LoggingConfig(log_level="INFO"), - data_clients={ - "InteractiveBrokers": InteractiveBrokersDataClientConfig( - ibg_host="127.0.0.1", - ibg_port=7497, - ibg_client_id=1, - ), - }, - timeout_connection=90.0, -) -# Instantiate the node with a configuration -node = TradingNode(config=config_node) - -# Configure your strategy -downloader_config = BarDataDownloaderConfig( - start_iso_ts="2023-09-01T00:00:00+00:00", - end_iso_ts="2023-09-30T00:00:00+00:00", - bar_types=[ - "AAPL.NASDAQ-1-MINUTE-BID-EXTERNAL", - "AAPL.NASDAQ-1-MINUTE-ASK-EXTERNAL", - "AAPL.NASDAQ-1-MINUTE-LAST-EXTERNAL", - ], - handler=do_something_with_bars, - freq="1W", -) - -# Instantiate the downloader and add into node -downloader = BarDataDownloader(config=downloader_config) -node.trader.add_actor(downloader) - -# Register your client factories with the node (can take user defined factories) -node.add_data_client_factory("InteractiveBrokers", InteractiveBrokersLiveDataClientFactory) -node.add_exec_client_factory("InteractiveBrokers", InteractiveBrokersLiveExecClientFactory) -node.build() - -# Stop and dispose of the node with SIGINT/CTRL+C if __name__ == "__main__": - try: - node.run() - finally: - node.dispose() + asyncio.run(main()) diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index fdbfee544233..20952aa43d05 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1089,6 +1089,7 @@ async def get_historical_bars( use_rth: bool, end_date_time: str, duration: str, + timeout: int = 60, ): name = str(bar_type) if not (request := self.requests.get(name=name)): @@ -1114,7 +1115,7 @@ async def get_historical_bars( ) self._log.debug(f"reqHistoricalData: {request.req_id=}, {contract=}") request.handle() - return await self._await_request(request, 20) + return await self._await_request(request, timeout) else: self._log.info(f"Request already exist for {request}") @@ -1294,9 +1295,15 @@ async def get_historical_ticks( self, contract: IBContract, tick_type: str, - end_date_time: pd.Timestamp, - use_rth: bool, + start_date_time: pd.Timestamp | str = "", + end_date_time: pd.Timestamp | str = "", + use_rth: bool = True, ): + if isinstance(start_date_time, pd.Timestamp): + start_date_time = start_date_time.strftime("%Y%m%d %H:%M:%S %Z") + if isinstance(end_date_time, pd.Timestamp): + end_date_time = end_date_time.strftime("%Y%m%d %H:%M:%S %Z") + name = (str(ib_contract_to_instrument_id(contract)), tick_type) if not (request := self.requests.get(name=name)): req_id = self._next_req_id() @@ -1307,8 +1314,8 @@ async def get_historical_ticks( self._client.reqHistoricalTicks, reqId=req_id, contract=contract, - startDateTime="", - endDateTime=end_date_time.strftime("%Y%m%d %H:%M:%S %Z"), + startDateTime=start_date_time, + endDateTime=end_date_time, numberOfTicks=1000, whatToShow=tick_type, useRth=use_rth, @@ -1318,13 +1325,19 @@ async def get_historical_ticks( cancel=functools.partial(self._client.cancelHistoricalData, reqId=req_id), ) request.handle() - return await self._await_request(request, 20) + return await self._await_request(request, 60) else: self._log.info(f"Request already exist for {request}") - def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): + def historicalTicksBidAsk( + self, + req_id: int, + ticks: list, + done: bool, + ): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) - + if not done: + return if request := self.requests.get(req_id=req_id): instrument_id = InstrumentId.from_str(request.name[0]) instrument = self._cache.instrument(instrument_id) @@ -1335,8 +1348,8 @@ def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): instrument_id=instrument_id, bid_price=instrument.make_price(tick.priceBid), ask_price=instrument.make_price(tick.priceAsk), - bid_size=instrument.make_price(tick.sizeBid), - ask_size=instrument.make_price(tick.sizeAsk), + bid_size=instrument.make_qty(tick.sizeBid), + ask_size=instrument.make_qty(tick.sizeAsk), ts_event=ts_event, ts_init=ts_event, ) @@ -1344,12 +1357,16 @@ def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): self._end_request(req_id) - def historicalTicksLast(self, req_id: int, ticks: list, done: bool): + def historicalTicksLast(self, req_id: int, ticks: list, done: bool): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) + if not done: + return self._process_trade_ticks(req_id, ticks) - def historicalTicks(self, req_id: int, ticks: list, done: bool): + def historicalTicks(self, req_id: int, ticks: list, done: bool): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) + if not done: + return self._process_trade_ticks(req_id, ticks) def _process_trade_ticks(self, req_id: int, ticks: list): diff --git a/nautilus_trader/adapters/interactive_brokers/common.py b/nautilus_trader/adapters/interactive_brokers/common.py index f6b342d1e794..5d44a0af14fd 100644 --- a/nautilus_trader/adapters/interactive_brokers/common.py +++ b/nautilus_trader/adapters/interactive_brokers/common.py @@ -85,7 +85,7 @@ class IBContract(NautilusConfig, frozen=True, repr_omit_defaults=True): Exchange where security is traded. Will be SMART for Stocks. primaryExchange: str Exchange where security is registered. Applies to Stocks. - localSymbol: str + symbol: str Unique Symbol registered in Exchange. build_options_chain: bool (default: None) Search for full option chain diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index afe66ae0c816..4fac7a13133a 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -376,8 +376,8 @@ async def _handle_ticks_request( ticks_part = await self._client.get_historical_ticks( contract, tick_type, - end, - self._use_regular_trading_hours, + end_date_time=end, + use_rth=self._use_regular_trading_hours, ) if not ticks_part: break diff --git a/nautilus_trader/adapters/interactive_brokers/gateway.py b/nautilus_trader/adapters/interactive_brokers/gateway.py index 375b609afd93..074a2bbe10a9 100644 --- a/nautilus_trader/adapters/interactive_brokers/gateway.py +++ b/nautilus_trader/adapters/interactive_brokers/gateway.py @@ -45,14 +45,14 @@ class InteractiveBrokersGateway: A class to manage starting an Interactive Brokers Gateway docker container. """ - IMAGE: ClassVar[str] = "ghcr.io/unusualalpha/ib-gateway:stable" + IMAGE: ClassVar[str] = "ghcr.io/unusualalpha/ib-gateway:10.19" CONTAINER_NAME: ClassVar[str] = "nautilus-ib-gateway" PORTS: ClassVar[dict[str, int]] = {"paper": 4002, "live": 4001} def __init__( self, - username: str, - password: str, + username: str | None = None, + password: str | None = None, host: str | None = "localhost", port: int | None = None, trading_mode: str | None = "paper", diff --git a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py index ca16b56e4794..d45066a414dc 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py @@ -12,3 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------------------------------------- +# fmt: off +from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient + + +# fmt: on + +__all__ = [ + "HistoricInteractiveBrokersClient", +] diff --git a/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py b/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py deleted file mode 100644 index fcc5dfd4217a..000000000000 --- a/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python3 -# ------------------------------------------------------------------------------------------------- -# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. -# https://nautechsystems.io -# -# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------------------------------------------- - -import asyncio -import functools -from collections.abc import Callable - -# fmt: off -from collections.abc import Coroutine - -import async_timeout - -from nautilus_trader.common.actor import Actor -from nautilus_trader.common.actor import ActorConfig -from nautilus_trader.common.clock import LiveClock -from nautilus_trader.config.common import Environment -from nautilus_trader.core.rust.common import LogColor -from nautilus_trader.core.uuid import UUID4 - - -# fmt: on - - -class AsyncActor(Actor): - def __init__(self, config: ActorConfig): - super().__init__(config) - - self.environment: Environment | None = Environment.BACKTEST - - # Hot Cache - self._pending_async_requests: dict[UUID4, asyncio.Event] = {} - - # Initialized in on_start - self._loop: asyncio.AbstractEventLoop | None = None - - def on_start(self): - if isinstance(self.clock, LiveClock): - self.environment = Environment.LIVE - - if self.environment == Environment.LIVE: - self._loop = asyncio.get_running_loop() - self.create_task(self._on_start()) - else: - asyncio.run(self._on_start()) - - async def _on_start(self): - raise NotImplementedError( # pragma: no cover - "implement the `_on_start` coroutine", # pragma: no cover - ) - - def _finish_response(self, request_id: UUID4): - super()._finish_response(request_id) - if request_id in self._pending_async_requests: - self._pending_async_requests[request_id].set() - - async def await_request(self, request_id: UUID4, timeout: int = 30): - self._pending_async_requests[request_id] = asyncio.Event() - try: - async with async_timeout.timeout(timeout): - await self._pending_async_requests[request_id].wait() - except asyncio.TimeoutError: - self.log.error(f"Failed to download data for {request_id}") - del self._pending_async_requests[request_id] - - def create_task( - self, - coro: Coroutine, - log_msg: str | None = None, - actions: Callable | None = None, - success: str | None = None, - ) -> asyncio.Task: - """ - Run the given coroutine with error handling and optional callback actions when - done. - - Parameters - ---------- - coro : Coroutine - The coroutine to run. - log_msg : str, optional - The log message for the task. - actions : Callable, optional - The actions callback to run when the coroutine is done. - success : str, optional - The log message to write on actions success. - - Returns - ------- - asyncio.Task - - """ - log_msg = log_msg or coro.__name__ - self._log.debug(f"Creating task {log_msg}.") - task = self._loop.create_task( - coro, - name=coro.__name__, - ) - task.add_done_callback( - functools.partial( - self._on_task_completed, - actions, - success, - ), - ) - return task - - def _on_task_completed( - self, - actions: Callable | None, - success: str | None, - task: asyncio.Task, - ) -> None: - if task.exception(): - self._log.error( - f"Error on `{task.get_name()}`: " f"{task.exception()!r}", - ) - else: - if actions: - try: - actions() - except Exception as e: - self._log.error( - f"Failed triggering action {actions.__name__} on `{task.get_name()}`: " - f"{e!r}", - ) - if success: - self._log.info(success, LogColor.GREEN) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py b/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py deleted file mode 100644 index a3d095bfb25a..000000000000 --- a/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python3 -# ------------------------------------------------------------------------------------------------- -# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. -# https://nautechsystems.io -# -# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------------------------------------------- - -from collections.abc import Callable - -import pandas as pd - -# fmt: off -from nautilus_trader.adapters.interactive_brokers.historic.async_actor import AsyncActor -from nautilus_trader.common.actor import ActorConfig -from nautilus_trader.core.correctness import PyCondition -from nautilus_trader.model.data.bar import Bar -from nautilus_trader.model.data.bar import BarType - - -# fmt: on - - -class BarDataDownloaderConfig(ActorConfig): - """ - Configuration for `BarDataDownloader` instances. - """ - - start_iso_ts: str - end_iso_ts: str - bar_types: list[str] - handler: Callable - freq: str = "1W" - request_timeout: int = 30 - - -class BarDataDownloader(AsyncActor): - def __init__(self, config: BarDataDownloaderConfig): - super().__init__(config) - try: - self.start_time: pd.Timestamp = pd.to_datetime( - config.start_iso_ts, - format="%Y-%m-%dT%H:%M:%S%z", - ) - self.end_time: pd.Timestamp = pd.to_datetime( - config.end_iso_ts, - format="%Y-%m-%dT%H:%M:%S%z", - ) - except ValueError: - raise ValueError("`start_iso_ts` and `end_iso_ts` must be like '%Y-%m-%dT%H:%M:%S%z'") - - self.bar_types: list[BarType] = [] - for bar_type in config.bar_types: - self.bar_types.append(BarType.from_str(bar_type)) - - self.handler: Callable | None = config.handler - self.freq: str = config.freq - - async def _on_start(self): - instrument_ids = {bar_type.instrument_id for bar_type in self.bar_types} - for instrument_id in instrument_ids: - request_id = self.request_instrument(instrument_id) - await self.await_request(request_id, timeout=self.config.request_timeout) - - request_dates = list(pd.date_range(self.start_time, self.end_time, freq=self.freq)) - - for request_date in request_dates: - for bar_type in self.bar_types: - request_id = self.request_bars( - bar_type=bar_type, - start=request_date, - end=request_date + pd.Timedelta(self.freq), - ) - await self.await_request(request_id, timeout=self.config.request_timeout) - - self.stop() - - def handle_bars(self, bars: list): - """ - Handle the given historical bar data by handling each bar individually. - - Parameters - ---------- - bars : list[Bar] - The bars to handle. - - Warnings - -------- - System method (not intended to be called by user code). - - """ - PyCondition.not_none(bars, "bars") # Can be empty - - length = len(bars) - first: Bar = bars[0] if length > 0 else None - last: Bar = bars[length - 1] if length > 0 else None - - if length > 0: - self._log.info(f"Received data for {first.bar_type}.") - else: - self._log.error(f"Received data for unknown bar type.") - return - - if length > 0 and first.ts_init > last.ts_init: - raise RuntimeError(f"cannot handle data: incorrectly sorted") - - # Send Bars response as a whole to handler - self.handler(bars) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py new file mode 100644 index 000000000000..e4faeb6e3b85 --- /dev/null +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -0,0 +1,543 @@ +import asyncio +import datetime +import re +from typing import Literal + +import pandas as pd +from ibapi.common import MarketDataTypeEnum + +# fmt: off +from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient +from nautilus_trader.adapters.interactive_brokers.common import IBContract +from nautilus_trader.adapters.interactive_brokers.parsing.instruments import ib_contract_to_instrument_id +from nautilus_trader.adapters.interactive_brokers.parsing.instruments import instrument_id_to_ib_contract +from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProvider +from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProviderConfig + +# fmt: on +from nautilus_trader.cache.cache import Cache +from nautilus_trader.common.clock import LiveClock +from nautilus_trader.common.logging import Logger +from nautilus_trader.common.logging import LoggerAdapter +from nautilus_trader.core.datetime import dt_to_unix_nanos +from nautilus_trader.core.datetime import unix_nanos_to_dt +from nautilus_trader.model.data import QuoteTick +from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.data.bar import Bar +from nautilus_trader.model.data.bar import BarSpecification +from nautilus_trader.model.data.bar import BarType +from nautilus_trader.model.enums import AggregationSource +from nautilus_trader.model.identifiers import InstrumentId +from nautilus_trader.model.identifiers import TraderId +from nautilus_trader.model.instruments.base import Instrument +from nautilus_trader.msgbus.bus import MessageBus +from nautilus_trader.persistence.catalog import ParquetDataCatalog + + +class HistoricInteractiveBrokersClient: + """ + Provides a means of requesting historical market data for backtesting. + """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 7497, + client_id: int = 1, + market_data_type: MarketDataTypeEnum = MarketDataTypeEnum.REALTIME, + ): + loop = asyncio.get_event_loop() + loop.set_debug(True) + clock = LiveClock() + logger = Logger(clock) + self.log = LoggerAdapter("HistoricInteractiveBrokersClient", logger) + msgbus = MessageBus( + TraderId("historic_interactive_brokers_client-001"), + clock, + logger, + ) + cache = Cache(logger) + self.market_data_type = market_data_type + self._client = InteractiveBrokersClient( + loop=loop, + msgbus=msgbus, + cache=cache, + clock=clock, + logger=logger, + host=host, + port=port, + client_id=client_id, + ) + + async def _connect(self) -> None: + # Connect client + await self._client.is_running_async() + self._client.registered_nautilus_clients.add(1) + + # Set Market Data Type + await self._client.set_market_data_type(self.market_data_type) + + async def request_instruments( + self, + instrument_provider_config: InteractiveBrokersInstrumentProviderConfig | None = None, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + ) -> list[Instrument]: + """ + Return Instruments given either a InteractiveBrokersInstrumentProviderConfig or + a list of IBContracts and/or InstrumentId strings. + + Parameters + ---------- + instrument_provider_config : InteractiveBrokersInstrumentProviderConfig + An instrument provider config defining which instruments to retrieve. + contracts : list[IBContract], default 'None' + IBContracts defining which instruments to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which instruments to retrieve. + + Returns + ------- + list[Instrument] + + """ + if instrument_provider_config and (contracts or instrument_ids): + raise ValueError( + "Either instrument_provider_config or ib_contracts/instrument_ids should be provided, not both.", + ) + if instrument_provider_config is None: + instrument_provider_config = InteractiveBrokersInstrumentProviderConfig( + load_contracts=frozenset(contracts) if contracts else None, + load_ids=frozenset(instrument_ids) if instrument_ids else None, + ) + provider = InteractiveBrokersInstrumentProvider( + self._client, + instrument_provider_config, + Logger(LiveClock()), + ) + await provider.load_all_async() + return list(provider._instruments.values()) + + async def request_bars( + self, + bar_specifications: list[str], + end_date_time: datetime.datetime, + tz_name: str, + start_date_time: datetime.datetime | None = None, + duration: str | None = None, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + use_rth: bool = True, + timeout: int = 120, + ) -> list[Bar]: + """ + Return Bars for one or more bar specifications for a list of IBContracts and/or + InstrumentId strings. + + Parameters + ---------- + bar_specifications : list[str] + BarSpecifications represented as strings defining which bars to retrieve. + (e.g. '1-HOUR-LAST', '5-MINUTE-MID') + start_date_time : datetime.datetime + The start date time for the bars. If provided, duration is derived. + end_date_time : datetime.datetime + The end date time for the bars. + tz_name : str + The timezone to use. (e.g. 'America/New_York', 'UTC') + duration : str + The amount of time to go back from the end_date_time. + Valid values follow the pattern of an integer followed by S|D|W|M|Y + for seconds, days, weeks, months, or years respectively. + contracts : list[IBContract], default 'None' + IBContracts defining which bars to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which bars to retrieve. + use_rth : bool, default 'True' + Whether to use regular trading hours. + timeout : int, default '120' + The timeout in seconds for each request. + + Returns + ------- + list[Bar] + + """ + if start_date_time and duration: + raise ValueError("Either start_date_time or duration should be provided, not both.") + + # Derive duration if start_date_time and end_date_time are provided + if start_date_time and end_date_time: + start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + if start_date_time >= end_date_time: + raise ValueError("Start date must be before end date.") + else: + pattern = r"^\d+\s[SDWMY]$" + if not re.match(pattern, duration): + raise ValueError("duration must be in format: 'int S|D|W|M|Y'") + + contracts = contracts or [] + instrument_ids = instrument_ids or [] + if not contracts and not instrument_ids: + raise ValueError("Either contracts or instrument_ids must be provided") + + # Convert instrument_id strings to IBContracts + contracts.extend( + [ + instrument_id_to_ib_contract( + InstrumentId.from_str(instrument_id), + ) + for instrument_id in instrument_ids + ], + ) + + # Ensure instruments are fetched and cached + await self._fetch_instruments_if_not_cached(contracts) + + data: list[Bar] = [] + + for contract in contracts: + for bar_spec in bar_specifications: + instrument_id = ib_contract_to_instrument_id(contract) + bar_type = BarType( + instrument_id, + BarSpecification.from_str(bar_spec), + AggregationSource.EXTERNAL, + ) + + for segment_end_date_time, segment_duration in self._calculate_duration_segments( + start_date_time, + end_date_time, + duration, + ): + self.log.info( + f"{instrument_id}: Requesting historical bars: {bar_type} ending on '{segment_end_date_time}' " + "with duration '{segment_duration}'", + ) + + bars = await self._client.get_historical_bars( + bar_type, + contract, + use_rth, + segment_end_date_time.strftime("%Y%m%d-%H:%M:%S"), + segment_duration, + timeout=timeout, + ) + if bars: + self.log.info( + f"{instrument_id}: Number of bars retrieved in batch: {len(bars)}", + ) + data.extend(bars) + self.log.info(f"Total number of bars in data: {len(data)}") + else: + self.log.info(f"{instrument_id}: No bars retrieved for: {bar_type}") + + return sorted(data, key=lambda x: x.ts_init) + + async def request_ticks( + self, + tick_type: Literal["TRADES", "BID_ASK"], + start_date_time: datetime.datetime, + end_date_time: datetime.datetime, + tz_name: str, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + use_rth: bool = True, + ) -> list[TradeTick | QuoteTick]: + """ + Return TradeTicks or QuoteTicks for one or more bar specifications for a list of + IBContracts and/or InstrumentId strings. + + Parameters + ---------- + tick_type : Literal["TRADES", "BID_ASK"] + The type of ticks to retrieve. + start_date_time : datetime.date + The start date for the ticks. + end_date_time : datetime.date + The end date for the ticks. + tz_name : str + The timezone to use. (e.g. 'America/New_York', 'UTC') + contracts : list[IBContract], default 'None' + IBContracts defining which ticks to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which ticks to retrieve. + use_rth : bool, default 'True' + Whether to use regular trading hours. + + Returns + ------- + list[TradeTick | QuoteTick] + + """ + if tick_type not in ["TRADES", "BID_ASK"]: + raise ValueError( + "tick_type must be one of: 'TRADES' (for TradeTicks), 'BID_ASK' (for QuoteTicks)", + ) + if start_date_time >= end_date_time: + raise ValueError("Start date must be before end date.") + start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + if (end_date_time - start_date_time) > pd.Timedelta(days=1): + self.log.warning( + "Requesting tick data for more than 1 day may take a long time, particularly for liquid instruments. " + "You may want to consider sourcing tick data elsewhere.", + ) + + contracts = contracts or [] + instrument_ids = instrument_ids or [] + if not contracts and not instrument_ids: + raise ValueError("Either contracts or instrument_ids must be provided") + + # Convert instrument_id strings to IBContracts + contracts.extend( + [ + instrument_id_to_ib_contract( + InstrumentId.from_str(instrument_id), + ) + for instrument_id in instrument_ids + ], + ) + + # Ensure instruments are fetched and cached + await self._fetch_instruments_if_not_cached(contracts) + + data: list[TradeTick | QuoteTick] = [] + for contract in contracts: + instrument_id = ib_contract_to_instrument_id(contract) + current_start_date_time = start_date_time + while True: + self.log.info( + f"{instrument_id}: Requesting {tick_type} ticks from {current_start_date_time}", + ) + + ticks = await self._client.get_historical_ticks( + contract=contract, + tick_type=tick_type, + start_date_time=current_start_date_time, + use_rth=use_rth, + ) + + if not ticks: + break + + self.log.info( + f"{instrument_id}: Number of {tick_type} ticks retrieved in batch: {len(ticks)}", + ) + + current_start_date_time, should_continue = self._handle_timestamp_iteration( + ticks, + end_date_time, + ) + + if not should_continue: + # Filter out ticks that are after the end_date_time + ticks = [ + tick for tick in ticks if tick.ts_event <= dt_to_unix_nanos(end_date_time) + ] + data.extend(ticks) + self.log.info(f"Total number of {tick_type} ticks in data: {len(data)}") + break + + data.extend(ticks) + self.log.info(f"Total number of {tick_type} ticks in data: {len(data)}") + + return sorted(data, key=lambda x: x.ts_init) + + def _handle_timestamp_iteration( + self, + ticks: list[TradeTick | QuoteTick], + end_date_time: pd.Timestamp, + ) -> tuple[pd.Timestamp | None, bool]: + """ + Return the max timestamp from the given ticks and whether to continue iterating. + If all timestamps occur in the same second, the max timestamp will be + incremented by 1 second. + + Parameters + ---------- + ticks : list[TradeTick | QuoteTick] + The type of ticks to retrieve. + end_date_time : datetime.date + The end date for the ticks. + + Returns + ------- + tuple[pd.Timestamp | None, bool] + + """ + if not ticks: + return None, False + + timestamps = [unix_nanos_to_dt(tick.ts_event) for tick in ticks] + min_timestamp = min(timestamps) + max_timestamp = max(timestamps) + + if min_timestamp.floor("S") == max_timestamp.floor("S"): + max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) + + if max_timestamp >= end_date_time: + return None, False + + return max_timestamp, True + + async def _fetch_instruments_if_not_cached(self, contracts: list[IBContract]) -> None: + """ + Fetch and cache Instruments for the given IBContracts if they are not already + cached. + + Parameters + ---------- + contracts : list[IBContract] + A list of IBContracts to fetch Instruments for. + + Returns + ------- + None + + """ + for contract in contracts: + instrument_id = ib_contract_to_instrument_id(contract) + if not self._client._cache.instrument(instrument_id): + self.log.info(f"Fetching Instrument for: {instrument_id}") + await self.request_instruments(contracts=[contract]) + + def _calculate_duration_segments( + self, + start_date: pd.Timestamp | None, + end_date: pd.Timestamp, + duration: str | None, + ) -> list[tuple[pd.Timestamp, str]]: + """ + Calculate the difference in years, days, and seconds between two dates for the + purpose of requesting specific date ranges for historical bars. + + This function breaks down the time difference between two provided dates (start_date + and end_date) into separate components: years, days, and seconds. It accounts for leap + years in its calculation of years and considers detailed time components (hours, minutes, + seconds) for precise calculation of seconds. + + Each component of the time difference (years, days, seconds) is represented as a + tuple in the returned list. + The first element is the date that indicates the end point of that time segment + when moving from start_date to end_date. For example, if the function calculates 1 + year, the date for the year entry will be the end date after 1 year has passed + from start_date. This helps in understanding the progression of time from start_date + to end_date in segmented intervals. + + Parameters + ---------- + start_date : pd.Timestamp | None + The starting date and time. + end_date : pd.Timestamp + The ending date and time. + duration : str + The amount of time to go back from the end_date_time. + Valid values follow the pattern of an integer followed by S|D|W|M|Y + for seconds, days, weeks, months, or years respectively. + + Returns + ------- + tuple[pd.Timestamp, str]: A list of tuples, each containing a date and a duration. + The date represents the end point of each calculated time segment (year, day, second), + and the duration is the length of the time segment as a string. + + """ + if duration: + return [(end_date, duration)] + + total_delta = end_date - start_date + + # Calculate full years in the time delta + years = total_delta.days // 365 + minus_years_date = end_date - pd.Timedelta(days=365 * years) + + # Calculate remaining days after subtracting full years + days = (minus_years_date - start_date).days + minus_days_date = minus_years_date - pd.Timedelta(days=days) + + # Calculate remaining time in seconds + delta = minus_days_date - start_date + subsecond = ( + 1 + if delta.components.milliseconds > 0 + or delta.components.microseconds > 0 + or delta.components.nanoseconds > 0 + else 0 + ) + seconds = ( + delta.components.hours * 3600 + + delta.components.minutes * 60 + + delta.components.seconds + + subsecond + ) + + results = [] + if years: + results.append((end_date, f"{years} Y")) + + if days: + results.append((minus_years_date, f"{days} D")) + + if seconds: + results.append((minus_days_date, f"{seconds} S")) + + return results + + +# will remove this post testing and review +async def main(): + contract = IBContract( + secType="STK", + symbol="AAPL", + exchange="SMART", + primaryExchange="NASDAQ", + ) + instrument_id = "TSLA.NASDAQ" + + client = HistoricInteractiveBrokersClient(port=4002, client_id=5) + await client._connect() + await asyncio.sleep(2) + + instruments = await client.request_instruments( + contracts=[contract], + instrument_ids=[instrument_id], + ) + + bars = await client.request_bars( + bar_specifications=["1-DAY-LAST", "8-HOUR-MID"], + start_date_time=datetime.datetime(2022, 10, 15, 3), + end_date_time=datetime.datetime(2023, 11, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + + trade_ticks = await client.request_ticks( + "TRADES", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + + quote_ticks = await client.request_ticks( + "BID_ASK", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + + catalog = ParquetDataCatalog("./catalog") + catalog.write_data(instruments) + catalog.write_data(bars) + catalog.write_data(trade_ticks) + catalog.write_data(quote_ticks) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py b/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py index a7033a61e1d3..abe248d55681 100644 --- a/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py +++ b/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py @@ -315,7 +315,7 @@ def ib_contract_to_instrument_id(contract: IBContract) -> InstrumentId: security_type = contract.secType if security_type == "STK": - symbol = contract.localSymbol.replace(" ", "-") + symbol = (contract.localSymbol or contract.symbol).replace(" ", "-") venue = contract.primaryExchange if contract.exchange == "SMART" else contract.exchange elif security_type == "OPT": symbol = contract.localSymbol.replace(" ", "") or contract.symbol.replace(" ", "") diff --git a/nautilus_trader/adapters/interactive_brokers/providers.py b/nautilus_trader/adapters/interactive_brokers/providers.py index 3c684db0e2cb..434899d74375 100644 --- a/nautilus_trader/adapters/interactive_brokers/providers.py +++ b/nautilus_trader/adapters/interactive_brokers/providers.py @@ -277,6 +277,7 @@ async def load_async( continue self._log.info(f"Adding {instrument=} from InteractiveBrokersInstrumentProvider") self.add(instrument) + self._client._cache.add_instrument(instrument) self.contract_details[instrument.id.value] = details self.contract_id_to_instrument_id[details.contract.conId] = instrument.id