From 062b8cdc0b8ec1ae2e308efbf6317c56386cd3c3 Mon Sep 17 00:00:00 2001 From: Danglewood <85772166+deeleeramone@users.noreply.github.com> Date: Mon, 6 Jan 2025 20:16:47 -0800 Subject: [PATCH] more linting --- .../provider/utils/websockets/broadcast.py | 31 ++++++++++--------- .../provider/utils/websockets/client.py | 5 --- .../provider/utils/websockets/database.py | 21 ++++++++----- .../openbb_websockets/websockets_router.py | 3 -- .../openbb_intrinio/utils/websocket_client.py | 4 +-- .../models/websocket_connection.py | 1 + .../openbb_polygon/utils/websocket_client.py | 4 +-- 7 files changed, 35 insertions(+), 34 deletions(-) diff --git a/openbb_platform/core/openbb_core/provider/utils/websockets/broadcast.py b/openbb_platform/core/openbb_core/provider/utils/websockets/broadcast.py index 62ef54fcaadd..10517d4cf299 100644 --- a/openbb_platform/core/openbb_core/provider/utils/websockets/broadcast.py +++ b/openbb_platform/core/openbb_core/provider/utils/websockets/broadcast.py @@ -1,5 +1,7 @@ """Broadcast server for streaming results to connected clients via WebSocket.""" +# pylint: disable=too-many-positional-arguments + import asyncio import json import logging @@ -20,19 +22,19 @@ ) from starlette.websockets import WebSocketState -kwargs = parse_kwargs() +KWARGS = parse_kwargs() -HOST = kwargs.pop("host", None) or "localhost" -PORT = kwargs.pop("port", None) or 6666 +HOST = KWARGS.pop("host", None) or "localhost" +PORT = KWARGS.pop("port", None) or 6666 PORT = int(PORT) -RESULTS_FILE = kwargs.pop("results_file", None) -TABLE_NAME = kwargs.pop("table_name", None) or "records" -SLEEP_TIME = kwargs.pop("sleep_time", None) or 0.25 -AUTH_TOKEN = kwargs.pop("auth_token", None) +RESULTS_FILE = KWARGS.pop("results_file", None) +TABLE_NAME = KWARGS.pop("table_name", None) or "records" +SLEEP_TIME = KWARGS.pop("sleep_time", None) or 0.25 +AUTH_TOKEN = KWARGS.pop("auth_token", None) -SQL = kwargs.pop("sql", None) -SQL_CONNECT_KWARGS = kwargs.pop("sql_connect_kwargs", None) or {} +SQL = KWARGS.pop("sql", None) +SQL_CONNECT_KWARGS = KWARGS.pop("sql_connect_kwargs", None) or {} app = FastAPI() @@ -69,7 +71,7 @@ async def read_stdin(): for client in CONNECTED_CLIENTS: client.logger.error(err_msg) - for client in CONNECTED_CLIENTS: + for client in CONNECTED_CLIENTS.copy(): if client.websocket.client_state != WebSocketState.DISCONNECTED: await client.websocket.send_json(command) else: @@ -96,7 +98,7 @@ async def websocket_endpoint( # noqa: PLR0915 str(AUTH_TOKEN), sql=sql, ) - broadcast_server.replay = replay # type: ignore + broadcast_server.replay = replay # type: ignore # pylint: disable=attribute-defined-outside-init auth_token = str(auth_token) if sql and ( @@ -318,7 +320,7 @@ def start_app(self, host: str = "127.0.0.1", port: int = 6666): self._app, host=host, port=port, - **kwargs, + **KWARGS, ) @@ -350,6 +352,7 @@ def run_broadcast_server(broadcast_server, host, port, **kwargs): async def main(): """Run the main function.""" + # pylint: disable=import-outside-toplevel import threading loop = asyncio.get_running_loop() @@ -364,13 +367,13 @@ async def main(): SQL_CONNECT_KWARGS, SQL, ) - global MAIN_CLIENT # noqa: PLW0603 + global MAIN_CLIENT # noqa: PLW0603 pylint: disable=global-statement MAIN_CLIENT = broadcast_server try: broadcast_thread = threading.Thread( target=run_broadcast_server, args=(broadcast_server, HOST, PORT), - kwargs=kwargs, + kwargs=KWARGS, daemon=True, ) broadcast_thread.start() diff --git a/openbb_platform/core/openbb_core/provider/utils/websockets/client.py b/openbb_platform/core/openbb_core/provider/utils/websockets/client.py index c7dae8e53b29..3f92170db1b4 100644 --- a/openbb_platform/core/openbb_core/provider/utils/websockets/client.py +++ b/openbb_platform/core/openbb_core/provider/utils/websockets/client.py @@ -129,7 +129,6 @@ def __init__( # noqa: PLR0913 # pylint: disable=too-many-arguments,too-many-po import threading from queue import Queue from pathlib import Path - from openbb_core.app.model.abstract.error import OpenBBError from openbb_core.provider.utils.websockets.database import Database from openbb_core.provider.utils.websockets.helpers import ( encrypt_value, @@ -381,8 +380,6 @@ def connect(self) -> None: # pylint: disable=too-many-locals import queue import subprocess import threading - import time - from openbb_core.app.model.abstract.error import OpenBBError from openbb_core.provider.utils.websockets.helpers import decrypt_value if self.is_running: @@ -528,7 +525,6 @@ def subscribe(self, symbol) -> None: # pylint: disable=import-outside-toplevel import json # noqa import time - from openbb_core.app.model.abstract.error import OpenBBError if not self.is_running: raise OpenBBError("Provider connection is not running.") @@ -555,7 +551,6 @@ def unsubscribe(self, symbol) -> None: # pylint: disable=import-outside-toplevel import json # noqa import time - from openbb_core.app.model.abstract.error import OpenBBError if not self.symbol: self.logger.info("No subscribed symbols.") diff --git a/openbb_platform/core/openbb_core/provider/utils/websockets/database.py b/openbb_platform/core/openbb_core/provider/utils/websockets/database.py index 5afee8365e29..7afd9bf02093 100644 --- a/openbb_platform/core/openbb_core/provider/utils/websockets/database.py +++ b/openbb_platform/core/openbb_core/provider/utils/websockets/database.py @@ -1,6 +1,6 @@ """Database module for serialized websockets results.""" -# pylint: disable=too-many-lines,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,protected-access +# pylint: disable=too-many-lines,too-many-arguments,too-many-locals,too-many-branches,too-many-statements,protected-access,too-many-instance-attributes,too-many-positional-arguments import asyncio import threading @@ -297,7 +297,7 @@ async def _write_to_db(self, message) -> None: """, # noqa (message,), ) - self._at_limit = False + self._at_limit = False # pylint: disable=attribute-defined-outside-init if self.limit is not None and not self._at_limit: limit = max(0, int(self.limit)) @@ -309,7 +309,9 @@ async def _write_to_db(self, message) -> None: count = await count_cursor.fetchone() if count[0] > limit: - self._at_limit = True + self._at_limit = ( + True # pylint: disable=attribute-defined-outside-init + ) await count_cursor.close() @@ -382,9 +384,7 @@ async def _deserialize_row(self, row, cursor) -> dict: or isinstance(row[0], bytes) else row[0] ) - else: - # Multiple column case (extracted fields) - return {cursor.description[i][0]: row[i] for i in range(len(row))} + return {cursor.description[i][0]: row[i] for i in range(len(row))} except (json.JSONDecodeError, AttributeError) as e: self.logger.error(f"Failed to deserialize row: {e}") @@ -990,7 +990,8 @@ async def _start_prune_task(self): if not self._last_processed_timestamp: last_date = await self.database._query_db( - f"SELECT json_extract(message, '$.date') FROM {self.database.table_name} ORDER BY json_extract(message, '$.date') DESC LIMIT 1" # noqa + "SELECT json_extract(message, '$.date') FROM" # noqa + f" {self.database.table_name} ORDER BY json_extract(message, '$.date') DESC LIMIT 1" ) if not last_date: continue @@ -1022,6 +1023,7 @@ async def _start_prune_task(self): async def _start_export_task(self): """Start a background task to prune the database periodically.""" + # pylint: disable=import-outside-toplevel from pandas import to_datetime minutes = self.export_interval or 5 @@ -1029,7 +1031,10 @@ async def _start_export_task(self): while self.export_thread is not None and not self._shutdown: # Get the initial row to determine the "first time" try: - query = f"SELECT json_extract(message, '$.date') FROM {self.database.table_name} ORDER BY json_extract(message, '$.date') ASC LIMIT 1" # noqa + query = ( + "SELECT json_extract(message, '$.date') FROM" # noqa + f" {self.database.table_name} ORDER BY json_extract(message, '$.date') ASC LIMIT 1" + ) initial_row = await self.database._query_db(query) if not initial_row: await asyncio.sleep(1) diff --git a/openbb_platform/extensions/websockets/openbb_websockets/websockets_router.py b/openbb_platform/extensions/websockets/openbb_websockets/websockets_router.py index dd7672cdbe7a..1642f8917aef 100644 --- a/openbb_platform/extensions/websockets/openbb_websockets/websockets_router.py +++ b/openbb_platform/extensions/websockets/openbb_websockets/websockets_router.py @@ -80,9 +80,6 @@ async def create_connection( obbject = await OBBject.from_query(Query(**locals())) client = obbject.results.client - # pylint: disable=import-outside-toplevel - import asyncio - await asyncio.sleep(1) if not client.is_running or client._exception is not None: diff --git a/openbb_platform/providers/intrinio/openbb_intrinio/utils/websocket_client.py b/openbb_platform/providers/intrinio/openbb_intrinio/utils/websocket_client.py index 2743acaeaf2a..0afd5dadcf94 100644 --- a/openbb_platform/providers/intrinio/openbb_intrinio/utils/websocket_client.py +++ b/openbb_platform/providers/intrinio/openbb_intrinio/utils/websocket_client.py @@ -124,14 +124,14 @@ async def process_stdin_queue(): async def connect_and_stream(): """Connect to the WebSocket and stream data to file.""" + stdin_task = asyncio.create_task(read_stdin_and_queue_commands()) + process_stdin_task = asyncio.create_task(process_stdin_queue()) try: symbol = kwargs.pop("symbol", "lobby") symbol = ["lobby"] if "*" in symbol else symbol.split(",") - stdin_task = asyncio.create_task(read_stdin_and_queue_commands()) await DATABASE.start_writer() client.connect() client.join(symbol) - process_stdin_task = asyncio.create_task(process_stdin_queue()) finally: stdin_task.cancel() process_stdin_task.cancel() diff --git a/openbb_platform/providers/polygon/openbb_polygon/models/websocket_connection.py b/openbb_platform/providers/polygon/openbb_polygon/models/websocket_connection.py index 0a8b0a085fb2..d716ad481fbb 100644 --- a/openbb_platform/providers/polygon/openbb_polygon/models/websocket_connection.py +++ b/openbb_platform/providers/polygon/openbb_polygon/models/websocket_connection.py @@ -1198,6 +1198,7 @@ def extract_data( **kwargs: Any, ) -> dict: """Extract data from the WebSocket.""" + # pylint: disable=import-outside-toplevel import time api_key = credentials.get("polygon_api_key") if credentials else "" diff --git a/openbb_platform/providers/polygon/openbb_polygon/utils/websocket_client.py b/openbb_platform/providers/polygon/openbb_polygon/utils/websocket_client.py index dea05d4df59e..d73ab6352b4d 100644 --- a/openbb_platform/providers/polygon/openbb_polygon/utils/websocket_client.py +++ b/openbb_platform/providers/polygon/openbb_polygon/utils/websocket_client.py @@ -457,8 +457,8 @@ def _process_in_thread(): loop.run_forever() except (websockets.ConnectionClosed, websockets.ConnectionClosedError) as e: - msg = f"PROVIDER INFO: The WebSocket connection was closed -> {e}" - logger.info(msg) + MSG = f"PROVIDER INFO: The WebSocket connection was closed -> {e}" + logger.info(MSG) # Attempt to reopen the connection logger.info("PROVIDER INFO: Attempting to reconnect...") time.sleep(1)