Skip to content

Commit

Permalink
Reorganize Databento parsing functions
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Nov 11, 2023
1 parent 9779ad6 commit 7216175
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 74 deletions.
10 changes: 5 additions & 5 deletions nautilus_trader/adapters/databento/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ def nautilus_instrument_id_from_databento(
Return the Nautilus `InstrumentId` parsed from the given `symbol` and `publisher`
details.
Notes
-----
The Databento `instrument_id` is an integer, whereas a Nautilus `InstrumentId` is a
symbol and venue combination.
Parameters
----------
raw_symbol : str
Expand All @@ -71,5 +66,10 @@ def nautilus_instrument_id_from_databento(
-------
InstrumentId
Notes
-----
The Databento `instrument_id` is an integer, whereas a Nautilus `InstrumentId` is a
symbol and venue combination.
"""
return InstrumentId(Symbol(raw_symbol), Venue(publisher.venue))
67 changes: 2 additions & 65 deletions nautilus_trader/adapters/databento/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@

import databento
import msgspec
import pandas as pd
import pytz
from databento.common.symbology import InstrumentMap

from nautilus_trader.adapters.databento.common import check_file_path
from nautilus_trader.adapters.databento.common import nautilus_instrument_id_from_databento
from nautilus_trader.adapters.databento.enums import DatabentoInstrumentClass
from nautilus_trader.adapters.databento.parsing import parse_equity
from nautilus_trader.adapters.databento.parsing import parse_futures_contract
from nautilus_trader.adapters.databento.parsing import parse_mbo_msg
from nautilus_trader.adapters.databento.parsing import parse_mbp_or_tbbo_msg
from nautilus_trader.adapters.databento.parsing import parse_ohlcv_msg
from nautilus_trader.adapters.databento.parsing import parse_options_contract
from nautilus_trader.adapters.databento.parsing import parse_trade_msg
from nautilus_trader.adapters.databento.parsing import parse_record
from nautilus_trader.adapters.databento.types import DatabentoPublisher
from nautilus_trader.core.data import Data
from nautilus_trader.model.identifiers import InstrumentId
Expand Down Expand Up @@ -186,63 +176,10 @@ def from_dbn(self, path: PathLike[str] | str) -> list[Data]:
output: list[Data] = []

for record in store:
data = self._parse_record(record, instrument_map)
data = parse_record(record, instrument_map, self._publishers)
if isinstance(data, tuple):
output.extend(data)
else:
output.append(data)

return output

def _parse_record(self, record: databento.DBNRecord, instrument_map: InstrumentMap) -> Data:
    """
    Parse a single DBN record into the corresponding Nautilus object.

    Parameters
    ----------
    record : databento.DBNRecord
        The record to parse.
    instrument_map : InstrumentMap
        The symbology map used to resolve the record's integer `instrument_id`
        to a raw symbol for the record's event date.

    Returns
    -------
    Data

    Raises
    ------
    ValueError
        If the `instrument_id` cannot be resolved for the event date, or the
        record type is not one of the handled schemas.

    """
    # Instrument definitions are handled by a dedicated parser and do not
    # go through the symbology lookup below.
    if isinstance(record, databento.InstrumentDefMsg):
        return self._parse_instrument_def(record)

    # The symbology lookup is keyed by date, so derive the UTC event date first.
    event_date = pd.Timestamp(record.ts_event, tz=pytz.utc).date()
    symbol = instrument_map.resolve(record.instrument_id, date=event_date)
    if symbol is None:
        raise ValueError(
            f"Cannot resolve instrument_id {record.instrument_id} on {event_date}",
        )

    instrument_id: InstrumentId = nautilus_instrument_id_from_databento(
        raw_symbol=symbol,
        publisher=self._publishers[record.publisher_id],
    )

    # Dispatch on the concrete record type; first match wins.
    if isinstance(record, databento.MBOMsg):
        return parse_mbo_msg(record, instrument_id)
    if isinstance(record, (databento.MBP1Msg, databento.MBP10Msg)):
        return parse_mbp_or_tbbo_msg(record, instrument_id)
    if isinstance(record, databento.TradeMsg):
        return parse_trade_msg(record, instrument_id)
    if isinstance(record, databento.OHLCVMsg):
        return parse_ohlcv_msg(record, instrument_id)

    raise ValueError(
        f"Schema {type(record).__name__} is currently unsupported by NautilusTrader",
    )

def _parse_instrument_def(self, record: databento.InstrumentDefMsg) -> Instrument:
    """
    Parse an instrument definition record into a Nautilus instrument.

    Parameters
    ----------
    record : databento.InstrumentDefMsg
        The instrument definition record to parse.

    Returns
    -------
    Instrument

    Raises
    ------
    ValueError
        If the record's `instrument_class` is not supported or not recognized.

    """
    instrument_id: InstrumentId = nautilus_instrument_id_from_databento(
        raw_symbol=record.raw_symbol,
        publisher=self._publishers[record.publisher_id],
    )

    instrument_class = record.instrument_class

    # Supported classes map onto a concrete instrument parser.
    if instrument_class == DatabentoInstrumentClass.STOCK.value:
        return parse_equity(record, instrument_id)
    if instrument_class in (
        DatabentoInstrumentClass.FUTURE.value,
        DatabentoInstrumentClass.FUTURE_SPREAD.value,
    ):
        return parse_futures_contract(record, instrument_id)
    if instrument_class in (
        DatabentoInstrumentClass.CALL.value,
        DatabentoInstrumentClass.PUT.value,
    ):
        return parse_options_contract(record, instrument_id)

    # Known classes without a parser are rejected with a specific message.
    if instrument_class == DatabentoInstrumentClass.FX_SPOT.value:
        raise ValueError("`instrument_class` FX_SPOT not currently supported")
    if instrument_class == DatabentoInstrumentClass.OPTION_SPREAD.value:
        raise ValueError("`instrument_class` OPTION_SPREAD not currently supported")
    if instrument_class == DatabentoInstrumentClass.MIXED_SPREAD.value:
        raise ValueError("`instrument_class` MIXED_SPREAD not currently supported")

    raise ValueError(f"Invalid `instrument_class`, was {record.instrument_class}")
72 changes: 71 additions & 1 deletion nautilus_trader/adapters/databento/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
# -------------------------------------------------------------------------------------------------

import databento

import pandas as pd
import pytz
from databento.common.symbology import InstrumentMap

from nautilus_trader.adapters.databento.common import nautilus_instrument_id_from_databento
from nautilus_trader.adapters.databento.enums import DatabentoInstrumentClass
from nautilus_trader.adapters.databento.types import DatabentoPublisher
from nautilus_trader.core.data import Data
from nautilus_trader.core.datetime import secs_to_nanos
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.currency import Currency
Expand All @@ -37,6 +44,7 @@
from nautilus_trader.model.identifiers import TradeId
from nautilus_trader.model.instruments import Equity
from nautilus_trader.model.instruments import FuturesContract
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.model.instruments import OptionsContract
from nautilus_trader.model.objects import FIXED_SCALAR
from nautilus_trader.model.objects import Price
Expand Down Expand Up @@ -286,3 +294,65 @@ def parse_ohlcv_msg(
ts_event=ts_event,
ts_init=ts_event,
)


def parse_record(
record: databento.DBNRecord,
instrument_map: InstrumentMap,
publishers: dict[int, DatabentoPublisher],
) -> Data:
if isinstance(record, databento.InstrumentDefMsg):
return parse_instrument_def(record, publishers)

record_date = pd.Timestamp(record.ts_event, tz=pytz.utc).date()
raw_symbol = instrument_map.resolve(record.instrument_id, date=record_date)
if raw_symbol is None:
raise ValueError(
f"Cannot resolve instrument_id {record.instrument_id} on {record_date}",
)

publisher: DatabentoPublisher = publishers[record.publisher_id]
instrument_id: InstrumentId = nautilus_instrument_id_from_databento(
raw_symbol=raw_symbol,
publisher=publisher,
)

if isinstance(record, databento.MBOMsg):
return parse_mbo_msg(record, instrument_id)
elif isinstance(record, databento.MBP1Msg | databento.MBP10Msg):
return parse_mbp_or_tbbo_msg(record, instrument_id)
elif isinstance(record, databento.TradeMsg):
return parse_trade_msg(record, instrument_id)
elif isinstance(record, databento.OHLCVMsg):
return parse_ohlcv_msg(record, instrument_id)
else:
raise ValueError(
f"Schema {type(record).__name__} is currently unsupported by NautilusTrader",
)


def parse_instrument_def(
record: databento.InstrumentDefMsg,
publishers: dict[int, DatabentoPublisher],
) -> Instrument:
publisher: DatabentoPublisher = publishers[record.publisher_id]
instrument_id: InstrumentId = nautilus_instrument_id_from_databento(
raw_symbol=record.raw_symbol,
publisher=publisher,
)

match record.instrument_class:
case DatabentoInstrumentClass.STOCK.value:
return parse_equity(record, instrument_id)
case DatabentoInstrumentClass.FUTURE.value | DatabentoInstrumentClass.FUTURE_SPREAD.value:
return parse_futures_contract(record, instrument_id)
case DatabentoInstrumentClass.CALL.value | DatabentoInstrumentClass.PUT.value:
return parse_options_contract(record, instrument_id)
case DatabentoInstrumentClass.FX_SPOT.value:
raise ValueError("`instrument_class` FX_SPOT not currently supported")
case DatabentoInstrumentClass.OPTION_SPREAD.value:
raise ValueError("`instrument_class` OPTION_SPREAD not currently supported")
case DatabentoInstrumentClass.MIXED_SPREAD.value:
raise ValueError("`instrument_class` MIXED_SPREAD not currently supported")
case _:
raise ValueError(f"Invalid `instrument_class`, was {record.instrument_class}")
20 changes: 17 additions & 3 deletions nautilus_trader/adapters/databento/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# limitations under the License.
# -------------------------------------------------------------------------------------------------

import databento

from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.common.clock import LiveClock
from nautilus_trader.common.logging import Logger
from nautilus_trader.common.providers import InstrumentProvider
Expand All @@ -26,8 +29,8 @@ class DatabentoInstrumentProvider(InstrumentProvider):
Parameters
----------
client : HistoricalClient
The historical client for the provider.
client : databento.Historical
The Databento historical data client for the provider.
logger : Logger
The logger for the provider.
clock : LiveClock
Expand All @@ -39,6 +42,7 @@ class DatabentoInstrumentProvider(InstrumentProvider):

def __init__(
self,
client: databento.Historical,
logger: Logger,
clock: LiveClock,
config: InstrumentProviderConfig | None = None,
Expand All @@ -49,9 +53,19 @@ def __init__(
)

self._clock = clock
self._config = config

self._loader = DatabentoDataLoader()

# HTTP API
self._http_client = client

async def load_all_async(self, filters: dict | None = None) -> None:
pass
raise RuntimeError(
"requesting all instrument definitions is not currently supported, "
"as this would mean every instrument definition for every dataset "
"(potentially millions)",
)

async def load_ids_async(
self,
Expand Down

0 comments on commit 7216175

Please sign in to comment.