From 0bfe3d784a02e8fe8febab55153a6de286d202c9 Mon Sep 17 00:00:00 2001 From: limx0 Date: Fri, 27 Oct 2023 19:37:41 +1100 Subject: [PATCH 01/12] WIP --- .../interactive_brokers/historic/client.py | 40 ++ .../interactive_brokers/historic/core.py | 422 ++++++++++++++++++ 2 files changed, 462 insertions(+) create mode 100644 nautilus_trader/adapters/interactive_brokers/historic/client.py create mode 100644 nautilus_trader/adapters/interactive_brokers/historic/core.py diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py new file mode 100644 index 000000000000..9d35632bd815 --- /dev/null +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -0,0 +1,40 @@ +import asyncio + +from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient +from nautilus_trader.cache.cache import Cache +from nautilus_trader.common.clock import LiveClock +from nautilus_trader.common.logging import Logger +from nautilus_trader.model.identifiers import TraderId +from nautilus_trader.msgbus.bus import MessageBus + + +class HistoricInteractiveBrokersClient(InteractiveBrokersClient): + def __init__( + self, + host: str = "127.0.0.1", + port: int = 7497, + client_id: int = 1, + ): + loop = asyncio.get_event_loop() + clock = LiveClock() + logger = Logger(clock) + msgbus = MessageBus( + TraderId("historic_interactive_brokers_client"), + clock, + logger, + ) + cache = Cache(logger) + super().__init__( + loop=loop, + msgbus=msgbus, + cache=cache, + clock=clock, + logger=logger, + host=host, + port=port, + client_id=client_id, + ) + + +if __name__ == "__main__": + client = HistoricInteractiveBrokersClient() diff --git a/nautilus_trader/adapters/interactive_brokers/historic/core.py b/nautilus_trader/adapters/interactive_brokers/historic/core.py new file mode 100644 index 000000000000..0f93c5f5adcf --- /dev/null +++ b/nautilus_trader/adapters/interactive_brokers/historic/core.py @@ -0,0 +1,422 @@ +# ------------------------------------------------------------------------------------------------- +# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. +# https://nautechsystems.io +# +# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ------------------------------------------------------------------------------------------------- +import datetime +import logging +from typing import Literal + +import pandas as pd +import pytz +from ibapi.common import BarData +from ibapi.common import HistoricalTickBidAsk +from ibapi.common import HistoricalTickLast +from ibapi.contract import Contract + +from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient +from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient +from nautilus_trader.adapters.interactive_brokers.parsing.data import generate_trade_id +from nautilus_trader.adapters.interactive_brokers.parsing.instruments import parse_instrument +from nautilus_trader.core.datetime import dt_to_unix_nanos +from nautilus_trader.model.data.bar import Bar +from nautilus_trader.model.data.bar import BarSpecification +from nautilus_trader.model.data.bar import BarType +from nautilus_trader.model.data.tick import QuoteTick +from nautilus_trader.model.data.tick import TradeTick +from nautilus_trader.model.enums import AggregationSource +from nautilus_trader.model.enums import AggressorSide +from nautilus_trader.model.enums import bar_aggregation_to_str +from nautilus_trader.model.enums import price_type_to_str +from nautilus_trader.model.identifiers import InstrumentId +from nautilus_trader.model.instruments import Instrument +from nautilus_trader.model.objects import Price +from nautilus_trader.model.objects import Quantity +from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog + + +logger = logging.getLogger(__name__) + + +def generate_filename( + catalog: ParquetDataCatalog, + instrument_id: InstrumentId, + kind: Literal["BID_ASK", "TRADES"], + date: datetime.date, +) -> str: + fn_kind = {"BID_ASK": "quote_tick", "TRADES": "trade_tick", "BARS": "bars"}[kind.split("-")[0]] + return f"{catalog.path}/data/{fn_kind}.parquet/instrument_id={instrument_id.value}/{date:%Y%m%d}-0.parquet" + + +def back_fill_catalog( + catalog: ParquetDataCatalog, + contracts: list[Contract], + start_date: datetime.date, + end_date: datetime.date, + tz_name: str, + kinds=("BID_ASK", "TRADES"), +): + """ + Backfill the data catalog with market data from Interactive Brokers. + + Parameters + ---------- + ib : IB + The ib_insync client. + catalog : ParquetDataCatalog + The data catalog to write the data to. + contracts : list[Contract] + The list of IB Contracts to collect data for. + start_date : datetime.date + The start_date for the backfill. + end_date : datetime.date + The end_date for the backfill. + tz_name : str + The timezone of the contracts + kinds : tuple[str] (default: ('BID_ASK', 'TRADES') + The kinds to query data for, can be any of: + - BID_ASK + - TRADES + - A bar specification, i.e. BARS-1-MINUTE-LAST or BARS-5-SECOND-MID + + """ + client = HistoricInteractiveBrokersClient() + for date in pd.bdate_range(start_date, end_date, tz=tz_name): + for contract in contracts: + [details] = client.reqContractDetails(contract=contract) + instrument = parse_instrument(contract_details=details) + + # Check if this instrument exists in the catalog, if not, write it. 
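+            # `catalog.instruments(instrument_ids=...)` returns an empty list when the
+            # instrument has not been written yet, so the write below runs at most once
+            # per instrument.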
+ if not catalog.instruments(instrument_ids=[instrument.id.value]): + catalog.write_data([instrument]) + + for kind in kinds: + fn = generate_filename(catalog, instrument_id=instrument.id, kind=kind, date=date) + if catalog.fs.exists(fn): + logger.info( + f"file for {instrument.id.value} {kind} {date:%Y-%m-%d} exists, skipping", + ) + continue + logger.info(f"Fetching {instrument.id.value} {kind} for {date:%Y-%m-%d}") + + data = request_data( + contract=contract, + instrument=instrument, + date=date.date(), + kind=kind, + tz_name=tz_name, + client=client, + ) + if data is None: + continue + + template = f"{date:%Y%m%d}" + "-{i}.parquet" + catalog.write_data(data, basename_template=template) + + +def request_data( + client: HistoricInteractiveBrokersClient, + contract: Contract, + instrument: Instrument, + date: datetime.date, + kind: str, + tz_name: str, +): + if kind in ("TRADES", "BID_ASK"): + raw = request_tick_data( + client=client, + contract=contract, + date=date, + kind=kind, + tz_name=tz_name, + ) + elif kind.split("-")[0] == "BARS": + bar_spec = BarSpecification.from_str(kind.split("-", maxsplit=1)[1]) + raw = request_bar_data( + contract=contract, + date=date, + bar_spec=bar_spec, + tz_name=tz_name, + client=client, + ) + else: + raise RuntimeError(f"Unknown {kind=}") + + if not raw: + logging.info(f"No ticks for {date=} {kind=} {contract=}, skipping") + return + logger.info(f"Fetched {len(raw)} raw {kind}") + if kind == "TRADES": + return parse_historic_trade_ticks(historic_ticks=raw, instrument=instrument) + elif kind == "BID_ASK": + return parse_historic_quote_ticks(historic_ticks=raw, instrument=instrument) + elif kind.split("-")[0] == "BARS": + return parse_historic_bars(historic_bars=raw, instrument=instrument, kind=kind) + else: + raise RuntimeError(f"Unknown {kind=}") + + +def request_tick_data( + client: HistoricInteractiveBrokersClient, + contract: Contract, + date: datetime.date, + kind: str, + tz_name: str, +) -> list: + assert kind in ("TRADES", "BID_ASK") + data: list = [] + + while True: + start_time = _determine_next_timestamp( + date=date, + timestamps=[d.time for d in data], + tz_name=tz_name, + ) + logger.debug(f"Using start_time: {start_time}") + + ticks = _request_historical_ticks( + client=client, + contract=contract, + start_time=start_time.strftime("%Y%m%d %H:%M:%S %Z"), + what=kind, + ) + + ticks = [t for t in ticks if t not in data] + + if not ticks or ticks[-1].time < start_time: + break + + logger.debug(f"Received {len(ticks)} ticks between {ticks[0].time} and {ticks[-1].time}") + + last_timestamp = pd.Timestamp(ticks[-1].time) + last_date = last_timestamp.astimezone(tz_name).date() + + if last_date != date: + # May contain data from next date, filter this out + data.extend( + [ + tick + for tick in ticks + if pd.Timestamp(tick.time).astimezone(tz_name).date() == date + ], + ) + break + else: + data.extend(ticks) + return data + + +def request_bar_data( + client: InteractiveBrokersClient, + contract: Contract, + date: datetime.date, + tz_name: str, + bar_spec: BarSpecification, +) -> list: + data: list = [] + + start_time = pd.Timestamp(date).tz_localize(tz_name).tz_convert("UTC") + end_time = start_time + datetime.timedelta(days=1) + + while True: + logger.debug(f"Using end_time: {end_time}") + + # bar_data_list: BarDataList = _request_historical_bars( + bar_data_list = _request_historical_bars( + client=client, + contract=contract, + end_time=end_time.strftime("%Y%m%d %H:%M:%S %Z"), + bar_spec=bar_spec, + ) + + bars = [bar for bar in bar_data_list if 
bar not in data and bar.volume != 0] + + if not bars: + break + + logger.info(f"Received {len(bars)} bars between {bars[0].date} and {bars[-1].date}") + + # We're requesting from end_date backwards, set our timestamp to the earliest timestamp + first_timestamp = pd.Timestamp(bars[0].date).tz_convert(tz_name) + first_date = first_timestamp.date() + + if first_date != date: + # May contain data from next date, filter this out + data.extend( + [ + bar + for bar in bars + if parse_response_datetime(bar.date, tz_name=tz_name).date() == date + ], + ) + break + else: + data.extend(bars) + + end_time = first_timestamp + + return data + + +def _request_historical_ticks( + client: InteractiveBrokersClient, + contract: Contract, + start_time: str, + what="BID_ASK", +): + return client.reqHistoricalTicks( + contract=contract, + startDateTime=start_time, + endDateTime="", + numberOfTicks=1000, + whatToShow=what, + useRth=False, + ) + + +def _bar_spec_to_hist_data_request(bar_spec: BarSpecification) -> dict[str, str]: + aggregation = bar_aggregation_to_str(bar_spec.aggregation) + price_type = price_type_to_str(bar_spec.price_type) + accepted_aggregations = ("SECOND", "MINUTE", "HOUR") + + err = f"Loading historic bars is for intraday data, bar_spec.aggregation should be {accepted_aggregations}" + assert aggregation in accepted_aggregations, err + + price_mapping = {"MID": "MIDPOINT", "LAST": "TRADES"} + what_to_show = price_mapping.get(price_type, price_type) + + size_mapping = {"SECOND": "sec", "MINUTE": "min", "HOUR": "hour"} + suffix = "" if bar_spec.step == 1 and aggregation != "SECOND" else "s" + bar_size = size_mapping.get(aggregation, aggregation) + bar_size_setting = f"{bar_spec.step} {bar_size + suffix}" + return {"durationStr": "1 D", "barSizeSetting": bar_size_setting, "whatToShow": what_to_show} + + +def _request_historical_bars( + client: HistoricInteractiveBrokersClient, + contract: Contract, + end_time: str, + bar_spec: BarSpecification, +): + spec = _bar_spec_to_hist_data_request(bar_spec=bar_spec) + return client._client.reqHistoricalData( + contract=contract, + endDateTime=end_time, + durationStr=spec["durationStr"], + barSizeSetting=spec["barSizeSetting"], + whatToShow=spec["whatToShow"], + useRTH=False, + formatDate=2, + ) + + +def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.date, tz_name: str): + """ + While looping over available data, it is possible for very liquid products that a 1s + period may contain 1000 ticks, at which point we need to step the time forward to + avoid getting stuck when iterating. 
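+    For example, if every timestamp returned so far is identical, the next request
+    starts one second later; otherwise the most recent timestamp is reused as the
+    next start time.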
+ """ + if not timestamps: + return pd.Timestamp(date, tz=tz_name).tz_convert("UTC") + unique_values = set(timestamps) + if len(unique_values) == 1: + timestamp = timestamps[-1] + return timestamp + pd.Timedelta(seconds=1) + else: + return timestamps[-1] + + +def parse_response_datetime( + dt: datetime.datetime | pd.Timestamp, + tz_name: str, +) -> datetime.datetime: + if isinstance(dt, pd.Timestamp): + dt = dt.to_pydatetime() + if dt.tzinfo is None: + tz = pytz.timezone(tz_name) + dt = tz.localize(dt) + return dt + + +def parse_historic_quote_ticks( + historic_ticks: list[HistoricalTickBidAsk], + instrument: Instrument, +) -> list[QuoteTick]: + trades = [] + for tick in historic_ticks: + ts_init = dt_to_unix_nanos(tick.time) + quote_tick = QuoteTick( + instrument_id=instrument.id, + bid=Price(value=tick.priceBid, precision=instrument.price_precision), + bid_size=Quantity(value=tick.sizeBid, precision=instrument.size_precision), + ask=Price(value=tick.priceAsk, precision=instrument.price_precision), + ask_size=Quantity(value=tick.sizeAsk, precision=instrument.size_precision), + ts_init=ts_init, + ts_event=ts_init, + ) + trades.append(quote_tick) + + return trades + + +def parse_historic_trade_ticks( + historic_ticks: list[HistoricalTickLast], + instrument: Instrument, +) -> list[TradeTick]: + trades = [] + for tick in historic_ticks: + ts_init = dt_to_unix_nanos(tick.time) + trade_tick = TradeTick( + instrument_id=instrument.id, + price=Price(value=tick.price, precision=instrument.price_precision), + size=Quantity(value=tick.size, precision=instrument.size_precision), + aggressor_side=AggressorSide.NO_AGGRESSOR, + trade_id=generate_trade_id( + ts_event=ts_init, + price=tick.price, + size=tick.size, + ), + ts_init=ts_init, + ts_event=ts_init, + ) + trades.append(trade_tick) + + return trades + + +def parse_historic_bars( + historic_bars: list[BarData], + instrument: Instrument, + kind: str, +) -> list[Bar]: + bars = [] + bar_type = BarType( + bar_spec=BarSpecification.from_str(kind.split("-", maxsplit=1)[1]), + instrument_id=instrument.id, + aggregation_source=AggregationSource.EXTERNAL, + ) + precision = instrument.price_precision + for bar in historic_bars: + ts_init = dt_to_unix_nanos(bar.date) + trade_tick = Bar( + bar_type=bar_type, + open=Price(bar.open, precision), + high=Price(bar.high, precision), + low=Price(bar.low, precision), + close=Price(bar.close, precision), + volume=Quantity(bar.volume, instrument.size_precision), + ts_init=ts_init, + ts_event=ts_init, + ) + bars.append(trade_tick) + + return bars From 00d936b5434bb4bb166e7b14ea2200df0a267dc4 Mon Sep 17 00:00:00 2001 From: limx0 Date: Fri, 27 Oct 2023 20:18:45 +1100 Subject: [PATCH 02/12] WIP --- .../adapters/interactive_brokers/common.py | 2 +- .../adapters/interactive_brokers/gateway.py | 6 +- .../interactive_brokers/historic/client.py | 309 ++++++++++++++- .../interactive_brokers/historic/core.py | 370 ++---------------- 4 files changed, 332 insertions(+), 355 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/common.py b/nautilus_trader/adapters/interactive_brokers/common.py index 2e48c2329759..b24f0e6af400 100644 --- a/nautilus_trader/adapters/interactive_brokers/common.py +++ b/nautilus_trader/adapters/interactive_brokers/common.py @@ -85,7 +85,7 @@ class IBContract(NautilusConfig, frozen=True, repr_omit_defaults=True): Exchange where security is traded. Will be SMART for Stocks. primaryExchange: str Exchange where security is registered. Applies to Stocks. 
- localSymbol: str + symbol: str Unique Symbol registered in Exchange. build_options_chain: bool (default: None) Search for full option chain diff --git a/nautilus_trader/adapters/interactive_brokers/gateway.py b/nautilus_trader/adapters/interactive_brokers/gateway.py index 375b609afd93..074a2bbe10a9 100644 --- a/nautilus_trader/adapters/interactive_brokers/gateway.py +++ b/nautilus_trader/adapters/interactive_brokers/gateway.py @@ -45,14 +45,14 @@ class InteractiveBrokersGateway: A class to manage starting an Interactive Brokers Gateway docker container. """ - IMAGE: ClassVar[str] = "ghcr.io/unusualalpha/ib-gateway:stable" + IMAGE: ClassVar[str] = "ghcr.io/unusualalpha/ib-gateway:10.19" CONTAINER_NAME: ClassVar[str] = "nautilus-ib-gateway" PORTS: ClassVar[dict[str, int]] = {"paper": 4002, "live": 4001} def __init__( self, - username: str, - password: str, + username: str | None = None, + password: str | None = None, host: str | None = "localhost", port: int | None = None, trading_mode: str | None = "paper", diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index 9d35632bd815..8734e0efef25 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -1,14 +1,21 @@ import asyncio +import datetime + +import pandas as pd from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient +from nautilus_trader.adapters.interactive_brokers.common import IBContract +from nautilus_trader.adapters.interactive_brokers.gateway import InteractiveBrokersGateway from nautilus_trader.cache.cache import Cache from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger +from nautilus_trader.common.logging import LoggerAdapter +from nautilus_trader.model.data import TradeTick from nautilus_trader.model.identifiers import TraderId from nautilus_trader.msgbus.bus import MessageBus -class HistoricInteractiveBrokersClient(InteractiveBrokersClient): +class HistoricInteractiveBrokersClient: def __init__( self, host: str = "127.0.0.1", @@ -18,13 +25,14 @@ def __init__( loop = asyncio.get_event_loop() clock = LiveClock() logger = Logger(clock) + self.log = LoggerAdapter("HistoricInteractiveBrokersClient", logger) msgbus = MessageBus( - TraderId("historic_interactive_brokers_client"), + TraderId("historic_interactive_brokers_client-001"), clock, logger, ) cache = Cache(logger) - super().__init__( + self.client = InteractiveBrokersClient( loop=loop, msgbus=msgbus, cache=cache, @@ -35,6 +43,299 @@ def __init__( client_id=client_id, ) + async def connect(self): + await self.client._socket_connect() + + async def request_trade_ticks( + self, + contract: IBContract, + date: datetime.date, + tz_name: str, + ) -> list[TradeTick]: + data: list[TradeTick] = [] + while True: + start_time = _determine_next_timestamp( + date=date, + timestamps=[d.time for d in data], + tz_name=tz_name, + ) + _ = await self.client.get_historical_ticks( + contract=contract, + tick_type="TRADES", + end_date_time=start_time, + use_rth=True, + ) + + return data + + # def request_tick_data( + # self, + # contract: Contract, + # date: datetime.date, + # kind: str, + # tz_name: str, + # ) -> list: + # assert kind in ("TRADES", "BID_ASK") + # data: list = [] + # + # while True: + # start_time = _determine_next_timestamp( + # date=date, + # timestamps=[d.time for d in data], + # tz_name=tz_name, + # ) + # 
self.log.debug(f"Using start_time: {start_time}") + # + # ticks = _request_historical_ticks( + # client=client, + # contract=contract, + # start_time=start_time.strftime("%Y%m%d %H:%M:%S %Z"), + # what=kind, + # ) + # + # ticks = [t for t in ticks if t not in data] + # + # if not ticks or ticks[-1].time < start_time: + # break + # + # self.log.debug( + # f"Received {len(ticks)} ticks between {ticks[0].time} and {ticks[-1].time}", + # ) + # + # last_timestamp = pd.Timestamp(ticks[-1].time) + # last_date = last_timestamp.astimezone(tz_name).date() + # + # if last_date != date: + # # May contain data from next date, filter this out + # data.extend( + # [ + # tick + # for tick in ticks + # if pd.Timestamp(tick.time).astimezone(tz_name).date() == date + # ], + # ) + # break + # else: + # data.extend(ticks) + # return data + + # def request_bar_data( + # self, + # client: InteractiveBrokersClient, + # contract: Contract, + # date: datetime.date, + # tz_name: str, + # bar_spec: BarSpecification, + # ) -> list: + # data: list = [] + # + # start_time = pd.Timestamp(date).tz_localize(tz_name).tz_convert("UTC") + # end_time = start_time + datetime.timedelta(days=1) + # + # while True: + # self.self.log.debug(f"Using end_time: {end_time}") + # + # # bar_data_list: BarDataList = _request_historical_bars( + # bar_data_list = _request_historical_bars( + # client=client, + # contract=contract, + # end_time=end_time.strftime("%Y%m%d %H:%M:%S %Z"), + # bar_spec=bar_spec, + # ) + # + # bars = [bar for bar in bar_data_list if bar not in data and bar.volume != 0] + # + # if not bars: + # break + # + # self.log.info(f"Received {len(bars)} bars between {bars[0].date} and {bars[-1].date}") + # + # # We're requesting from end_date backwards, set our timestamp to the earliest timestamp + # first_timestamp = pd.Timestamp(bars[0].date).tz_convert(tz_name) + # first_date = first_timestamp.date() + # + # if first_date != date: + # # May contain data from next date, filter this out + # data.extend( + # [ + # bar + # for bar in bars + # if parse_response_datetime(bar.date, tz_name=tz_name).date() == date + # ], + # ) + # break + # else: + # data.extend(bars) + # + # end_time = first_timestamp + # + # return data + # + # def _request_historical_ticks( + # self, + # client: InteractiveBrokersClient, + # contract: Contract, + # start_time: str, + # what="BID_ASK", + # ): + # return client.reqHistoricalTicks( + # contract=contract, + # startDateTime=start_time, + # endDateTime="", + # numberOfTicks=1000, + # whatToShow=what, + # useRth=False, + # ) + # + # def _bar_spec_to_hist_data_request(self, bar_spec: BarSpecification) -> dict[str, str]: + # aggregation = bar_aggregation_to_str(bar_spec.aggregation) + # price_type = price_type_to_str(bar_spec.price_type) + # accepted_aggregations = ("SECOND", "MINUTE", "HOUR") + # + # err = f"Loading historic bars is for intraday data, bar_spec.aggregation should be {accepted_aggregations}" + # assert aggregation in accepted_aggregations, err + # + # price_mapping = {"MID": "MIDPOINT", "LAST": "TRADES"} + # what_to_show = price_mapping.get(price_type, price_type) + # + # size_mapping = {"SECOND": "sec", "MINUTE": "min", "HOUR": "hour"} + # suffix = "" if bar_spec.step == 1 and aggregation != "SECOND" else "s" + # bar_size = size_mapping.get(aggregation, aggregation) + # bar_size_setting = f"{bar_spec.step} {bar_size + suffix}" + # return { + # "durationStr": "1 D", + # "barSizeSetting": bar_size_setting, + # "whatToShow": what_to_show, + # } + # + # def _request_historical_bars( + # 
self, + # contract: Contract, + # end_time: str, + # bar_spec: BarSpecification, + # ): + # spec = _bar_spec_to_hist_data_request(bar_spec=bar_spec) + # return client._client.reqHistoricalData( + # contract=contract, + # endDateTime=end_time, + # durationStr=spec["durationStr"], + # barSizeSetting=spec["barSizeSetting"], + # whatToShow=spec["whatToShow"], + # useRTH=False, + # formatDate=2, + # ) + + # def parse_response_datetime( + # dt: datetime.datetime | pd.Timestamp, + # tz_name: str, + # ) -> datetime.datetime: + # if isinstance(dt, pd.Timestamp): + # dt = dt.to_pydatetime() + # if dt.tzinfo is None: + # tz = pytz.timezone(tz_name) + # dt = tz.localize(dt) + # return dt + # + # def parse_historic_quote_ticks( + # historic_ticks: list[HistoricalTickBidAsk], + # instrument: Instrument, + # ) -> list[QuoteTick]: + # trades = [] + # for tick in historic_ticks: + # ts_init = dt_to_unix_nanos(tick.time) + # quote_tick = QuoteTick( + # instrument_id=instrument.id, + # bid=Price(value=tick.priceBid, precision=instrument.price_precision), + # bid_size=Quantity(value=tick.sizeBid, precision=instrument.size_precision), + # ask=Price(value=tick.priceAsk, precision=instrument.price_precision), + # ask_size=Quantity(value=tick.sizeAsk, precision=instrument.size_precision), + # ts_init=ts_init, + # ts_event=ts_init, + # ) + # trades.append(quote_tick) + # + # return trades + # + # def parse_historic_trade_ticks( + # historic_ticks: list[HistoricalTickLast], + # instrument: Instrument, + # ) -> list[TradeTick]: + # trades = [] + # for tick in historic_ticks: + # ts_init = dt_to_unix_nanos(tick.time) + # trade_tick = TradeTick( + # instrument_id=instrument.id, + # price=Price(value=tick.price, precision=instrument.price_precision), + # size=Quantity(value=tick.size, precision=instrument.size_precision), + # aggressor_side=AggressorSide.NO_AGGRESSOR, + # trade_id=generate_trade_id( + # ts_event=ts_init, + # price=tick.price, + # size=tick.size, + # ), + # ts_init=ts_init, + # ts_event=ts_init, + # ) + # trades.append(trade_tick) + # + # return trades + # + # def parse_historic_bars( + # historic_bars: list[BarData], + # instrument: Instrument, + # kind: str, + # ) -> list[Bar]: + # bars = [] + # bar_type = BarType( + # bar_spec=BarSpecification.from_str(kind.split("-", maxsplit=1)[1]), + # instrument_id=instrument.id, + # aggregation_source=AggregationSource.EXTERNAL, + # ) + # precision = instrument.price_precision + # for bar in historic_bars: + # ts_init = dt_to_unix_nanos(bar.date) + # trade_tick = Bar( + # bar_type=bar_type, + # open=Price(bar.open, precision), + # high=Price(bar.high, precision), + # low=Price(bar.low, precision), + # close=Price(bar.close, precision), + # volume=Quantity(bar.volume, instrument.size_precision), + # ts_init=ts_init, + # ts_event=ts_init, + # ) + # bars.append(trade_tick) + # + # return bars + + +def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.date, tz_name: str): + """ + While looping over available data, it is possible for very liquid products that a 1s + period may contain 1000 ticks, at which point we need to step the time forward to + avoid getting stuck when iterating. 
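+    If no timestamps have been collected yet, the start of `date` in `tz_name`
+    (converted to UTC) is used as the first request time.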
+ """ + if not timestamps: + return pd.Timestamp(date, tz=tz_name).tz_convert("UTC") + unique_values = set(timestamps) + if len(unique_values) == 1: + timestamp = timestamps[-1] + return timestamp + pd.Timedelta(seconds=1) + else: + return timestamps[-1] + + +async def main(): + contract = IBContract(secType="STK", exchange="SM", localSymbol="AAPL", currency="USD") + _ = InteractiveBrokersGateway(start=True) + client = HistoricInteractiveBrokersClient(port=4002) + await client.connect() + await client.request_trade_ticks( + contract=contract, + date=datetime.date(2023, 10, 25), + tz_name="America/New_York", + ) + if __name__ == "__main__": - client = HistoricInteractiveBrokersClient() + asyncio.run(main()) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/core.py b/nautilus_trader/adapters/interactive_brokers/historic/core.py index 0f93c5f5adcf..7347d7825d9c 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/core.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/core.py @@ -17,30 +17,10 @@ from typing import Literal import pandas as pd -import pytz -from ibapi.common import BarData -from ibapi.common import HistoricalTickBidAsk -from ibapi.common import HistoricalTickLast from ibapi.contract import Contract -from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient -from nautilus_trader.adapters.interactive_brokers.parsing.data import generate_trade_id -from nautilus_trader.adapters.interactive_brokers.parsing.instruments import parse_instrument -from nautilus_trader.core.datetime import dt_to_unix_nanos -from nautilus_trader.model.data.bar import Bar -from nautilus_trader.model.data.bar import BarSpecification -from nautilus_trader.model.data.bar import BarType -from nautilus_trader.model.data.tick import QuoteTick -from nautilus_trader.model.data.tick import TradeTick -from nautilus_trader.model.enums import AggregationSource -from nautilus_trader.model.enums import AggressorSide -from nautilus_trader.model.enums import bar_aggregation_to_str -from nautilus_trader.model.enums import price_type_to_str from nautilus_trader.model.identifiers import InstrumentId -from nautilus_trader.model.instruments import Instrument -from nautilus_trader.model.objects import Price -from nautilus_trader.model.objects import Quantity from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog @@ -57,7 +37,7 @@ def generate_filename( return f"{catalog.path}/data/{fn_kind}.parquet/instrument_id={instrument_id.value}/{date:%Y%m%d}-0.parquet" -def back_fill_catalog( +async def back_fill_catalog( catalog: ParquetDataCatalog, contracts: list[Contract], start_date: datetime.date, @@ -70,8 +50,6 @@ def back_fill_catalog( Parameters ---------- - ib : IB - The ib_insync client. catalog : ParquetDataCatalog The data catalog to write the data to. contracts : list[Contract] @@ -92,331 +70,29 @@ def back_fill_catalog( client = HistoricInteractiveBrokersClient() for date in pd.bdate_range(start_date, end_date, tz=tz_name): for contract in contracts: - [details] = client.reqContractDetails(contract=contract) - instrument = parse_instrument(contract_details=details) + # [details] = client.reqContractDetails(contract=contract) + # instrument = parse_instrument(contract_details=details) # Check if this instrument exists in the catalog, if not, write it. 
- if not catalog.instruments(instrument_ids=[instrument.id.value]): - catalog.write_data([instrument]) - - for kind in kinds: - fn = generate_filename(catalog, instrument_id=instrument.id, kind=kind, date=date) - if catalog.fs.exists(fn): - logger.info( - f"file for {instrument.id.value} {kind} {date:%Y-%m-%d} exists, skipping", - ) - continue - logger.info(f"Fetching {instrument.id.value} {kind} for {date:%Y-%m-%d}") - - data = request_data( - contract=contract, - instrument=instrument, - date=date.date(), - kind=kind, - tz_name=tz_name, - client=client, - ) - if data is None: - continue - - template = f"{date:%Y%m%d}" + "-{i}.parquet" - catalog.write_data(data, basename_template=template) - - -def request_data( - client: HistoricInteractiveBrokersClient, - contract: Contract, - instrument: Instrument, - date: datetime.date, - kind: str, - tz_name: str, -): - if kind in ("TRADES", "BID_ASK"): - raw = request_tick_data( - client=client, - contract=contract, - date=date, - kind=kind, - tz_name=tz_name, - ) - elif kind.split("-")[0] == "BARS": - bar_spec = BarSpecification.from_str(kind.split("-", maxsplit=1)[1]) - raw = request_bar_data( - contract=contract, - date=date, - bar_spec=bar_spec, - tz_name=tz_name, - client=client, - ) - else: - raise RuntimeError(f"Unknown {kind=}") - - if not raw: - logging.info(f"No ticks for {date=} {kind=} {contract=}, skipping") - return - logger.info(f"Fetched {len(raw)} raw {kind}") - if kind == "TRADES": - return parse_historic_trade_ticks(historic_ticks=raw, instrument=instrument) - elif kind == "BID_ASK": - return parse_historic_quote_ticks(historic_ticks=raw, instrument=instrument) - elif kind.split("-")[0] == "BARS": - return parse_historic_bars(historic_bars=raw, instrument=instrument, kind=kind) - else: - raise RuntimeError(f"Unknown {kind=}") - - -def request_tick_data( - client: HistoricInteractiveBrokersClient, - contract: Contract, - date: datetime.date, - kind: str, - tz_name: str, -) -> list: - assert kind in ("TRADES", "BID_ASK") - data: list = [] - - while True: - start_time = _determine_next_timestamp( - date=date, - timestamps=[d.time for d in data], - tz_name=tz_name, - ) - logger.debug(f"Using start_time: {start_time}") - - ticks = _request_historical_ticks( - client=client, - contract=contract, - start_time=start_time.strftime("%Y%m%d %H:%M:%S %Z"), - what=kind, - ) - - ticks = [t for t in ticks if t not in data] - - if not ticks or ticks[-1].time < start_time: - break - - logger.debug(f"Received {len(ticks)} ticks between {ticks[0].time} and {ticks[-1].time}") - - last_timestamp = pd.Timestamp(ticks[-1].time) - last_date = last_timestamp.astimezone(tz_name).date() - - if last_date != date: - # May contain data from next date, filter this out - data.extend( - [ - tick - for tick in ticks - if pd.Timestamp(tick.time).astimezone(tz_name).date() == date - ], + # if not catalog.instruments(instrument_ids=[instrument.id.value]): + # catalog.write_data([instrument]) + # + # for kind in kinds: + # fn = generate_filename(catalog, instrument_id=instrument.id, kind=kind, date=date) + # if catalog.fs.exists(fn): + # logger.info( + # f"file for {instrument.id.value} {kind} {date:%Y-%m-%d} exists, skipping", + # ) + # continue + # logger.info(f"Fetching {instrument.id.value} {kind} for {date:%Y-%m-%d}") + # + data = await client.request_trade_ticks( + contract=contract, # typing: ignore + date=date.date(), + tz_name=tz_name, ) - break - else: - data.extend(ticks) - return data - - -def request_bar_data( - client: InteractiveBrokersClient, - 
contract: Contract, - date: datetime.date, - tz_name: str, - bar_spec: BarSpecification, -) -> list: - data: list = [] - - start_time = pd.Timestamp(date).tz_localize(tz_name).tz_convert("UTC") - end_time = start_time + datetime.timedelta(days=1) - - while True: - logger.debug(f"Using end_time: {end_time}") - - # bar_data_list: BarDataList = _request_historical_bars( - bar_data_list = _request_historical_bars( - client=client, - contract=contract, - end_time=end_time.strftime("%Y%m%d %H:%M:%S %Z"), - bar_spec=bar_spec, - ) - - bars = [bar for bar in bar_data_list if bar not in data and bar.volume != 0] - - if not bars: - break - - logger.info(f"Received {len(bars)} bars between {bars[0].date} and {bars[-1].date}") - - # We're requesting from end_date backwards, set our timestamp to the earliest timestamp - first_timestamp = pd.Timestamp(bars[0].date).tz_convert(tz_name) - first_date = first_timestamp.date() - - if first_date != date: - # May contain data from next date, filter this out - data.extend( - [ - bar - for bar in bars - if parse_response_datetime(bar.date, tz_name=tz_name).date() == date - ], - ) - break - else: - data.extend(bars) - - end_time = first_timestamp - - return data - - -def _request_historical_ticks( - client: InteractiveBrokersClient, - contract: Contract, - start_time: str, - what="BID_ASK", -): - return client.reqHistoricalTicks( - contract=contract, - startDateTime=start_time, - endDateTime="", - numberOfTicks=1000, - whatToShow=what, - useRth=False, - ) - - -def _bar_spec_to_hist_data_request(bar_spec: BarSpecification) -> dict[str, str]: - aggregation = bar_aggregation_to_str(bar_spec.aggregation) - price_type = price_type_to_str(bar_spec.price_type) - accepted_aggregations = ("SECOND", "MINUTE", "HOUR") - - err = f"Loading historic bars is for intraday data, bar_spec.aggregation should be {accepted_aggregations}" - assert aggregation in accepted_aggregations, err - - price_mapping = {"MID": "MIDPOINT", "LAST": "TRADES"} - what_to_show = price_mapping.get(price_type, price_type) - - size_mapping = {"SECOND": "sec", "MINUTE": "min", "HOUR": "hour"} - suffix = "" if bar_spec.step == 1 and aggregation != "SECOND" else "s" - bar_size = size_mapping.get(aggregation, aggregation) - bar_size_setting = f"{bar_spec.step} {bar_size + suffix}" - return {"durationStr": "1 D", "barSizeSetting": bar_size_setting, "whatToShow": what_to_show} - - -def _request_historical_bars( - client: HistoricInteractiveBrokersClient, - contract: Contract, - end_time: str, - bar_spec: BarSpecification, -): - spec = _bar_spec_to_hist_data_request(bar_spec=bar_spec) - return client._client.reqHistoricalData( - contract=contract, - endDateTime=end_time, - durationStr=spec["durationStr"], - barSizeSetting=spec["barSizeSetting"], - whatToShow=spec["whatToShow"], - useRTH=False, - formatDate=2, - ) - - -def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.date, tz_name: str): - """ - While looping over available data, it is possible for very liquid products that a 1s - period may contain 1000 ticks, at which point we need to step the time forward to - avoid getting stuck when iterating. 
- """ - if not timestamps: - return pd.Timestamp(date, tz=tz_name).tz_convert("UTC") - unique_values = set(timestamps) - if len(unique_values) == 1: - timestamp = timestamps[-1] - return timestamp + pd.Timedelta(seconds=1) - else: - return timestamps[-1] - - -def parse_response_datetime( - dt: datetime.datetime | pd.Timestamp, - tz_name: str, -) -> datetime.datetime: - if isinstance(dt, pd.Timestamp): - dt = dt.to_pydatetime() - if dt.tzinfo is None: - tz = pytz.timezone(tz_name) - dt = tz.localize(dt) - return dt - - -def parse_historic_quote_ticks( - historic_ticks: list[HistoricalTickBidAsk], - instrument: Instrument, -) -> list[QuoteTick]: - trades = [] - for tick in historic_ticks: - ts_init = dt_to_unix_nanos(tick.time) - quote_tick = QuoteTick( - instrument_id=instrument.id, - bid=Price(value=tick.priceBid, precision=instrument.price_precision), - bid_size=Quantity(value=tick.sizeBid, precision=instrument.size_precision), - ask=Price(value=tick.priceAsk, precision=instrument.price_precision), - ask_size=Quantity(value=tick.sizeAsk, precision=instrument.size_precision), - ts_init=ts_init, - ts_event=ts_init, - ) - trades.append(quote_tick) - - return trades - - -def parse_historic_trade_ticks( - historic_ticks: list[HistoricalTickLast], - instrument: Instrument, -) -> list[TradeTick]: - trades = [] - for tick in historic_ticks: - ts_init = dt_to_unix_nanos(tick.time) - trade_tick = TradeTick( - instrument_id=instrument.id, - price=Price(value=tick.price, precision=instrument.price_precision), - size=Quantity(value=tick.size, precision=instrument.size_precision), - aggressor_side=AggressorSide.NO_AGGRESSOR, - trade_id=generate_trade_id( - ts_event=ts_init, - price=tick.price, - size=tick.size, - ), - ts_init=ts_init, - ts_event=ts_init, - ) - trades.append(trade_tick) - - return trades - - -def parse_historic_bars( - historic_bars: list[BarData], - instrument: Instrument, - kind: str, -) -> list[Bar]: - bars = [] - bar_type = BarType( - bar_spec=BarSpecification.from_str(kind.split("-", maxsplit=1)[1]), - instrument_id=instrument.id, - aggregation_source=AggregationSource.EXTERNAL, - ) - precision = instrument.price_precision - for bar in historic_bars: - ts_init = dt_to_unix_nanos(bar.date) - trade_tick = Bar( - bar_type=bar_type, - open=Price(bar.open, precision), - high=Price(bar.high, precision), - low=Price(bar.low, precision), - close=Price(bar.close, precision), - volume=Quantity(bar.volume, instrument.size_precision), - ts_init=ts_init, - ts_event=ts_init, - ) - bars.append(trade_tick) + if data is None: + continue - return bars + template = f"{date:%Y%m%d}" + "-{i}.parquet" + catalog.write_data(data, basename_template=template) From be8baceeba60d56ef13e5b462551e4c384822bfc Mon Sep 17 00:00:00 2001 From: Bradley McElroy Date: Fri, 27 Oct 2023 21:03:29 +1100 Subject: [PATCH 03/12] WIP --- .pre-commit-config.yaml | 2 +- .../interactive_brokers/historic/client.py | 114 ++++-------------- .../parsing/instruments.py | 2 +- 3 files changed, 24 insertions(+), 94 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fb9e3d26c61e..74970264a39c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -76,7 +76,7 @@ repos: exclude: "docs/_pygments/monokai.py" - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.2 + rev: v0.1.3 hooks: - id: ruff args: ["--fix"] diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index 8734e0efef25..9ad11441f7da 
100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -2,10 +2,10 @@ import datetime import pandas as pd +from ibapi.common import MarketDataTypeEnum from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient from nautilus_trader.adapters.interactive_brokers.common import IBContract -from nautilus_trader.adapters.interactive_brokers.gateway import InteractiveBrokersGateway from nautilus_trader.cache.cache import Cache from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger @@ -21,6 +21,7 @@ def __init__( host: str = "127.0.0.1", port: int = 7497, client_id: int = 1, + market_data_type: MarketDataTypeEnum = MarketDataTypeEnum.REALTIME, ): loop = asyncio.get_event_loop() clock = LiveClock() @@ -32,7 +33,8 @@ def __init__( logger, ) cache = Cache(logger) - self.client = InteractiveBrokersClient( + self.market_data_type = market_data_type + self._client = InteractiveBrokersClient( loop=loop, msgbus=msgbus, cache=cache, @@ -43,8 +45,13 @@ def __init__( client_id=client_id, ) - async def connect(self): - await self.client._socket_connect() + async def _connect(self): + # Connect client + await self._client.is_running_async() + self._client.registered_nautilus_clients.add(1) + + # Set Market Data Type + await self._client.set_market_data_type(self.market_data_type) async def request_trade_ticks( self, @@ -59,12 +66,13 @@ async def request_trade_ticks( timestamps=[d.time for d in data], tz_name=tz_name, ) - _ = await self.client.get_historical_ticks( + response = await self._client.get_historical_ticks( contract=contract, tick_type="TRADES", end_date_time=start_time, use_rth=True, ) + print(response) return data @@ -225,89 +233,6 @@ async def request_trade_ticks( # formatDate=2, # ) - # def parse_response_datetime( - # dt: datetime.datetime | pd.Timestamp, - # tz_name: str, - # ) -> datetime.datetime: - # if isinstance(dt, pd.Timestamp): - # dt = dt.to_pydatetime() - # if dt.tzinfo is None: - # tz = pytz.timezone(tz_name) - # dt = tz.localize(dt) - # return dt - # - # def parse_historic_quote_ticks( - # historic_ticks: list[HistoricalTickBidAsk], - # instrument: Instrument, - # ) -> list[QuoteTick]: - # trades = [] - # for tick in historic_ticks: - # ts_init = dt_to_unix_nanos(tick.time) - # quote_tick = QuoteTick( - # instrument_id=instrument.id, - # bid=Price(value=tick.priceBid, precision=instrument.price_precision), - # bid_size=Quantity(value=tick.sizeBid, precision=instrument.size_precision), - # ask=Price(value=tick.priceAsk, precision=instrument.price_precision), - # ask_size=Quantity(value=tick.sizeAsk, precision=instrument.size_precision), - # ts_init=ts_init, - # ts_event=ts_init, - # ) - # trades.append(quote_tick) - # - # return trades - # - # def parse_historic_trade_ticks( - # historic_ticks: list[HistoricalTickLast], - # instrument: Instrument, - # ) -> list[TradeTick]: - # trades = [] - # for tick in historic_ticks: - # ts_init = dt_to_unix_nanos(tick.time) - # trade_tick = TradeTick( - # instrument_id=instrument.id, - # price=Price(value=tick.price, precision=instrument.price_precision), - # size=Quantity(value=tick.size, precision=instrument.size_precision), - # aggressor_side=AggressorSide.NO_AGGRESSOR, - # trade_id=generate_trade_id( - # ts_event=ts_init, - # price=tick.price, - # size=tick.size, - # ), - # ts_init=ts_init, - # ts_event=ts_init, - # ) - # trades.append(trade_tick) - # - # return trades - # - 
# def parse_historic_bars( - # historic_bars: list[BarData], - # instrument: Instrument, - # kind: str, - # ) -> list[Bar]: - # bars = [] - # bar_type = BarType( - # bar_spec=BarSpecification.from_str(kind.split("-", maxsplit=1)[1]), - # instrument_id=instrument.id, - # aggregation_source=AggregationSource.EXTERNAL, - # ) - # precision = instrument.price_precision - # for bar in historic_bars: - # ts_init = dt_to_unix_nanos(bar.date) - # trade_tick = Bar( - # bar_type=bar_type, - # open=Price(bar.open, precision), - # high=Price(bar.high, precision), - # low=Price(bar.low, precision), - # close=Price(bar.close, precision), - # volume=Quantity(bar.volume, instrument.size_precision), - # ts_init=ts_init, - # ts_event=ts_init, - # ) - # bars.append(trade_tick) - # - # return bars - def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.date, tz_name: str): """ @@ -326,10 +251,15 @@ def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.dat async def main(): - contract = IBContract(secType="STK", exchange="SM", localSymbol="AAPL", currency="USD") - _ = InteractiveBrokersGateway(start=True) - client = HistoricInteractiveBrokersClient(port=4002) - await client.connect() + contract = IBContract( + secType="STK", + symbol="AAPL", + exchange="SMART", + primaryExchange="NASDAQ", + ) + client = HistoricInteractiveBrokersClient(client_id=5) + await client._connect() + await asyncio.sleep(2) await client.request_trade_ticks( contract=contract, date=datetime.date(2023, 10, 25), diff --git a/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py b/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py index caeb4091787d..7e524c5d1726 100644 --- a/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py +++ b/nautilus_trader/adapters/interactive_brokers/parsing/instruments.py @@ -300,7 +300,7 @@ def ib_contract_to_instrument_id(contract: IBContract) -> InstrumentId: security_type = contract.secType if security_type == "STK": - symbol = contract.localSymbol.replace(" ", "-") + symbol = (contract.localSymbol or contract.symbol).replace(" ", "-") venue = contract.primaryExchange if contract.exchange == "SMART" else contract.exchange elif security_type == "OPT": symbol = contract.localSymbol.replace(" ", "") or contract.symbol.replace(" ", "") From e3cc70c3777e68f64be15addb31ef5219a2e9b75 Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Mon, 6 Nov 2023 20:40:25 -0500 Subject: [PATCH 04/12] WIP --- .../interactive_brokers/historic_download.py | 104 ++--- .../interactive_brokers/client/client.py | 17 +- .../interactive_brokers/historic/__init__.py | 6 + .../historic/async_actor.py | 140 ------ .../interactive_brokers/historic/bar_data.py | 116 ----- .../interactive_brokers/historic/client.py | 442 ++++++++++-------- .../interactive_brokers/historic/core.py | 98 ---- .../adapters/interactive_brokers/providers.py | 1 + 8 files changed, 307 insertions(+), 617 deletions(-) delete mode 100644 nautilus_trader/adapters/interactive_brokers/historic/async_actor.py delete mode 100644 nautilus_trader/adapters/interactive_brokers/historic/bar_data.py delete mode 100644 nautilus_trader/adapters/interactive_brokers/historic/core.py diff --git a/examples/live/interactive_brokers/historic_download.py b/examples/live/interactive_brokers/historic_download.py index 79b401aa6ce9..fd350996e969 100644 --- a/examples/live/interactive_brokers/historic_download.py +++ b/examples/live/interactive_brokers/historic_download.py @@ -13,78 +13,54 @@ # See the 
License for the specific language governing permissions and # limitations under the License. # ------------------------------------------------------------------------------------------------- +import asyncio +import datetime -import pandas as pd +from nautilus_trader.adapters.interactive_brokers.common import IBContract +from nautilus_trader.adapters.interactive_brokers.historic import HistoricInteractiveBrokersClient +from nautilus_trader.persistence.catalog import ParquetDataCatalog -# fmt: off -from nautilus_trader.adapters.interactive_brokers.config import InteractiveBrokersDataClientConfig -from nautilus_trader.adapters.interactive_brokers.factories import InteractiveBrokersLiveDataClientFactory -from nautilus_trader.adapters.interactive_brokers.factories import InteractiveBrokersLiveExecClientFactory -from nautilus_trader.adapters.interactive_brokers.historic.bar_data import BarDataDownloader -from nautilus_trader.adapters.interactive_brokers.historic.bar_data import BarDataDownloaderConfig -from nautilus_trader.config import LoggingConfig -from nautilus_trader.config import TradingNodeConfig -from nautilus_trader.live.node import TradingNode -from nautilus_trader.model.data import Bar +async def main(): + # Define one or many IBContracts or instrument_ids to fetch data for + contract = IBContract( + secType="STK", + symbol="AAPL", + exchange="SMART", + primaryExchange="NASDAQ", + ) + instrument_id = "TSLA.NASDAQ" -# fmt: on + client = HistoricInteractiveBrokersClient(port=4002, client_id=5) + await client._connect() + await asyncio.sleep(2) -# *** MAKE SURE YOU HAVE REQUIRED DATA SUBSCRIPTION FOR THIS WORK WORK AS INTENDED. *** + instruments = await client.request_instruments( + contracts=[contract], + instrument_ids=[instrument_id], + ) -df = pd.DataFrame() + bars = await client.request_bars( + bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], + end_date_time=datetime.datetime.now(), + duration="1 D", + contracts=[contract], + instrument_ids=[instrument_id], + ) + ticks = await client.request_ticks( + "TRADES", + start_date_time=datetime.date.today() - datetime.timedelta(days=4), + end_date_time=datetime.date.today() - datetime.timedelta(days=3), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) -# Data Handler for BarDataDownloader -def do_something_with_bars(bars: list): - global df - bars_dict = [Bar.to_dict(bar) for bar in bars] - df = pd.concat([df, pd.DataFrame(bars_dict)]) - df = df.sort_values(by="ts_init") + catalog = ParquetDataCatalog("./catalog") + # You can write any Nautilus Data to the catalog + catalog.write_data(instruments + bars + ticks) -# Configure the trading node -config_node = TradingNodeConfig( - trader_id="TESTER-001", - logging=LoggingConfig(log_level="INFO"), - data_clients={ - "InteractiveBrokers": InteractiveBrokersDataClientConfig( - ibg_host="127.0.0.1", - ibg_port=7497, - ibg_client_id=1, - ), - }, - timeout_connection=90.0, -) - -# Instantiate the node with a configuration -node = TradingNode(config=config_node) - -# Configure your strategy -downloader_config = BarDataDownloaderConfig( - start_iso_ts="2023-09-01T00:00:00+00:00", - end_iso_ts="2023-09-30T00:00:00+00:00", - bar_types=[ - "AAPL.NASDAQ-1-MINUTE-BID-EXTERNAL", - "AAPL.NASDAQ-1-MINUTE-ASK-EXTERNAL", - "AAPL.NASDAQ-1-MINUTE-LAST-EXTERNAL", - ], - handler=do_something_with_bars, - freq="1W", -) - -# Instantiate the downloader and add into node -downloader = BarDataDownloader(config=downloader_config) -node.trader.add_actor(downloader) - -# 
Register your client factories with the node (can take user defined factories) -node.add_data_client_factory("InteractiveBrokers", InteractiveBrokersLiveDataClientFactory) -node.add_exec_client_factory("InteractiveBrokers", InteractiveBrokersLiveExecClientFactory) -node.build() - -# Stop and dispose of the node with SIGINT/CTRL+C if __name__ == "__main__": - try: - node.run() - finally: - node.dispose() + asyncio.run(main()) diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index 7c00587c7a13..624759aa13bf 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1297,7 +1297,7 @@ async def get_historical_ticks( self, contract: IBContract, tick_type: str, - end_date_time: pd.Timestamp, + start_date_time: pd.Timestamp, use_rth: bool, ): name = (str(ib_contract_to_instrument_id(contract)), tick_type) @@ -1310,8 +1310,8 @@ async def get_historical_ticks( self._client.reqHistoricalTicks, reqId=req_id, contract=contract, - startDateTime="", - endDateTime=end_date_time.strftime("%Y%m%d %H:%M:%S %Z"), + startDateTime=start_date_time.strftime("%Y%m%d %H:%M:%S %Z"), + endDateTime="", numberOfTicks=1000, whatToShow=tick_type, useRth=use_rth, @@ -1347,7 +1347,7 @@ def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): self._end_request(req_id) - def historicalTicksLast(self, req_id: int, ticks: list, done: bool): + def historicalTicksLast(self, req_id: int, ticks: list, done: bool): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) self._process_trade_ticks(req_id, ticks) @@ -1356,10 +1356,15 @@ def historicalTicks(self, req_id: int, ticks: list, done: bool): self._process_trade_ticks(req_id, ticks) def _process_trade_ticks(self, req_id: int, ticks: list): - if request := self.requests.get(req_id=req_id): + # inspect self.requests and compare to req_id. sometimes + # req_id is one lower than self.requests. this is a bug. + # if request := self.requests.get(req_id=req_id): + if len(ticks) == 0: + self._end_request(req_id) + if request := self.requests.get(req_id=req_id + 1): + # if request := self.requests[0]: instrument_id = InstrumentId.from_str(request.name[0]) instrument = self._cache.instrument(instrument_id) - for tick in ticks: ts_event = pd.Timestamp.fromtimestamp(tick.time, tz=pytz.utc).value trade_tick = TradeTick( diff --git a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py index ca16b56e4794..326b137862e3 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py @@ -12,3 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ------------------------------------------------------------------------------------------------- +from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient + + +__all__ = [ + "HistoricInteractiveBrokersClient", +] diff --git a/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py b/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py deleted file mode 100644 index 86515d7e91cf..000000000000 --- a/nautilus_trader/adapters/interactive_brokers/historic/async_actor.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python3 -# ------------------------------------------------------------------------------------------------- -# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. -# https://nautechsystems.io -# -# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------------------------------------------- - -import asyncio -import functools -from collections.abc import Callable - -# fmt: off -from collections.abc import Coroutine - -import async_timeout - -from nautilus_trader.common.actor import Actor -from nautilus_trader.common.actor import ActorConfig -from nautilus_trader.common.clock import LiveClock -from nautilus_trader.config.common import Environment -from nautilus_trader.core.rust.common import LogColor -from nautilus_trader.core.uuid import UUID4 - - -# fmt: on - - -class AsyncActor(Actor): - def __init__(self, config: ActorConfig): - super().__init__(config) - - self.environment: Environment | None = Environment.BACKTEST - - # Hot Cache - self._pending_async_requests: dict[UUID4, asyncio.Event] = {} - - # Initialized in on_start - self._loop: asyncio.AbstractEventLoop | None = None - - def on_start(self): - if isinstance(self.clock, LiveClock): - self.environment = Environment.LIVE - - if self.environment == Environment.LIVE: - self._loop = asyncio.get_running_loop() - self.create_task(self._on_start()) - else: - asyncio.run(self._on_start()) - - async def _on_start(self): - raise NotImplementedError( # pragma: no cover - "implement the `_on_start` coroutine", # pragma: no cover - ) - - def _finish_response(self, request_id: UUID4): - super()._finish_response(request_id) - if request_id in self._pending_async_requests.keys(): - self._pending_async_requests[request_id].set() - - async def await_request(self, request_id: UUID4, timeout: int = 30): - self._pending_async_requests[request_id] = asyncio.Event() - try: - async with async_timeout.timeout(timeout): - await self._pending_async_requests[request_id].wait() - except asyncio.TimeoutError: - self.log.error(f"Failed to download data for {request_id}") - del self._pending_async_requests[request_id] - - def create_task( - self, - coro: Coroutine, - log_msg: str | None = None, - actions: Callable | None = None, - success: str | None = None, - ) -> asyncio.Task: - """ - Run the given coroutine with error handling and optional callback actions when - done. 
- - Parameters - ---------- - coro : Coroutine - The coroutine to run. - log_msg : str, optional - The log message for the task. - actions : Callable, optional - The actions callback to run when the coroutine is done. - success : str, optional - The log message to write on actions success. - - Returns - ------- - asyncio.Task - - """ - log_msg = log_msg or coro.__name__ - self._log.debug(f"Creating task {log_msg}.") - task = self._loop.create_task( - coro, - name=coro.__name__, - ) - task.add_done_callback( - functools.partial( - self._on_task_completed, - actions, - success, - ), - ) - return task - - def _on_task_completed( - self, - actions: Callable | None, - success: str | None, - task: asyncio.Task, - ) -> None: - if task.exception(): - self._log.error( - f"Error on `{task.get_name()}`: " f"{task.exception()!r}", - ) - else: - if actions: - try: - actions() - except Exception as e: - self._log.error( - f"Failed triggering action {actions.__name__} on `{task.get_name()}`: " - f"{e!r}", - ) - if success: - self._log.info(success, LogColor.GREEN) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py b/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py deleted file mode 100644 index a3d095bfb25a..000000000000 --- a/nautilus_trader/adapters/interactive_brokers/historic/bar_data.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python3 -# ------------------------------------------------------------------------------------------------- -# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. -# https://nautechsystems.io -# -# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------------------------------------------- - -from collections.abc import Callable - -import pandas as pd - -# fmt: off -from nautilus_trader.adapters.interactive_brokers.historic.async_actor import AsyncActor -from nautilus_trader.common.actor import ActorConfig -from nautilus_trader.core.correctness import PyCondition -from nautilus_trader.model.data.bar import Bar -from nautilus_trader.model.data.bar import BarType - - -# fmt: on - - -class BarDataDownloaderConfig(ActorConfig): - """ - Configuration for `BarDataDownloader` instances. 
- """ - - start_iso_ts: str - end_iso_ts: str - bar_types: list[str] - handler: Callable - freq: str = "1W" - request_timeout: int = 30 - - -class BarDataDownloader(AsyncActor): - def __init__(self, config: BarDataDownloaderConfig): - super().__init__(config) - try: - self.start_time: pd.Timestamp = pd.to_datetime( - config.start_iso_ts, - format="%Y-%m-%dT%H:%M:%S%z", - ) - self.end_time: pd.Timestamp = pd.to_datetime( - config.end_iso_ts, - format="%Y-%m-%dT%H:%M:%S%z", - ) - except ValueError: - raise ValueError("`start_iso_ts` and `end_iso_ts` must be like '%Y-%m-%dT%H:%M:%S%z'") - - self.bar_types: list[BarType] = [] - for bar_type in config.bar_types: - self.bar_types.append(BarType.from_str(bar_type)) - - self.handler: Callable | None = config.handler - self.freq: str = config.freq - - async def _on_start(self): - instrument_ids = {bar_type.instrument_id for bar_type in self.bar_types} - for instrument_id in instrument_ids: - request_id = self.request_instrument(instrument_id) - await self.await_request(request_id, timeout=self.config.request_timeout) - - request_dates = list(pd.date_range(self.start_time, self.end_time, freq=self.freq)) - - for request_date in request_dates: - for bar_type in self.bar_types: - request_id = self.request_bars( - bar_type=bar_type, - start=request_date, - end=request_date + pd.Timedelta(self.freq), - ) - await self.await_request(request_id, timeout=self.config.request_timeout) - - self.stop() - - def handle_bars(self, bars: list): - """ - Handle the given historical bar data by handling each bar individually. - - Parameters - ---------- - bars : list[Bar] - The bars to handle. - - Warnings - -------- - System method (not intended to be called by user code). - - """ - PyCondition.not_none(bars, "bars") # Can be empty - - length = len(bars) - first: Bar = bars[0] if length > 0 else None - last: Bar = bars[length - 1] if length > 0 else None - - if length > 0: - self._log.info(f"Received data for {first.bar_type}.") - else: - self._log.error(f"Received data for unknown bar type.") - return - - if length > 0 and first.ts_init > last.ts_init: - raise RuntimeError(f"cannot handle data: incorrectly sorted") - - # Send Bars response as a whole to handler - self.handler(bars) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index 9ad11441f7da..db2ddbb90ec2 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -1,21 +1,40 @@ import asyncio import datetime +import re +from typing import Literal import pandas as pd from ibapi.common import MarketDataTypeEnum from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient from nautilus_trader.adapters.interactive_brokers.common import IBContract +from nautilus_trader.adapters.interactive_brokers.parsing.instruments import ib_contract_to_instrument_id +from nautilus_trader.adapters.interactive_brokers.parsing.instruments import instrument_id_to_ib_contract +from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProvider +from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProviderConfig from nautilus_trader.cache.cache import Cache from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger from nautilus_trader.common.logging import LoggerAdapter +from nautilus_trader.core.datetime 
import unix_nanos_to_dt +from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.data.bar import Bar +from nautilus_trader.model.data.bar import BarSpecification +from nautilus_trader.model.data.bar import BarType +from nautilus_trader.model.enums import AggregationSource +from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import TraderId +from nautilus_trader.model.instruments.base import Instrument from nautilus_trader.msgbus.bus import MessageBus +from nautilus_trader.persistence.catalog import ParquetDataCatalog class HistoricInteractiveBrokersClient: + """ + Provides a means of requesting historical market data for backtesting. + """ + def __init__( self, host: str = "127.0.0.1", @@ -24,6 +43,7 @@ def __init__( market_data_type: MarketDataTypeEnum = MarketDataTypeEnum.REALTIME, ): loop = asyncio.get_event_loop() + loop.set_debug(True) clock = LiveClock() logger = Logger(clock) self.log = LoggerAdapter("HistoricInteractiveBrokersClient", logger) @@ -53,201 +73,211 @@ async def _connect(self): # Set Market Data Type await self._client.set_market_data_type(self.market_data_type) - async def request_trade_ticks( + async def request_ticks( self, - contract: IBContract, - date: datetime.date, + tick_type: Literal["TRADES", "BID_ASK"], + start_date_time: datetime.datetime, + end_date_time: datetime.datetime, tz_name: str, - ) -> list[TradeTick]: - data: list[TradeTick] = [] - while True: - start_time = _determine_next_timestamp( - date=date, - timestamps=[d.time for d in data], - tz_name=tz_name, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + use_rth: bool = True, + ) -> list[TradeTick | QuoteTick]: + """ + Return TradeTicks or QuoteTicks for one or more bar specifications for a list of + IBContracts and/or InstrumentId strings. + + Parameters + ---------- + tick_type : Literal["TRADES", "BID_ASK"] + The type of ticks to retrieve. + start_date_time : datetime.date + The start date for the ticks. + end_date_time : datetime.date + The end date for the ticks. + tz_name : str + The timezone to use. + contracts : list[IBContract], default 'None' + IBContracts defining which ticks to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which ticks to retrieve. + use_rth : bool, default 'True' + Whether to use regular trading hours. 
+ + Returns + ------- + list[TradeTick | QuoteTick] + + """ + start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + contracts = contracts or [] + instrument_ids = instrument_ids or [] + if not contracts and not instrument_ids: + raise ValueError("Either contracts or instrument_ids must be provided") + + # Convert instrument_id strings to IBContracts + contracts.extend( + [ + instrument_id_to_ib_contract( + InstrumentId.from_str(instrument_id), + ) + for instrument_id in instrument_ids + ], + ) + + data: list[TradeTick | QuoteTick] = [] + for contract in contracts: + while True: + # breakpoint() + print("Requesting ticks from", start_date_time) + ticks = await self._client.get_historical_ticks( + contract=contract, + tick_type=tick_type, + start_date_time=start_date_time, + use_rth=use_rth, + ) + print("Number of ticks retrieved:", len(ticks)) + if not ticks: + continue + data.extend(ticks) + print("Number of ticks in data:", len(data)) + + timestamps = [tick.ts_event for tick in ticks] + min_timestamp: pd.Timestamp = unix_nanos_to_dt(min(timestamps)) + max_timestamp: pd.Timestamp = unix_nanos_to_dt(max(timestamps)) + + # For very liquid products, a 1s period may contain 1000 ticks - the maximum that IB allows + # -- so we need to step the time forward to the next second avoid getting stuck when iterating. + if min_timestamp.floor("S") == max_timestamp.floor("S"): + max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) + + if max_timestamp >= end_date_time: + break + + start_date_time = max_timestamp + + return data + + async def request_instruments( + self, + instrument_provider_config: InteractiveBrokersInstrumentProviderConfig | None = None, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + ) -> list[Instrument]: + """ + Return Instruments given either a InteractiveBrokersInstrumentProviderConfig or + a list of IBContracts and/or InstrumentId strings. + + Parameters + ---------- + instrument_provider_config : InteractiveBrokersInstrumentProviderConfig + An instrument provider config defining which instruments to retrieve. + contracts : list[IBContract], default 'None' + IBContracts defining which instruments to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which instruments to retrieve. 
+ + Returns + ------- + list[Instrument] + + """ + if instrument_provider_config and (contracts or instrument_ids): + raise ValueError( + "Either instrument_provider_config or ib_contracts/instrument_ids should be provided, not both.", ) - response = await self._client.get_historical_ticks( - contract=contract, - tick_type="TRADES", - end_date_time=start_time, - use_rth=True, + if instrument_provider_config is None: + instrument_provider_config = InteractiveBrokersInstrumentProviderConfig( + load_contracts=frozenset(contracts) if contracts else None, + load_ids=frozenset(instrument_ids) if instrument_ids else None, ) - print(response) + provider = InteractiveBrokersInstrumentProvider( + self._client, + instrument_provider_config, + Logger(LiveClock()), + ) + await provider.load_all_async() + return list(provider._instruments.values()) - return data + async def request_bars( + self, + bar_specifications: list[str], + end_date_time: datetime.datetime, + duration: str, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + use_rth: bool = True, + ) -> list[Bar]: + """ + Return Bars for one or more bar specifications for a list of IBContracts and/or + InstrumentId strings. - # def request_tick_data( - # self, - # contract: Contract, - # date: datetime.date, - # kind: str, - # tz_name: str, - # ) -> list: - # assert kind in ("TRADES", "BID_ASK") - # data: list = [] - # - # while True: - # start_time = _determine_next_timestamp( - # date=date, - # timestamps=[d.time for d in data], - # tz_name=tz_name, - # ) - # self.log.debug(f"Using start_time: {start_time}") - # - # ticks = _request_historical_ticks( - # client=client, - # contract=contract, - # start_time=start_time.strftime("%Y%m%d %H:%M:%S %Z"), - # what=kind, - # ) - # - # ticks = [t for t in ticks if t not in data] - # - # if not ticks or ticks[-1].time < start_time: - # break - # - # self.log.debug( - # f"Received {len(ticks)} ticks between {ticks[0].time} and {ticks[-1].time}", - # ) - # - # last_timestamp = pd.Timestamp(ticks[-1].time) - # last_date = last_timestamp.astimezone(tz_name).date() - # - # if last_date != date: - # # May contain data from next date, filter this out - # data.extend( - # [ - # tick - # for tick in ticks - # if pd.Timestamp(tick.time).astimezone(tz_name).date() == date - # ], - # ) - # break - # else: - # data.extend(ticks) - # return data - - # def request_bar_data( - # self, - # client: InteractiveBrokersClient, - # contract: Contract, - # date: datetime.date, - # tz_name: str, - # bar_spec: BarSpecification, - # ) -> list: - # data: list = [] - # - # start_time = pd.Timestamp(date).tz_localize(tz_name).tz_convert("UTC") - # end_time = start_time + datetime.timedelta(days=1) - # - # while True: - # self.self.log.debug(f"Using end_time: {end_time}") - # - # # bar_data_list: BarDataList = _request_historical_bars( - # bar_data_list = _request_historical_bars( - # client=client, - # contract=contract, - # end_time=end_time.strftime("%Y%m%d %H:%M:%S %Z"), - # bar_spec=bar_spec, - # ) - # - # bars = [bar for bar in bar_data_list if bar not in data and bar.volume != 0] - # - # if not bars: - # break - # - # self.log.info(f"Received {len(bars)} bars between {bars[0].date} and {bars[-1].date}") - # - # # We're requesting from end_date backwards, set our timestamp to the earliest timestamp - # first_timestamp = pd.Timestamp(bars[0].date).tz_convert(tz_name) - # first_date = first_timestamp.date() - # - # if first_date != date: - # # May contain data from next date, filter this out - 
# data.extend( - # [ - # bar - # for bar in bars - # if parse_response_datetime(bar.date, tz_name=tz_name).date() == date - # ], - # ) - # break - # else: - # data.extend(bars) - # - # end_time = first_timestamp - # - # return data - # - # def _request_historical_ticks( - # self, - # client: InteractiveBrokersClient, - # contract: Contract, - # start_time: str, - # what="BID_ASK", - # ): - # return client.reqHistoricalTicks( - # contract=contract, - # startDateTime=start_time, - # endDateTime="", - # numberOfTicks=1000, - # whatToShow=what, - # useRth=False, - # ) - # - # def _bar_spec_to_hist_data_request(self, bar_spec: BarSpecification) -> dict[str, str]: - # aggregation = bar_aggregation_to_str(bar_spec.aggregation) - # price_type = price_type_to_str(bar_spec.price_type) - # accepted_aggregations = ("SECOND", "MINUTE", "HOUR") - # - # err = f"Loading historic bars is for intraday data, bar_spec.aggregation should be {accepted_aggregations}" - # assert aggregation in accepted_aggregations, err - # - # price_mapping = {"MID": "MIDPOINT", "LAST": "TRADES"} - # what_to_show = price_mapping.get(price_type, price_type) - # - # size_mapping = {"SECOND": "sec", "MINUTE": "min", "HOUR": "hour"} - # suffix = "" if bar_spec.step == 1 and aggregation != "SECOND" else "s" - # bar_size = size_mapping.get(aggregation, aggregation) - # bar_size_setting = f"{bar_spec.step} {bar_size + suffix}" - # return { - # "durationStr": "1 D", - # "barSizeSetting": bar_size_setting, - # "whatToShow": what_to_show, - # } - # - # def _request_historical_bars( - # self, - # contract: Contract, - # end_time: str, - # bar_spec: BarSpecification, - # ): - # spec = _bar_spec_to_hist_data_request(bar_spec=bar_spec) - # return client._client.reqHistoricalData( - # contract=contract, - # endDateTime=end_time, - # durationStr=spec["durationStr"], - # barSizeSetting=spec["barSizeSetting"], - # whatToShow=spec["whatToShow"], - # useRTH=False, - # formatDate=2, - # ) - - -def _determine_next_timestamp(timestamps: list[pd.Timestamp], date: datetime.date, tz_name: str): - """ - While looping over available data, it is possible for very liquid products that a 1s - period may contain 1000 ticks, at which point we need to step the time forward to - avoid getting stuck when iterating. - """ - if not timestamps: - return pd.Timestamp(date, tz=tz_name).tz_convert("UTC") - unique_values = set(timestamps) - if len(unique_values) == 1: - timestamp = timestamps[-1] - return timestamp + pd.Timedelta(seconds=1) - else: - return timestamps[-1] + Parameters + ---------- + bar_specifications : list[str] + BarSpecifications represented as strings defining which bars to retrieve. + (e.g. '1-HOUR-LAST', '5-MINUTE-MID') + end_date_time : datetime.datetime + The end date time for the bars. + duration : str + The amount of time to go back from the end_date_time. + Valid values follow the pattern of an integer followed by S|D|W|M|Y + for seconds, days, weeks, months, or years respectively. + contracts : list[IBContract], default 'None' + IBContracts defining which bars to retrieve. + instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which bars to retrieve. + use_rth : bool, default 'True' + Whether to use regular trading hours. 
+ + Returns + ------- + list[Bar] + + """ + pattern = r"^\d+\s[SDWMY]$" + if not re.match(pattern, duration): + raise ValueError("duration must be in format: 'int S|D|W|M|Y'") + + contracts = contracts or [] + instrument_ids = instrument_ids or [] + if not contracts and not instrument_ids: + raise ValueError("Either contracts or instrument_ids must be provided") + + # Convert instrument_id strings to IBContracts + contracts.extend( + [ + instrument_id_to_ib_contract( + InstrumentId.from_str(instrument_id), + ) + for instrument_id in instrument_ids + ], + ) + + data: list[Bar] = [] + + for contract in contracts: + # Request instruments if not in cache because get_historical_bars + # requires instruments for the make_price method to construct bars + if not self._client._cache.instrument(ib_contract_to_instrument_id(contract)): + await self.request_instruments(contracts=[contract]) + + for bar_spec in bar_specifications: + bars = await self._client.get_historical_bars( + BarType( + ib_contract_to_instrument_id(contract), + BarSpecification.from_str(bar_spec), + AggregationSource.EXTERNAL, + ), + contract, + use_rth, + end_date_time.strftime("%Y%m%d-%H:%M:%S"), + duration, + ) + data.extend(bars) + + return data async def main(): @@ -257,14 +287,40 @@ async def main(): exchange="SMART", primaryExchange="NASDAQ", ) - client = HistoricInteractiveBrokersClient(client_id=5) + instrument_id = "TSLA.NASDAQ" + + client = HistoricInteractiveBrokersClient(port=4002, client_id=5) await client._connect() await asyncio.sleep(2) - await client.request_trade_ticks( - contract=contract, - date=datetime.date(2023, 10, 25), + instruments = await client.request_instruments( + contracts=[contract], + # instrument_ids=[instrument_id], + ) + # bars = await client.request_bars( + # bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], + # end_date_time=datetime.datetime.now(), + # duration="1 D", + # contracts=[contract], + # instrument_ids=[instrument_id], + # ) + + # Configure global logging level (optional, can be more restrictive) + # logging.basicConfig(level=logging.DEBUG) + + # Set the logging level for a specific library + # library_logger = logging.getLogger('ibapi.connection') + # library_logger.setLevel(logging.DEBUG) + + ticks = await client.request_ticks( + "TRADES", + start_date_time=datetime.date.today() - datetime.timedelta(days=4), + end_date_time=datetime.date.today() - datetime.timedelta(days=3), tz_name="America/New_York", + contracts=[contract], + # instrument_ids=[instrument_id], ) + catalog = ParquetDataCatalog("./catalog") + catalog.write_data(instruments + bars + ticks) if __name__ == "__main__": diff --git a/nautilus_trader/adapters/interactive_brokers/historic/core.py b/nautilus_trader/adapters/interactive_brokers/historic/core.py deleted file mode 100644 index 7347d7825d9c..000000000000 --- a/nautilus_trader/adapters/interactive_brokers/historic/core.py +++ /dev/null @@ -1,98 +0,0 @@ -# ------------------------------------------------------------------------------------------------- -# Copyright (C) 2015-2023 Nautech Systems Pty Ltd. All rights reserved. -# https://nautechsystems.io -# -# Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); -# You may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------------------------------------------- -import datetime -import logging -from typing import Literal - -import pandas as pd -from ibapi.contract import Contract - -from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient -from nautilus_trader.model.identifiers import InstrumentId -from nautilus_trader.persistence.catalog.parquet import ParquetDataCatalog - - -logger = logging.getLogger(__name__) - - -def generate_filename( - catalog: ParquetDataCatalog, - instrument_id: InstrumentId, - kind: Literal["BID_ASK", "TRADES"], - date: datetime.date, -) -> str: - fn_kind = {"BID_ASK": "quote_tick", "TRADES": "trade_tick", "BARS": "bars"}[kind.split("-")[0]] - return f"{catalog.path}/data/{fn_kind}.parquet/instrument_id={instrument_id.value}/{date:%Y%m%d}-0.parquet" - - -async def back_fill_catalog( - catalog: ParquetDataCatalog, - contracts: list[Contract], - start_date: datetime.date, - end_date: datetime.date, - tz_name: str, - kinds=("BID_ASK", "TRADES"), -): - """ - Backfill the data catalog with market data from Interactive Brokers. - - Parameters - ---------- - catalog : ParquetDataCatalog - The data catalog to write the data to. - contracts : list[Contract] - The list of IB Contracts to collect data for. - start_date : datetime.date - The start_date for the backfill. - end_date : datetime.date - The end_date for the backfill. - tz_name : str - The timezone of the contracts - kinds : tuple[str] (default: ('BID_ASK', 'TRADES') - The kinds to query data for, can be any of: - - BID_ASK - - TRADES - - A bar specification, i.e. BARS-1-MINUTE-LAST or BARS-5-SECOND-MID - - """ - client = HistoricInteractiveBrokersClient() - for date in pd.bdate_range(start_date, end_date, tz=tz_name): - for contract in contracts: - # [details] = client.reqContractDetails(contract=contract) - # instrument = parse_instrument(contract_details=details) - - # Check if this instrument exists in the catalog, if not, write it. 
- # if not catalog.instruments(instrument_ids=[instrument.id.value]): - # catalog.write_data([instrument]) - # - # for kind in kinds: - # fn = generate_filename(catalog, instrument_id=instrument.id, kind=kind, date=date) - # if catalog.fs.exists(fn): - # logger.info( - # f"file for {instrument.id.value} {kind} {date:%Y-%m-%d} exists, skipping", - # ) - # continue - # logger.info(f"Fetching {instrument.id.value} {kind} for {date:%Y-%m-%d}") - # - data = await client.request_trade_ticks( - contract=contract, # typing: ignore - date=date.date(), - tz_name=tz_name, - ) - if data is None: - continue - - template = f"{date:%Y%m%d}" + "-{i}.parquet" - catalog.write_data(data, basename_template=template) diff --git a/nautilus_trader/adapters/interactive_brokers/providers.py b/nautilus_trader/adapters/interactive_brokers/providers.py index f60b6a8c4cc4..36d3b929a8a9 100644 --- a/nautilus_trader/adapters/interactive_brokers/providers.py +++ b/nautilus_trader/adapters/interactive_brokers/providers.py @@ -279,6 +279,7 @@ async def load_async( continue self._log.info(f"Adding {instrument=} from InteractiveBrokersInstrumentProvider") self.add(instrument) + self._client._cache.add_instrument(instrument) self.contract_details[instrument.id.value] = details self.contract_id_to_instrument_id[details.contract.conId] = instrument.id From 36965736473b736f609a4f490114a1530318318c Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Mon, 6 Nov 2023 21:20:43 -0500 Subject: [PATCH 05/12] WIP --- .../interactive_brokers/historic/client.py | 241 ++++++++++-------- 1 file changed, 128 insertions(+), 113 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index db2ddbb90ec2..b5d44fbbd9bf 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -65,7 +65,7 @@ def __init__( client_id=client_id, ) - async def _connect(self): + async def _connect(self) -> None: # Connect client await self._client.is_running_async() self._client.registered_nautilus_clients.add(1) @@ -73,92 +73,6 @@ async def _connect(self): # Set Market Data Type await self._client.set_market_data_type(self.market_data_type) - async def request_ticks( - self, - tick_type: Literal["TRADES", "BID_ASK"], - start_date_time: datetime.datetime, - end_date_time: datetime.datetime, - tz_name: str, - contracts: list[IBContract] | None = None, - instrument_ids: list[str] | None = None, - use_rth: bool = True, - ) -> list[TradeTick | QuoteTick]: - """ - Return TradeTicks or QuoteTicks for one or more bar specifications for a list of - IBContracts and/or InstrumentId strings. - - Parameters - ---------- - tick_type : Literal["TRADES", "BID_ASK"] - The type of ticks to retrieve. - start_date_time : datetime.date - The start date for the ticks. - end_date_time : datetime.date - The end date for the ticks. - tz_name : str - The timezone to use. - contracts : list[IBContract], default 'None' - IBContracts defining which ticks to retrieve. - instrument_ids : list[str], default 'None' - Instrument IDs (e.g. AAPL.NASDAQ) defining which ticks to retrieve. - use_rth : bool, default 'True' - Whether to use regular trading hours. 
- - Returns - ------- - list[TradeTick | QuoteTick] - - """ - start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") - end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") - contracts = contracts or [] - instrument_ids = instrument_ids or [] - if not contracts and not instrument_ids: - raise ValueError("Either contracts or instrument_ids must be provided") - - # Convert instrument_id strings to IBContracts - contracts.extend( - [ - instrument_id_to_ib_contract( - InstrumentId.from_str(instrument_id), - ) - for instrument_id in instrument_ids - ], - ) - - data: list[TradeTick | QuoteTick] = [] - for contract in contracts: - while True: - # breakpoint() - print("Requesting ticks from", start_date_time) - ticks = await self._client.get_historical_ticks( - contract=contract, - tick_type=tick_type, - start_date_time=start_date_time, - use_rth=use_rth, - ) - print("Number of ticks retrieved:", len(ticks)) - if not ticks: - continue - data.extend(ticks) - print("Number of ticks in data:", len(data)) - - timestamps = [tick.ts_event for tick in ticks] - min_timestamp: pd.Timestamp = unix_nanos_to_dt(min(timestamps)) - max_timestamp: pd.Timestamp = unix_nanos_to_dt(max(timestamps)) - - # For very liquid products, a 1s period may contain 1000 ticks - the maximum that IB allows - # -- so we need to step the time forward to the next second avoid getting stuck when iterating. - if min_timestamp.floor("S") == max_timestamp.floor("S"): - max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) - - if max_timestamp >= end_date_time: - break - - start_date_time = max_timestamp - - return data - async def request_instruments( self, instrument_provider_config: InteractiveBrokersInstrumentProviderConfig | None = None, @@ -261,15 +175,18 @@ async def request_bars( # Request instruments if not in cache because get_historical_bars # requires instruments for the make_price method to construct bars if not self._client._cache.instrument(ib_contract_to_instrument_id(contract)): + self.log.info(f"Fetching Instrument for: {ib_contract_to_instrument_id(contract)}") await self.request_instruments(contracts=[contract]) for bar_spec in bar_specifications: + bar_type = BarType( + ib_contract_to_instrument_id(contract), + BarSpecification.from_str(bar_spec), + AggregationSource.EXTERNAL, + ) + self.log.info(f"Requesting bars: {bar_type}") bars = await self._client.get_historical_bars( - BarType( - ib_contract_to_instrument_id(contract), - BarSpecification.from_str(bar_spec), - AggregationSource.EXTERNAL, - ), + bar_type, contract, use_rth, end_date_time.strftime("%Y%m%d-%H:%M:%S"), @@ -279,6 +196,99 @@ async def request_bars( return data + async def request_ticks( + self, + tick_type: Literal["TRADES", "BID_ASK"], + start_date_time: datetime.datetime, + end_date_time: datetime.datetime, + tz_name: str, + contracts: list[IBContract] | None = None, + instrument_ids: list[str] | None = None, + use_rth: bool = True, + ) -> list[TradeTick | QuoteTick]: + """ + Return TradeTicks or QuoteTicks for one or more bar specifications for a list of + IBContracts and/or InstrumentId strings. + + Parameters + ---------- + tick_type : Literal["TRADES", "BID_ASK"] + The type of ticks to retrieve. + start_date_time : datetime.date + The start date for the ticks. + end_date_time : datetime.date + The end date for the ticks. + tz_name : str + The timezone to use. + contracts : list[IBContract], default 'None' + IBContracts defining which ticks to retrieve. 
+ instrument_ids : list[str], default 'None' + Instrument IDs (e.g. AAPL.NASDAQ) defining which ticks to retrieve. + use_rth : bool, default 'True' + Whether to use regular trading hours. + + Returns + ------- + list[TradeTick | QuoteTick] + + """ + start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + contracts = contracts or [] + instrument_ids = instrument_ids or [] + if not contracts and not instrument_ids: + raise ValueError("Either contracts or instrument_ids must be provided") + + # Convert instrument_id strings to IBContracts + contracts.extend( + [ + instrument_id_to_ib_contract( + InstrumentId.from_str(instrument_id), + ) + for instrument_id in instrument_ids + ], + ) + + data: list[TradeTick | QuoteTick] = [] + for contract in contracts: + self.log.info(f"Requesting ticks for: {ib_contract_to_instrument_id(contract)}") + while True: + self.log.info(f"Requesting ticks from: {start_date_time}") + ticks = await self._client.get_historical_ticks( + contract=contract, + tick_type=tick_type, + start_date_time=start_date_time, + use_rth=use_rth, + ) + self.log.info(f"Number of ticks retrieved: {len(ticks)}") + if not ticks: + continue + + self.log.info(f"Number of ticks in data: {len(data)}") + + timestamps = [tick.ts_event for tick in ticks] + min_timestamp: pd.Timestamp = unix_nanos_to_dt(min(timestamps)) + max_timestamp: pd.Timestamp = unix_nanos_to_dt(max(timestamps)) + + # For very liquid products, a 1s period may contain 1000 ticks - the maximum that IB allows + # -- so we need to step the time forward to the next second avoid getting stuck when iterating. + if min_timestamp.floor("S") == max_timestamp.floor("S"): + max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) + + if max_timestamp >= end_date_time: + # Filter out any ticks after the end_date_time + ticks = [ + tick for tick in ticks if unix_nanos_to_dt(tick.ts_event) <= end_date_time + ] + data.extend(ticks) + break + + data.extend(ticks) + + start_date_time = max_timestamp + + return data + async def main(): contract = IBContract( @@ -292,35 +302,40 @@ async def main(): client = HistoricInteractiveBrokersClient(port=4002, client_id=5) await client._connect() await asyncio.sleep(2) + instruments = await client.request_instruments( contracts=[contract], - # instrument_ids=[instrument_id], + instrument_ids=[instrument_id], + ) + + bars = await client.request_bars( + bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], + end_date_time=datetime.datetime(2023, 11, 6, 16, 0), + duration="1 D", + contracts=[contract], + instrument_ids=[instrument_id], ) - # bars = await client.request_bars( - # bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], - # end_date_time=datetime.datetime.now(), - # duration="1 D", - # contracts=[contract], - # instrument_ids=[instrument_id], - # ) - - # Configure global logging level (optional, can be more restrictive) - # logging.basicConfig(level=logging.DEBUG) - - # Set the logging level for a specific library - # library_logger = logging.getLogger('ibapi.connection') - # library_logger.setLevel(logging.DEBUG) - - ticks = await client.request_ticks( + + trade_ticks = await client.request_ticks( "TRADES", - start_date_time=datetime.date.today() - datetime.timedelta(days=4), - end_date_time=datetime.date.today() - datetime.timedelta(days=3), + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), tz_name="America/New_York", 
contracts=[contract], - # instrument_ids=[instrument_id], + instrument_ids=[instrument_id], ) + + quote_ticks = await client.request_ticks( + "BID_ASK", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + catalog = ParquetDataCatalog("./catalog") - catalog.write_data(instruments + bars + ticks) + catalog.write_data(instruments + bars + trade_ticks + quote_ticks) if __name__ == "__main__": From 08f0638541bd63061a85ad77063b40abb278b5ec Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Mon, 6 Nov 2023 21:24:48 -0500 Subject: [PATCH 06/12] fix example --- .../interactive_brokers/historic_download.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/examples/live/interactive_brokers/historic_download.py b/examples/live/interactive_brokers/historic_download.py index fd350996e969..ccc550881512 100644 --- a/examples/live/interactive_brokers/historic_download.py +++ b/examples/live/interactive_brokers/historic_download.py @@ -22,7 +22,6 @@ async def main(): - # Define one or many IBContracts or instrument_ids to fetch data for contract = IBContract( secType="STK", symbol="AAPL", @@ -42,24 +41,32 @@ async def main(): bars = await client.request_bars( bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], - end_date_time=datetime.datetime.now(), + end_date_time=datetime.datetime(2023, 11, 6, 16, 0), duration="1 D", contracts=[contract], instrument_ids=[instrument_id], ) - ticks = await client.request_ticks( + trade_ticks = await client.request_ticks( "TRADES", - start_date_time=datetime.date.today() - datetime.timedelta(days=4), - end_date_time=datetime.date.today() - datetime.timedelta(days=3), + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), + tz_name="America/New_York", + contracts=[contract], + instrument_ids=[instrument_id], + ) + + quote_ticks = await client.request_ticks( + "BID_ASK", + start_date_time=datetime.datetime(2023, 11, 6, 10, 0), + end_date_time=datetime.datetime(2023, 11, 6, 10, 1), tz_name="America/New_York", contracts=[contract], instrument_ids=[instrument_id], ) catalog = ParquetDataCatalog("./catalog") - # You can write any Nautilus Data to the catalog - catalog.write_data(instruments + bars + ticks) + catalog.write_data(instruments + bars + trade_ticks + quote_ticks) if __name__ == "__main__": From f558ed762aacf47bb4dd78443e16127c8e410b8c Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Tue, 7 Nov 2023 07:40:24 -0500 Subject: [PATCH 07/12] WIP refactoring --- .../interactive_brokers/historic/client.py | 59 +++++++++++-------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index b5d44fbbd9bf..a678fc5dc9b6 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -169,15 +169,12 @@ async def request_bars( ], ) + # Ensure instruments are fetched and cached + await self.fetch_instruments_if_not_cached(contracts) + data: list[Bar] = [] for contract in contracts: - # Request instruments if not in cache because get_historical_bars - # requires instruments for the make_price method to construct bars - if not self._client._cache.instrument(ib_contract_to_instrument_id(contract)): - 
self.log.info(f"Fetching Instrument for: {ib_contract_to_instrument_id(contract)}") - await self.request_instruments(contracts=[contract]) - for bar_spec in bar_specifications: bar_type = BarType( ib_contract_to_instrument_id(contract), @@ -249,6 +246,9 @@ async def request_ticks( ], ) + # Ensure instruments are fetched and cached + await self.fetch_instruments_if_not_cached(contracts) + data: list[TradeTick | QuoteTick] = [] for contract in contracts: self.log.info(f"Requesting ticks for: {ib_contract_to_instrument_id(contract)}") @@ -266,29 +266,42 @@ async def request_ticks( self.log.info(f"Number of ticks in data: {len(data)}") - timestamps = [tick.ts_event for tick in ticks] - min_timestamp: pd.Timestamp = unix_nanos_to_dt(min(timestamps)) - max_timestamp: pd.Timestamp = unix_nanos_to_dt(max(timestamps)) - - # For very liquid products, a 1s period may contain 1000 ticks - the maximum that IB allows - # -- so we need to step the time forward to the next second avoid getting stuck when iterating. - if min_timestamp.floor("S") == max_timestamp.floor("S"): - max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) - - if max_timestamp >= end_date_time: - # Filter out any ticks after the end_date_time - ticks = [ - tick for tick in ticks if unix_nanos_to_dt(tick.ts_event) <= end_date_time - ] - data.extend(ticks) + start_date_time, should_continue = self.handle_timestamp_iteration( + ticks, + start_date_time, + end_date_time, + ) + + if not should_continue: break data.extend(ticks) - start_date_time = max_timestamp - return data + def handle_timestamp_iteration(self, ticks, start_date_time, end_date_time): + if not ticks: + return start_date_time, False + + timestamps = [unix_nanos_to_dt(tick.ts_event) for tick in ticks] + min_timestamp = min(timestamps) + max_timestamp = max(timestamps) + + if min_timestamp.floor("S") == max_timestamp.floor("S"): + max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) + + if max_timestamp >= end_date_time: + return end_date_time, False + + return max_timestamp, True + + async def fetch_instruments_if_not_cached(self, contracts: list[IBContract]) -> None: + for contract in contracts: + instrument_id = ib_contract_to_instrument_id(contract) + if not self._client._cache.instrument(instrument_id): + self.log.info(f"Fetching Instrument for: {instrument_id}") + await self.request_instruments(contracts=[contract]) + async def main(): contract = IBContract( From d05b8d09f9a2c1c278fdfe8ece20bbfe912c9162 Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Tue, 7 Nov 2023 22:07:55 -0500 Subject: [PATCH 08/12] fix docstring --- .../interactive_brokers/client/client.py | 34 +++++---- .../adapters/interactive_brokers/data.py | 4 +- .../interactive_brokers/historic/client.py | 76 ++++++++++++++----- 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index b2dc70ec825c..0307749b344b 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1294,9 +1294,15 @@ async def get_historical_ticks( self, contract: IBContract, tick_type: str, - start_date_time: pd.Timestamp, - use_rth: bool, + start_date_time: pd.Timestamp = "", + end_date_time: pd.Timestamp = "", + use_rth: bool = True, ): + if isinstance(start_date_time, pd.Timestamp): + start_date_time = start_date_time.strftime("%Y%m%d %H:%M:%S %Z") + if isinstance(end_date_time, 
pd.Timestamp): + end_date_time = end_date_time.strftime("%Y%m%d %H:%M:%S %Z") + name = (str(ib_contract_to_instrument_id(contract)), tick_type) if not (request := self.requests.get(name=name)): req_id = self._next_req_id() @@ -1307,8 +1313,8 @@ async def get_historical_ticks( self._client.reqHistoricalTicks, reqId=req_id, contract=contract, - startDateTime=start_date_time.strftime("%Y%m%d %H:%M:%S %Z"), - endDateTime="", + startDateTime=start_date_time, + endDateTime=end_date_time, numberOfTicks=1000, whatToShow=tick_type, useRth=use_rth, @@ -1324,7 +1330,8 @@ async def get_historical_ticks( def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): self.logAnswer(current_fn_name(), vars()) - + if not done: + return if request := self.requests.get(req_id=req_id): instrument_id = InstrumentId.from_str(request.name[0]) instrument = self._cache.instrument(instrument_id) @@ -1335,8 +1342,8 @@ def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): instrument_id=instrument_id, bid_price=instrument.make_price(tick.priceBid), ask_price=instrument.make_price(tick.priceAsk), - bid_size=instrument.make_price(tick.sizeBid), - ask_size=instrument.make_price(tick.sizeAsk), + bid_size=instrument.make_qty(tick.sizeBid), + ask_size=instrument.make_qty(tick.sizeAsk), ts_event=ts_event, ts_init=ts_event, ) @@ -1346,22 +1353,21 @@ def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): def historicalTicksLast(self, req_id: int, ticks: list, done: bool): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) + if not done: + return self._process_trade_ticks(req_id, ticks) def historicalTicks(self, req_id: int, ticks: list, done: bool): self.logAnswer(current_fn_name(), vars()) + if not done: + return self._process_trade_ticks(req_id, ticks) def _process_trade_ticks(self, req_id: int, ticks: list): - # inspect self.requests and compare to req_id. sometimes - # req_id is one lower than self.requests. this is a bug. 
- # if request := self.requests.get(req_id=req_id): - if len(ticks) == 0: - self._end_request(req_id) - if request := self.requests.get(req_id=req_id + 1): - # if request := self.requests[0]: + if request := self.requests.get(req_id=req_id): instrument_id = InstrumentId.from_str(request.name[0]) instrument = self._cache.instrument(instrument_id) + for tick in ticks: ts_event = pd.Timestamp.fromtimestamp(tick.time, tz=pytz.utc).value trade_tick = TradeTick( diff --git a/nautilus_trader/adapters/interactive_brokers/data.py b/nautilus_trader/adapters/interactive_brokers/data.py index afe66ae0c816..4fac7a13133a 100644 --- a/nautilus_trader/adapters/interactive_brokers/data.py +++ b/nautilus_trader/adapters/interactive_brokers/data.py @@ -376,8 +376,8 @@ async def _handle_ticks_request( ticks_part = await self._client.get_historical_ticks( contract, tick_type, - end, - self._use_regular_trading_hours, + end_date_time=end, + use_rth=self._use_regular_trading_hours, ) if not ticks_part: break diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index a678fc5dc9b6..e15ec4bbb748 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -16,6 +16,7 @@ from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger from nautilus_trader.common.logging import LoggerAdapter +from nautilus_trader.core.datetime import dt_to_unix_nanos from nautilus_trader.core.datetime import unix_nanos_to_dt from nautilus_trader.model.data import QuoteTick from nautilus_trader.model.data import TradeTick @@ -118,6 +119,7 @@ async def request_bars( self, bar_specifications: list[str], end_date_time: datetime.datetime, + tz_name: str, duration: str, contracts: list[IBContract] | None = None, instrument_ids: list[str] | None = None, @@ -134,6 +136,8 @@ async def request_bars( (e.g. '1-HOUR-LAST', '5-MINUTE-MID') end_date_time : datetime.datetime The end date time for the bars. + tz_name : str + The timezone to use. (e.g. 'America/New_York', 'UTC') duration : str The amount of time to go back from the end_date_time. 
Valid values follow the pattern of an integer followed by S|D|W|M|Y @@ -159,6 +163,8 @@ async def request_bars( if not contracts and not instrument_ids: raise ValueError("Either contracts or instrument_ids must be provided") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + # Convert instrument_id strings to IBContracts contracts.extend( [ @@ -176,12 +182,13 @@ async def request_bars( for contract in contracts: for bar_spec in bar_specifications: + instrument_id = ib_contract_to_instrument_id(contract) bar_type = BarType( - ib_contract_to_instrument_id(contract), + instrument_id, BarSpecification.from_str(bar_spec), AggregationSource.EXTERNAL, ) - self.log.info(f"Requesting bars: {bar_type}") + self.log.info(f"{instrument_id}: Requesting bars: {bar_type}") bars = await self._client.get_historical_bars( bar_type, contract, @@ -189,9 +196,16 @@ async def request_bars( end_date_time.strftime("%Y%m%d-%H:%M:%S"), duration, ) - data.extend(bars) + if bars: + self.log.info( + f"{instrument_id}: Number of bars retrieved in batch: {len(bars)}", + ) + data.extend(bars) + self.log.info(f"Total number of bars in data: {len(data)}") + else: + self.log.info(f"{instrument_id}: No bars retrieved for: {bar_type}") - return data + return sorted(data, key=lambda x: x.ts_init) async def request_ticks( self, @@ -216,7 +230,7 @@ async def request_ticks( end_date_time : datetime.date The end date for the ticks. tz_name : str - The timezone to use. + The timezone to use. (e.g. 'America/New_York', 'UTC') contracts : list[IBContract], default 'None' IBContracts defining which ticks to retrieve. instrument_ids : list[str], default 'None' @@ -229,6 +243,10 @@ async def request_ticks( list[TradeTick | QuoteTick] """ + if tick_type not in ["TRADES", "BID_ASK"]: + raise ValueError( + "tick_type must be one of: 'TRADES' (for TradeTicks), 'BID_ASK' (for QuoteTicks)", + ) start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") contracts = contracts or [] @@ -251,37 +269,53 @@ async def request_ticks( data: list[TradeTick | QuoteTick] = [] for contract in contracts: - self.log.info(f"Requesting ticks for: {ib_contract_to_instrument_id(contract)}") + instrument_id = ib_contract_to_instrument_id(contract) + current_start_date_time = start_date_time while True: - self.log.info(f"Requesting ticks from: {start_date_time}") + self.log.info( + f"{instrument_id}: Requesting {tick_type} ticks from {current_start_date_time}", + ) + ticks = await self._client.get_historical_ticks( contract=contract, tick_type=tick_type, - start_date_time=start_date_time, + start_date_time=current_start_date_time, use_rth=use_rth, ) - self.log.info(f"Number of ticks retrieved: {len(ticks)}") + if not ticks: - continue + break - self.log.info(f"Number of ticks in data: {len(data)}") + self.log.info( + f"{instrument_id}: Number of {tick_type} ticks retrieved in batch: {len(ticks)}", + ) - start_date_time, should_continue = self.handle_timestamp_iteration( + current_start_date_time, should_continue = self.handle_timestamp_iteration( ticks, - start_date_time, end_date_time, ) if not should_continue: + # Filter out ticks that are after the end_date_time + ticks = [ + tick for tick in ticks if tick.ts_event <= dt_to_unix_nanos(end_date_time) + ] + data.extend(ticks) + self.log.info(f"Total number of {tick_type} ticks in data: {len(data)}") break data.extend(ticks) + self.log.info(f"Total number of {tick_type} ticks in data: 
{len(data)}") - return data + return sorted(data, key=lambda x: x.ts_init) - def handle_timestamp_iteration(self, ticks, start_date_time, end_date_time): + def handle_timestamp_iteration( + self, + ticks: list[TradeTick | QuoteTick], + end_date_time: pd.Timestamp, + ) -> tuple[pd.Timestamp | None, bool]: if not ticks: - return start_date_time, False + return None, False timestamps = [unix_nanos_to_dt(tick.ts_event) for tick in ticks] min_timestamp = min(timestamps) @@ -291,7 +325,7 @@ def handle_timestamp_iteration(self, ticks, start_date_time, end_date_time): max_timestamp = max_timestamp.floor("S") + pd.Timedelta(seconds=1) if max_timestamp >= end_date_time: - return end_date_time, False + return None, False return max_timestamp, True @@ -323,7 +357,8 @@ async def main(): bars = await client.request_bars( bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], - end_date_time=datetime.datetime(2023, 11, 6, 16, 0), + end_date_time=datetime.datetime(2023, 11, 6, 16, 30), + tz_name="America/New_York", duration="1 D", contracts=[contract], instrument_ids=[instrument_id], @@ -348,7 +383,10 @@ async def main(): ) catalog = ParquetDataCatalog("./catalog") - catalog.write_data(instruments + bars + trade_ticks + quote_ticks) + catalog.write_data(instruments) + catalog.write_data(bars) + catalog.write_data(trade_ticks) + catalog.write_data(quote_ticks) if __name__ == "__main__": From d1a004a296165a0533469232680515c6e16181a9 Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Tue, 7 Nov 2023 22:14:40 -0500 Subject: [PATCH 09/12] update example and fix type hints --- .../live/interactive_brokers/historic_download.py | 8 ++++++-- .../adapters/interactive_brokers/client/client.py | 13 +++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/examples/live/interactive_brokers/historic_download.py b/examples/live/interactive_brokers/historic_download.py index ccc550881512..afcd2980ad0f 100644 --- a/examples/live/interactive_brokers/historic_download.py +++ b/examples/live/interactive_brokers/historic_download.py @@ -41,7 +41,8 @@ async def main(): bars = await client.request_bars( bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], - end_date_time=datetime.datetime(2023, 11, 6, 16, 0), + end_date_time=datetime.datetime(2023, 11, 6, 16, 30), + tz_name="America/New_York", duration="1 D", contracts=[contract], instrument_ids=[instrument_id], @@ -66,7 +67,10 @@ async def main(): ) catalog = ParquetDataCatalog("./catalog") - catalog.write_data(instruments + bars + trade_ticks + quote_ticks) + catalog.write_data(instruments) + catalog.write_data(bars) + catalog.write_data(trade_ticks) + catalog.write_data(quote_ticks) if __name__ == "__main__": diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index 0307749b344b..5b45a9051c13 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1294,8 +1294,8 @@ async def get_historical_ticks( self, contract: IBContract, tick_type: str, - start_date_time: pd.Timestamp = "", - end_date_time: pd.Timestamp = "", + start_date_time: pd.Timestamp | str = "", + end_date_time: pd.Timestamp | str = "", use_rth: bool = True, ): if isinstance(start_date_time, pd.Timestamp): @@ -1328,7 +1328,12 @@ async def get_historical_ticks( else: self._log.info(f"Request already exist for {request}") - def historicalTicksBidAsk(self, req_id: int, ticks: list, done: bool): + def historicalTicksBidAsk( + 
self, + req_id: int, + ticks: list, + done: bool, + ): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) if not done: return @@ -1357,7 +1362,7 @@ def historicalTicksLast(self, req_id: int, ticks: list, done: bool): # : Overri return self._process_trade_ticks(req_id, ticks) - def historicalTicks(self, req_id: int, ticks: list, done: bool): + def historicalTicks(self, req_id: int, ticks: list, done: bool): # : Override the EWrapper self.logAnswer(current_fn_name(), vars()) if not done: return From 09a22bd71159e8b573967a3f1282fd32bafa3d79 Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Wed, 8 Nov 2023 23:05:31 -0500 Subject: [PATCH 10/12] docstrings and duration logic --- .../interactive_brokers/client/client.py | 4 +- .../interactive_brokers/historic/client.py | 182 +++++++++++++++--- 2 files changed, 156 insertions(+), 30 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index 5b45a9051c13..2d9a944b0200 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1114,7 +1114,7 @@ async def get_historical_bars( ) self._log.debug(f"reqHistoricalData: {request.req_id=}, {contract=}") request.handle() - return await self._await_request(request, 20) + return await self._await_request(request, 60) else: self._log.info(f"Request already exist for {request}") @@ -1324,7 +1324,7 @@ async def get_historical_ticks( cancel=functools.partial(self._client.cancelHistoricalData, reqId=req_id), ) request.handle() - return await self._await_request(request, 20) + return await self._await_request(request, 60) else: self._log.info(f"Request already exist for {request}") diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index e15ec4bbb748..53d6b496b70e 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -120,7 +120,8 @@ async def request_bars( bar_specifications: list[str], end_date_time: datetime.datetime, tz_name: str, - duration: str, + start_date_time: datetime.datetime | None = None, + duration: str | None = None, contracts: list[IBContract] | None = None, instrument_ids: list[str] | None = None, use_rth: bool = True, @@ -134,6 +135,8 @@ async def request_bars( bar_specifications : list[str] BarSpecifications represented as strings defining which bars to retrieve. (e.g. '1-HOUR-LAST', '5-MINUTE-MID') + start_date_time : datetime.datetime + The start date time for the bars. If provided, duration is derived. end_date_time : datetime.datetime The end date time for the bars. 
tz_name : str @@ -154,17 +157,25 @@ async def request_bars( list[Bar] """ - pattern = r"^\d+\s[SDWMY]$" - if not re.match(pattern, duration): - raise ValueError("duration must be in format: 'int S|D|W|M|Y'") + if start_date_time and duration: + raise ValueError("Either start_date_time or duration should be provided, not both.") + + # Derive duration if start_date_time and end_date_time are provided + if start_date_time and end_date_time: + start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") + end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + if start_date_time >= end_date_time: + raise ValueError("Start date must be before end date.") + else: + pattern = r"^\d+\s[SDWMY]$" + if not re.match(pattern, duration): + raise ValueError("duration must be in format: 'int S|D|W|M|Y'") contracts = contracts or [] instrument_ids = instrument_ids or [] if not contracts and not instrument_ids: raise ValueError("Either contracts or instrument_ids must be provided") - end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") - # Convert instrument_id strings to IBContracts contracts.extend( [ @@ -176,7 +187,7 @@ async def request_bars( ) # Ensure instruments are fetched and cached - await self.fetch_instruments_if_not_cached(contracts) + await self._fetch_instruments_if_not_cached(contracts) data: list[Bar] = [] @@ -188,22 +199,32 @@ async def request_bars( BarSpecification.from_str(bar_spec), AggregationSource.EXTERNAL, ) - self.log.info(f"{instrument_id}: Requesting bars: {bar_type}") - bars = await self._client.get_historical_bars( - bar_type, - contract, - use_rth, - end_date_time.strftime("%Y%m%d-%H:%M:%S"), - duration, - ) - if bars: + for duration_segment in self._calculate_duration_segments( + start_date_time, + end_date_time, + ): + segment_end_date_time = duration_segment["date"] + duration = duration_segment["duration"] + self.log.info( - f"{instrument_id}: Number of bars retrieved in batch: {len(bars)}", + f"{instrument_id}: Requesting historical bars: {bar_type} ending on '{end_date_time}' with duration '{duration}'", + ) + + bars = await self._client.get_historical_bars( + bar_type, + contract, + use_rth, + segment_end_date_time.strftime("%Y%m%d-%H:%M:%S"), + duration, ) - data.extend(bars) - self.log.info(f"Total number of bars in data: {len(data)}") - else: - self.log.info(f"{instrument_id}: No bars retrieved for: {bar_type}") + if bars: + self.log.info( + f"{instrument_id}: Number of bars retrieved in batch: {len(bars)}", + ) + data.extend(bars) + self.log.info(f"Total number of bars in data: {len(data)}") + else: + self.log.info(f"{instrument_id}: No bars retrieved for: {bar_type}") return sorted(data, key=lambda x: x.ts_init) @@ -247,8 +268,16 @@ async def request_ticks( raise ValueError( "tick_type must be one of: 'TRADES' (for TradeTicks), 'BID_ASK' (for QuoteTicks)", ) + if start_date_time >= end_date_time: + raise ValueError("Start date must be before end date.") start_date_time = pd.Timestamp(start_date_time, tz=tz_name).tz_convert("UTC") end_date_time = pd.Timestamp(end_date_time, tz=tz_name).tz_convert("UTC") + if (end_date_time - start_date_time) > pd.Timedelta(days=1): + self.log.warning( + "Requesting tick data for more than 1 day may take a long time, particularly for liquid instruments. 
" + "You may want to consider sourcing tick data elsewhere.", + ) + contracts = contracts or [] instrument_ids = instrument_ids or [] if not contracts and not instrument_ids: @@ -265,7 +294,7 @@ async def request_ticks( ) # Ensure instruments are fetched and cached - await self.fetch_instruments_if_not_cached(contracts) + await self._fetch_instruments_if_not_cached(contracts) data: list[TradeTick | QuoteTick] = [] for contract in contracts: @@ -290,7 +319,7 @@ async def request_ticks( f"{instrument_id}: Number of {tick_type} ticks retrieved in batch: {len(ticks)}", ) - current_start_date_time, should_continue = self.handle_timestamp_iteration( + current_start_date_time, should_continue = self._handle_timestamp_iteration( ticks, end_date_time, ) @@ -309,11 +338,28 @@ async def request_ticks( return sorted(data, key=lambda x: x.ts_init) - def handle_timestamp_iteration( + def _handle_timestamp_iteration( self, ticks: list[TradeTick | QuoteTick], end_date_time: pd.Timestamp, ) -> tuple[pd.Timestamp | None, bool]: + """ + Return the max timestamp from the given ticks and whether to continue iterating. + If all timestamps occur in the same second, the max timestamp will be + incremented by 1 second. + + Parameters + ---------- + ticks : list[TradeTick | QuoteTick] + The type of ticks to retrieve. + end_date_time : datetime.date + The end date for the ticks. + + Returns + ------- + tuple[pd.Timestamp | None, bool] + + """ if not ticks: return None, False @@ -329,14 +375,94 @@ def handle_timestamp_iteration( return max_timestamp, True - async def fetch_instruments_if_not_cached(self, contracts: list[IBContract]) -> None: + async def _fetch_instruments_if_not_cached(self, contracts: list[IBContract]) -> None: + """ + Fetch and cache Instruments for the given IBContracts if they are not already + cached. + + Parameters + ---------- + contracts : list[IBContract] + A list of IBContracts to fetch Instruments for. + + Returns + ------- + None + + """ for contract in contracts: instrument_id = ib_contract_to_instrument_id(contract) if not self._client._cache.instrument(instrument_id): self.log.info(f"Fetching Instrument for: {instrument_id}") await self.request_instruments(contracts=[contract]) + def _calculate_duration_segments(self, start_date, end_date): + """ + Calculate the difference in years, days, and seconds between two dates for the + purpose of requesting specific date ranges for historical bars. + + This function breaks down the time difference between two provided dates (start_date and end_date) into + separate components: years, days, and seconds. It accounts for leap years in its calculation of years and + considers detailed time components (hours, minutes, seconds) for precise calculation of seconds. + + Each component of the time difference (years, days, seconds) is represented as a dictionary in the returned list. + The 'date' key in each dictionary indicates the end point of that time segment when moving from start_date to end_date. + For example, if the function calculates 1 year, the 'date' key for the year entry will be the end date after 1 year + has passed from start_date. This helps in understanding the progression of time from start_date to end_date in segmented intervals. + + Parameters + ---------- + start_date : pd.Timestamp + The starting date and time. + end_date : pd.Timestamp + The ending date and time. + + Returns + ------- + list: A list of dictionaries, each containing a 'date' and a 'duration' key. 
+ The 'date' key represents the end point of each calculated time segment (year, day, second), + and the 'duration' key holds the length of the time segment as a string. + + """ + total_delta = end_date - start_date + + # Calculate full years in the time delta + years = total_delta.days // 365 + minus_years_date = end_date - pd.Timedelta(days=365 * years) + + # Calculate remaining days after subtracting full years + days = (minus_years_date - start_date).days + minus_days_date = minus_years_date - pd.Timedelta(days=days) + + # Calculate remaining time in seconds + delta = minus_days_date - start_date + subsecond = ( + 1 + if delta.components.milliseconds > 0 + or delta.components.microseconds > 0 + or delta.components.nanoseconds > 0 + else 0 + ) + seconds = ( + delta.components.hours * 3600 + + delta.components.minutes * 60 + + delta.components.seconds + + subsecond + ) + + results = [] + if years: + results.append({"date": end_date, "duration": f"{years} Y"}) + + if days: + results.append({"date": minus_years_date, "duration": f"{days} D"}) + + if seconds: + results.append({"date": minus_days_date, "duration": f"{seconds} S"}) + + return results +# will remove this post testing and review async def main(): contract = IBContract( secType="STK", @@ -356,10 +482,10 @@ async def main(): ) bars = await client.request_bars( - bar_specifications=["1-HOUR-LAST", "30-MINUTE-MID"], - end_date_time=datetime.datetime(2023, 11, 6, 16, 30), + bar_specifications=["1-DAY-LAST", "8-HOUR-MID"], + start_date_time=datetime.datetime(2022, 10, 15, 3), + end_date_time=datetime.datetime(2023, 11, 1), tz_name="America/New_York", - duration="1 D", contracts=[contract], instrument_ids=[instrument_id], ) From d1eaedc7bccd0a1ab2c75e064c31917110e0623d Mon Sep 17 00:00:00 2001 From: limx0 Date: Fri, 10 Nov 2023 11:40:45 +1100 Subject: [PATCH 11/12] Fix formatting --- .../adapters/interactive_brokers/historic/__init__.py | 3 +++ .../adapters/interactive_brokers/historic/client.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py index 326b137862e3..d45066a414dc 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/__init__.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/__init__.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ------------------------------------------------------------------------------------------------- +# fmt: off from nautilus_trader.adapters.interactive_brokers.historic.client import HistoricInteractiveBrokersClient +# fmt: on + __all__ = [ "HistoricInteractiveBrokersClient", ] diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index 53d6b496b70e..45510c44cada 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -6,12 +6,15 @@ import pandas as pd from ibapi.common import MarketDataTypeEnum +# fmt: off from nautilus_trader.adapters.interactive_brokers.client import InteractiveBrokersClient from nautilus_trader.adapters.interactive_brokers.common import IBContract from nautilus_trader.adapters.interactive_brokers.parsing.instruments import ib_contract_to_instrument_id from nautilus_trader.adapters.interactive_brokers.parsing.instruments import instrument_id_to_ib_contract from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProvider from nautilus_trader.adapters.interactive_brokers.providers import InteractiveBrokersInstrumentProviderConfig + +# fmt: on from nautilus_trader.cache.cache import Cache from nautilus_trader.common.clock import LiveClock from nautilus_trader.common.logging import Logger @@ -462,6 +465,7 @@ def _calculate_duration_segments(self, start_date, end_date): return results + # will remove this post testing and review async def main(): contract = IBContract( From 60a2076a8f2083e1bda89fc952a5891debdb2379 Mon Sep 17 00:00:00 2001 From: Ben Singleton Date: Thu, 9 Nov 2023 22:22:07 -0500 Subject: [PATCH 12/12] misc bug fixes --- .../interactive_brokers/client/client.py | 3 +- .../interactive_brokers/historic/client.py | 62 ++++++++++++------- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/nautilus_trader/adapters/interactive_brokers/client/client.py b/nautilus_trader/adapters/interactive_brokers/client/client.py index 2d9a944b0200..20952aa43d05 100644 --- a/nautilus_trader/adapters/interactive_brokers/client/client.py +++ b/nautilus_trader/adapters/interactive_brokers/client/client.py @@ -1089,6 +1089,7 @@ async def get_historical_bars( use_rth: bool, end_date_time: str, duration: str, + timeout: int = 60, ): name = str(bar_type) if not (request := self.requests.get(name=name)): @@ -1114,7 +1115,7 @@ async def get_historical_bars( ) self._log.debug(f"reqHistoricalData: {request.req_id=}, {contract=}") request.handle() - return await self._await_request(request, 60) + return await self._await_request(request, timeout) else: self._log.info(f"Request already exist for {request}") diff --git a/nautilus_trader/adapters/interactive_brokers/historic/client.py b/nautilus_trader/adapters/interactive_brokers/historic/client.py index 45510c44cada..e4faeb6e3b85 100644 --- a/nautilus_trader/adapters/interactive_brokers/historic/client.py +++ b/nautilus_trader/adapters/interactive_brokers/historic/client.py @@ -128,6 +128,7 @@ async def request_bars( contracts: list[IBContract] | None = None, instrument_ids: list[str] | None = None, use_rth: bool = True, + timeout: int = 120, ) -> list[Bar]: """ Return Bars for one or more bar specifications for a list of IBContracts and/or @@ -154,6 +155,8 @@ async def request_bars( Instrument IDs (e.g. AAPL.NASDAQ) defining which bars to retrieve. 
use_rth : bool, default 'True' Whether to use regular trading hours. + timeout : int, default '120' + The timeout in seconds for each request. Returns ------- @@ -202,15 +205,15 @@ async def request_bars( BarSpecification.from_str(bar_spec), AggregationSource.EXTERNAL, ) - for duration_segment in self._calculate_duration_segments( + + for segment_end_date_time, segment_duration in self._calculate_duration_segments( start_date_time, end_date_time, + duration, ): - segment_end_date_time = duration_segment["date"] - duration = duration_segment["duration"] - self.log.info( - f"{instrument_id}: Requesting historical bars: {bar_type} ending on '{end_date_time}' with duration '{duration}'", + f"{instrument_id}: Requesting historical bars: {bar_type} ending on '{segment_end_date_time}' " + "with duration '{segment_duration}'", ) bars = await self._client.get_historical_bars( @@ -218,7 +221,8 @@ async def request_bars( contract, use_rth, segment_end_date_time.strftime("%Y%m%d-%H:%M:%S"), - duration, + segment_duration, + timeout=timeout, ) if bars: self.log.info( @@ -399,34 +403,50 @@ async def _fetch_instruments_if_not_cached(self, contracts: list[IBContract]) -> self.log.info(f"Fetching Instrument for: {instrument_id}") await self.request_instruments(contracts=[contract]) - def _calculate_duration_segments(self, start_date, end_date): + def _calculate_duration_segments( + self, + start_date: pd.Timestamp | None, + end_date: pd.Timestamp, + duration: str | None, + ) -> list[tuple[pd.Timestamp, str]]: """ Calculate the difference in years, days, and seconds between two dates for the purpose of requesting specific date ranges for historical bars. - This function breaks down the time difference between two provided dates (start_date and end_date) into - separate components: years, days, and seconds. It accounts for leap years in its calculation of years and - considers detailed time components (hours, minutes, seconds) for precise calculation of seconds. + This function breaks down the time difference between two provided dates (start_date + and end_date) into separate components: years, days, and seconds. It accounts for leap + years in its calculation of years and considers detailed time components (hours, minutes, + seconds) for precise calculation of seconds. - Each component of the time difference (years, days, seconds) is represented as a dictionary in the returned list. - The 'date' key in each dictionary indicates the end point of that time segment when moving from start_date to end_date. - For example, if the function calculates 1 year, the 'date' key for the year entry will be the end date after 1 year - has passed from start_date. This helps in understanding the progression of time from start_date to end_date in segmented intervals. + Each component of the time difference (years, days, seconds) is represented as a + tuple in the returned list. + The first element is the date that indicates the end point of that time segment + when moving from start_date to end_date. For example, if the function calculates 1 + year, the date for the year entry will be the end date after 1 year has passed + from start_date. This helps in understanding the progression of time from start_date + to end_date in segmented intervals. Parameters ---------- - start_date : pd.Timestamp + start_date : pd.Timestamp | None The starting date and time. end_date : pd.Timestamp The ending date and time. + duration : str + The amount of time to go back from the end_date_time. 
+ Valid values follow the pattern of an integer followed by S|D|W|M|Y + for seconds, days, weeks, months, or years respectively. Returns ------- - list: A list of dictionaries, each containing a 'date' and a 'duration' key. - The 'date' key represents the end point of each calculated time segment (year, day, second), - and the 'duration' key holds the length of the time segment as a string. + tuple[pd.Timestamp, str]: A list of tuples, each containing a date and a duration. + The date represents the end point of each calculated time segment (year, day, second), + and the duration is the length of the time segment as a string. """ + if duration: + return [(end_date, duration)] + total_delta = end_date - start_date # Calculate full years in the time delta @@ -455,13 +475,13 @@ def _calculate_duration_segments(self, start_date, end_date): results = [] if years: - results.append({"date": end_date, "duration": f"{years} Y"}) + results.append((end_date, f"{years} Y")) if days: - results.append({"date": minus_years_date, "duration": f"{days} D"}) + results.append((minus_years_date, f"{days} D")) if seconds: - results.append({"date": minus_days_date, "duration": f"{seconds} S"}) + results.append((minus_days_date, f"{seconds} S")) return results
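
The segmented bar requests above hinge on turning an arbitrary (start, end) window into the duration strings IB accepts. Below is a minimal standalone sketch of that idea, assuming only pandas; the function name and the rounding of sub-second remainders up to a whole second are illustrative and not part of the adapter API.

```python
import pandas as pd


def calculate_duration_segments(
    start: pd.Timestamp,
    end: pd.Timestamp,
) -> list[tuple[pd.Timestamp, str]]:
    """Split (start, end) into IB duration strings, each paired with its segment end date."""
    segments: list[tuple[pd.Timestamp, str]] = []

    # Whole 365-day blocks, anchored at the overall end date
    years = (end - start).days // 365
    minus_years = end - pd.Timedelta(days=365 * years)
    if years:
        segments.append((end, f"{years} Y"))

    # Whole days remaining after the year blocks
    days = (minus_years - start).days
    minus_days = minus_years - pd.Timedelta(days=days)
    if days:
        segments.append((minus_years, f"{days} D"))

    # Whatever remains, rounded up to whole seconds
    remainder_seconds = (minus_days - start).total_seconds()
    seconds = int(remainder_seconds) + (1 if remainder_seconds % 1 else 0)
    if seconds:
        segments.append((minus_days, f"{seconds} S"))

    return segments


# The dates used in main() above decompose into one year, sixteen days,
# and a 21-hour tail expressed in seconds:
print(calculate_duration_segments(pd.Timestamp("2022-10-15 03:00"), pd.Timestamp("2023-11-01")))
# [(Timestamp('2023-11-01 00:00:00'), '1 Y'),
#  (Timestamp('2022-11-01 00:00:00'), '16 D'),
#  (Timestamp('2022-10-16 00:00:00'), '75600 S')]
```

Each tuple is then fed to get_historical_bars as an (end_date_time, duration) pair, which is why the year segment carries the overall end date while the seconds segment carries the earliest one.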
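The tick side pages through time rather than using duration strings: each request returns a capped batch of ticks, and _handle_timestamp_iteration advances the next request to the newest timestamp seen, bumping by one second when an entire batch lands inside a single second so the loop cannot stall. The sketch below shows that loop shape under those assumptions; `fetch_batch` is a hypothetical stand-in for the underlying historical-ticks request and is not part of the adapter.

```python
import pandas as pd


def page_ticks(fetch_batch, start: pd.Timestamp, end: pd.Timestamp) -> list:
    """Collect ticks batch by batch until the requested window is exhausted."""
    ticks: list = []
    current = start
    while current < end:
        batch = fetch_batch(start=current)  # one capped historical-ticks request
        if not batch:
            break
        ticks.extend(batch)

        # Batch timestamps truncated to whole seconds (IB reports tick times in seconds)
        stamps = [pd.Timestamp(t.ts_init, unit="ns", tz="UTC").floor("s") for t in batch]
        next_start = max(stamps)
        if next_start == min(stamps):
            # Whole batch sits inside one second; step past it to avoid re-requesting it forever
            next_start += pd.Timedelta(seconds=1)
        current = next_start

    return sorted(ticks, key=lambda t: t.ts_init)
```

This paging behaviour is also why the warning about multi-day tick requests exists: a liquid instrument can require a very large number of such round trips for a single session.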