Skip to content

Commit

Permalink
more linting
Browse files Browse the repository at this point in the history
  • Loading branch information
deeleeramone committed Jan 7, 2025
1 parent b5770d6 commit 062b8cd
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Broadcast server for streaming results to connected clients via WebSocket."""

# pylint: disable=too-many-positional-arguments

import asyncio
import json
import logging
Expand All @@ -20,19 +22,19 @@
)
from starlette.websockets import WebSocketState

kwargs = parse_kwargs()
KWARGS = parse_kwargs()

HOST = kwargs.pop("host", None) or "localhost"
PORT = kwargs.pop("port", None) or 6666
HOST = KWARGS.pop("host", None) or "localhost"
PORT = KWARGS.pop("port", None) or 6666
PORT = int(PORT)

RESULTS_FILE = kwargs.pop("results_file", None)
TABLE_NAME = kwargs.pop("table_name", None) or "records"
SLEEP_TIME = kwargs.pop("sleep_time", None) or 0.25
AUTH_TOKEN = kwargs.pop("auth_token", None)
RESULTS_FILE = KWARGS.pop("results_file", None)
TABLE_NAME = KWARGS.pop("table_name", None) or "records"
SLEEP_TIME = KWARGS.pop("sleep_time", None) or 0.25
AUTH_TOKEN = KWARGS.pop("auth_token", None)

SQL = kwargs.pop("sql", None)
SQL_CONNECT_KWARGS = kwargs.pop("sql_connect_kwargs", None) or {}
SQL = KWARGS.pop("sql", None)
SQL_CONNECT_KWARGS = KWARGS.pop("sql_connect_kwargs", None) or {}

app = FastAPI()

Expand Down Expand Up @@ -69,7 +71,7 @@ async def read_stdin():
for client in CONNECTED_CLIENTS:
client.logger.error(err_msg)

for client in CONNECTED_CLIENTS:
for client in CONNECTED_CLIENTS.copy():
if client.websocket.client_state != WebSocketState.DISCONNECTED:
await client.websocket.send_json(command)
else:
Expand All @@ -96,7 +98,7 @@ async def websocket_endpoint( # noqa: PLR0915
str(AUTH_TOKEN),
sql=sql,
)
broadcast_server.replay = replay # type: ignore
broadcast_server.replay = replay # type: ignore # pylint: disable=attribute-defined-outside-init
auth_token = str(auth_token)

if sql and (
Expand Down Expand Up @@ -318,7 +320,7 @@ def start_app(self, host: str = "127.0.0.1", port: int = 6666):
self._app,
host=host,
port=port,
**kwargs,
**KWARGS,
)


Expand Down Expand Up @@ -350,6 +352,7 @@ def run_broadcast_server(broadcast_server, host, port, **kwargs):

async def main():
"""Run the main function."""
# pylint: disable=import-outside-toplevel
import threading

loop = asyncio.get_running_loop()
Expand All @@ -364,13 +367,13 @@ async def main():
SQL_CONNECT_KWARGS,
SQL,
)
global MAIN_CLIENT # noqa: PLW0603
global MAIN_CLIENT # noqa: PLW0603 pylint: disable=global-statement
MAIN_CLIENT = broadcast_server
try:
broadcast_thread = threading.Thread(
target=run_broadcast_server,
args=(broadcast_server, HOST, PORT),
kwargs=kwargs,
kwargs=KWARGS,
daemon=True,
)
broadcast_thread.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def __init__( # noqa: PLR0913 # pylint: disable=too-many-arguments,too-many-po
import threading
from queue import Queue
from pathlib import Path
from openbb_core.app.model.abstract.error import OpenBBError
from openbb_core.provider.utils.websockets.database import Database
from openbb_core.provider.utils.websockets.helpers import (
encrypt_value,
Expand Down Expand Up @@ -381,8 +380,6 @@ def connect(self) -> None: # pylint: disable=too-many-locals
import queue
import subprocess
import threading
import time
from openbb_core.app.model.abstract.error import OpenBBError
from openbb_core.provider.utils.websockets.helpers import decrypt_value

if self.is_running:
Expand Down Expand Up @@ -528,7 +525,6 @@ def subscribe(self, symbol) -> None:
# pylint: disable=import-outside-toplevel
import json # noqa
import time
from openbb_core.app.model.abstract.error import OpenBBError

if not self.is_running:
raise OpenBBError("Provider connection is not running.")
Expand All @@ -555,7 +551,6 @@ def unsubscribe(self, symbol) -> None:
# pylint: disable=import-outside-toplevel
import json # noqa
import time
from openbb_core.app.model.abstract.error import OpenBBError

if not self.symbol:
self.logger.info("No subscribed symbols.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Database module for serialized websockets results."""

# pylint: disable=too-many-lines,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,protected-access
# pylint: disable=too-many-lines,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,protected-access,too-many-instance-attributes,too-many-positional-arguments

import asyncio
import threading
Expand Down Expand Up @@ -297,7 +297,7 @@ async def _write_to_db(self, message) -> None:
""", # noqa
(message,),
)
self._at_limit = False
self._at_limit = False # pylint: disable=attribute-defined-outside-init

if self.limit is not None and not self._at_limit:
limit = max(0, int(self.limit))
Expand All @@ -309,7 +309,9 @@ async def _write_to_db(self, message) -> None:
count = await count_cursor.fetchone()

if count[0] > limit:
self._at_limit = True
self._at_limit = (
True # pylint: disable=attribute-defined-outside-init
)

await count_cursor.close()

Expand Down Expand Up @@ -382,9 +384,7 @@ async def _deserialize_row(self, row, cursor) -> dict:
or isinstance(row[0], bytes)
else row[0]
)
else:
# Multiple column case (extracted fields)
return {cursor.description[i][0]: row[i] for i in range(len(row))}
return {cursor.description[i][0]: row[i] for i in range(len(row))}

except (json.JSONDecodeError, AttributeError) as e:
self.logger.error(f"Failed to deserialize row: {e}")
Expand Down Expand Up @@ -990,7 +990,8 @@ async def _start_prune_task(self):

if not self._last_processed_timestamp:
last_date = await self.database._query_db(
f"SELECT json_extract(message, '$.date') FROM {self.database.table_name} ORDER BY json_extract(message, '$.date') DESC LIMIT 1" # noqa
"SELECT json_extract(message, '$.date') FROM" # noqa
f" {self.database.table_name} ORDER BY json_extract(message, '$.date') DESC LIMIT 1"
)
if not last_date:
continue
Expand Down Expand Up @@ -1022,14 +1023,18 @@ async def _start_prune_task(self):

async def _start_export_task(self):
"""Start a background task to prune the database periodically."""
# pylint: disable=import-outside-toplevel
from pandas import to_datetime

minutes = self.export_interval or 5

while self.export_thread is not None and not self._shutdown:
# Get the initial row to determine the "first time"
try:
query = f"SELECT json_extract(message, '$.date') FROM {self.database.table_name} ORDER BY json_extract(message, '$.date') ASC LIMIT 1" # noqa
query = (
"SELECT json_extract(message, '$.date') FROM" # noqa
f" {self.database.table_name} ORDER BY json_extract(message, '$.date') ASC LIMIT 1"
)
initial_row = await self.database._query_db(query)
if not initial_row:
await asyncio.sleep(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ async def create_connection(
obbject = await OBBject.from_query(Query(**locals()))
client = obbject.results.client

# pylint: disable=import-outside-toplevel
import asyncio

await asyncio.sleep(1)

if not client.is_running or client._exception is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ async def process_stdin_queue():

async def connect_and_stream():
"""Connect to the WebSocket and stream data to file."""
stdin_task = asyncio.create_task(read_stdin_and_queue_commands())
process_stdin_task = asyncio.create_task(process_stdin_queue())
try:
symbol = kwargs.pop("symbol", "lobby")
symbol = ["lobby"] if "*" in symbol else symbol.split(",")
stdin_task = asyncio.create_task(read_stdin_and_queue_commands())
await DATABASE.start_writer()
client.connect()
client.join(symbol)
process_stdin_task = asyncio.create_task(process_stdin_queue())
finally:
stdin_task.cancel()
process_stdin_task.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ def extract_data(
**kwargs: Any,
) -> dict:
"""Extract data from the WebSocket."""
# pylint: disable=import-outside-toplevel
import time

api_key = credentials.get("polygon_api_key") if credentials else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ def _process_in_thread():
loop.run_forever()

except (websockets.ConnectionClosed, websockets.ConnectionClosedError) as e:
msg = f"PROVIDER INFO: The WebSocket connection was closed -> {e}"
logger.info(msg)
MSG = f"PROVIDER INFO: The WebSocket connection was closed -> {e}"
logger.info(MSG)
# Attempt to reopen the connection
logger.info("PROVIDER INFO: Attempting to reconnect...")
time.sleep(1)
Expand Down

0 comments on commit 062b8cd

Please sign in to comment.