Skip to content

Commit

Permalink
StreamFetcher POC
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorWounds committed May 23, 2024
1 parent 60ff2bd commit aeeae61
Show file tree
Hide file tree
Showing 11 changed files with 2,252 additions and 0 deletions.
64 changes: 64 additions & 0 deletions openbb_platform/core/openbb_core/provider/abstract/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# ruff: noqa: S101, E501
# pylint: disable=E1101, C0301

import asyncio
import json
from typing import (
Any,
Dict,
Expand All @@ -14,6 +16,7 @@
get_origin,
)

import websockets
from pandas import DataFrame

from openbb_core.provider.abstract.annotated_result import AnnotatedResult
Expand Down Expand Up @@ -227,3 +230,64 @@ def test(
assert issubclass(
type(transformed_data), cls.return_type
), f"Transformed data must be of the correct type. Expected: {cls.return_type} Got: {type(transformed_data)}"


class StreamFetcher(Generic[Q, R]):
"""Class to fetch live streaming data using WebSocket connections."""

@classmethod
async def connect(
cls,
uri: str,
):
"""Connect to a WebSocket server."""
cls.websocket = await websockets.connect(uri)
print("Connected to WebSocket server.")
asyncio.create_task(cls.receive_data())

@staticmethod
def transform_data(data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]:
"""Transform the provider-specific data."""
raise NotImplementedError

@classmethod
async def receive_data(cls, **kwargs):
"""Receive data from the WebSocket server."""
try:
while True:
message = await cls.websocket.recv()
processed_data = await cls.process_message(message, **kwargs)
if processed_data:
print(processed_data)

except websockets.exceptions.ConnectionClosed:
print("WebSocket connection closed.")

@classmethod
async def process_message(cls, message: str, **kwargs) -> Optional[R]:
"""Process incoming WebSocket messages."""
try:
json_data = json.loads(message)
transformed_data = cls.transform_data(json_data, **kwargs)
return transformed_data
except Exception as e:
print(f"Error processing message: {e}")
return None

@classmethod
async def disconnect(cls):
"""Disconnect the WebSocket."""
await cls.websocket.close()

@classmethod
async def fetch_data(
cls, # pylint: disable=unused-argument
params: Dict[str, Any],
credentials: Optional[Dict[str, str]] = None, # pylint: disable=unused-argument
**kwargs,
) -> Union[R, AnnotatedResult[R]]:
"""Fetch live data from a provider."""
# In a streaming context, this method may just ensure the connection is open.
if not hasattr(cls, "websocket"):
await cls.connect(params.get("uri"))
# Data handling is asynchronous and managed by `receive_data`.
26 changes: 26 additions & 0 deletions openbb_platform/extensions/crypto/openbb_crypto/crypto_router.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Crypto Router."""

import asyncio

from openbb_core.app.model.command_context import CommandContext
from openbb_core.app.model.example import APIEx
from openbb_core.app.model.obbject import OBBject
Expand All @@ -10,6 +12,9 @@
)
from openbb_core.app.query import Query
from openbb_core.app.router import Router
from providers.binance.openbb_binance.models.crypto_historical import (
BinanceStreamFetcher,
)

from openbb_crypto.price.price_router import router as price_router

Expand All @@ -33,3 +38,24 @@ async def search(
) -> OBBject:
"""Search available cryptocurrency pairs within a provider."""
return await OBBject.from_query(Query(**locals()))


# pylint: disable=unused-argument
async def crypto_historical():
"Define the POC."
full_url = "wss://stream.binance.com:9443/ws/ethbtc@miniTicker"
await BinanceStreamFetcher.connect(full_url)
try:
await asyncio.sleep(10) # Keep connection open for 60 seconds
finally:
await BinanceStreamFetcher.disconnect()

# Adjusted setup for existing asyncio event loops
loop = asyncio.get_event_loop()

if loop.is_running():
# Scheduling the coroutine to run and handling with the existing event loop
loop.create_task(crypto_historical())
else:
# If the loop is not running, run until complete
loop.run_until_complete(crypto_historical())
14 changes: 14 additions & 0 deletions openbb_platform/providers/binance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# OpenBB Biztoc Provider

This extension integrates the Biztoc data provider
into the OpenBB Platform.

## Installation

To install the extension, run the following command in this folder:

```bash
pip install openbb-biztoc
```

Documentation available [here](https://docs.openbb.co/platform/development/contributing).
1 change: 1 addition & 0 deletions openbb_platform/providers/binance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc provider."""
24 changes: 24 additions & 0 deletions openbb_platform/providers/binance/openbb_binance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Biztoc provider module."""

from openbb_binance.models.crypto_historical import BinanceStreamFetcher
from openbb_core.provider.abstract.provider import Provider

binance_provider = Provider(
name="binance",
website="https://api.binance.com",
description="""BizToc uses Rapid API for its REST API.
You may sign up for your free account at https://rapidapi.com/thma/api/binance.
The Base URL for all requests is:
https://binance.p.rapidapi.com/
If you're not a developer but would still like to use Biztoc outside of the main website,
we've partnered with OpenBB, allowing you to pull in BizToc's news stream in their Terminal.""",
# credentials=["api_key"],
fetcher_dict={
"bcrypto_historical": BinanceStreamFetcher,
},
repr_name="Binance",
instructions="The BizToc API is hosted on RapidAPI. To set up, go to: https://rapidapi.com/thma/api/binance.\n\n![binance0](https://github.com/marban/OpenBBTerminal/assets/18151143/04cdd423-f65e-4ad8-ad5a-4a59b0f5ddda)\n\nIn the top right, select 'Sign Up'. After answering some questions, you will be prompted to select one of their plans.\n\n![binance1](https://github.com/marban/OpenBBTerminal/assets/18151143/9f3b72ea-ded7-48c5-aa33-bec5c0de8422)\n\nAfter signing up, navigate back to https://rapidapi.com/thma/api/binance. If you are logged in, you will see a header called X-RapidAPI-Key.\n\n![binance2](https://github.com/marban/OpenBBTerminal/assets/18151143/0f3b6c91-07e0-447a-90cd-a9e23522929f)", # noqa: E501 pylint: disable=line-too-long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc Provider models."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Binance Crypto Historical WS Data."""

import json
from datetime import datetime
from typing import Any, Dict, Optional

from openbb_core.provider.standard_models.crypto_historical import (
CryptoHistoricalData,
)
from pydantic import Field

from openbb_platform.core.openbb_core.provider.abstract.fetcher import StreamFetcher

# pylint: disable=unused-kwargs


class BinanceCryptoHistoricalData(CryptoHistoricalData):
"""Binance Crypto Historical Data."""

__alias_dict__ = {
"symbol": "s",
"close": "c",
"open": "o",
"high": "h",
"low": "l",
"volume": "v",
}
event_type: Optional[str] = Field(
default=None,
description="Event type",
alias="e",
)
quote_asset_volume: Optional[str] = Field(
default=None,
description="Total traded quote asset volume",
alias="q",
)


class BinanceStreamFetcher(StreamFetcher):
"""Define Binance Stream Fetcher."""

@staticmethod
def transform_data(data: Dict[str, Any], **kwargs) -> BinanceCryptoHistoricalData:
"""Transform the incoming data."""
if "date" not in data:
data["date"] = datetime.now().isoformat()
return BinanceCryptoHistoricalData(**data)

@classmethod
async def process_message(
cls, message: str, **kwargs
) -> Optional[BinanceCryptoHistoricalData]:
"""Process incoming WebSocket messages."""
try:
json_data = json.loads(message)
transformed_data = cls.transform_data(json_data)
return transformed_data
except Exception as e:
print(f"Error processing message from Binance: {e}")
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc utils."""
Loading

0 comments on commit aeeae61

Please sign in to comment.