From 60a9a9c81122ab38dee1c1b6952f1fbb16fd7c7e Mon Sep 17 00:00:00 2001 From: Antonyjin Date: Mon, 3 Jun 2024 16:23:31 +0200 Subject: [PATCH] Fix: Script crash because of unhandled errors Some of the request returned an error which wasn't checked This result to the end of execution of the script Solution: Adding a check about Exception and add the possibility to debug inside the logger --- setup.cfg | 1 + src/aleph_client/commands/utils.py | 117 +++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 15 deletions(-) diff --git a/setup.cfg b/setup.cfg index fe2c4e5c..b521f71d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,6 +42,7 @@ install_requires = pygments==2.16.1 rich==13.6.0 aiodns==3.1.1 + psutil==5.9.8 # The usage of test_requires is discouraged, see `Dependency Management` docs # tests_require = pytest; pytest-cov # Require a specific Python version, e.g. Python 2.7 or >= 3.4 diff --git a/src/aleph_client/commands/utils.py b/src/aleph_client/commands/utils.py index 3fc1cf06..b537f251 100644 --- a/src/aleph_client/commands/utils.py +++ b/src/aleph_client/commands/utils.py @@ -8,6 +8,7 @@ import aiohttp import asyncio +import aiohttp.client_exceptions import async_timeout import typer from aleph.sdk.types import GenericMessage @@ -21,6 +22,9 @@ from rich.progress import Progress, TaskID from typer import echo from aleph_client.conf import settings +from aleph_message.models.execution.environment import CpuProperties +from pydantic import BaseModel +import psutil def colorful_json(obj: str): """Render a JSON string with colors.""" @@ -210,6 +214,7 @@ def is_environment_interactive() -> bool: ) ) +logger = logging.getLogger(__name__) from aleph_client.commands.node import NodeInfo, _fetch_nodes, _escape_and_normalize, _remove_ansi_escape, _format_score @@ -223,6 +228,64 @@ def __rich_console__(self, console, options): yield self.progress yield self.table +# This is a copy from aleph-vm + +class LoadAverage(BaseModel): + load1: float + load5: float + load15: float + + @classmethod + def from_psutil(cls, psutil_loadavg: tuple[float, float, float]): + return cls( + load1=psutil_loadavg[0], + load5=psutil_loadavg[1], + load15=psutil_loadavg[2], + ) + +class CoreFrequencies(BaseModel): + min: float + max: float + + @classmethod + def from_psutil(cls, psutil_freq: psutil._common.scpufreq): + min_ = psutil_freq.min or psutil_freq.current + max_ = psutil_freq.max or psutil_freq.current + return cls(min=min_, max=max_) + +class CpuUsage(BaseModel): + count: int + load_average: LoadAverage + core_frequencies: CoreFrequencies + + +class MemoryUsage(BaseModel): + total_kB: int + available_kB: int + + +class DiskUsage(BaseModel): + total_kB: int + available_kB: int + + +class UsagePeriod(BaseModel): + start_timestamp: datetime + duration_seconds: float + + +class MachineProperties(BaseModel): + cpu: CpuProperties + + +class MachineUsage(BaseModel): + cpu: CpuUsage + mem: MemoryUsage + disk: DiskUsage + period: UsagePeriod + properties: MachineProperties + active: bool = True + async def fetch_crn_info(): node_info = await _fetch_nodes() @@ -253,19 +316,22 @@ async def fetch_crn_info(): return item_hashes -async def fetch_data(session: aiohttp.ClientSession, node_info: NodeInfo, queue: asyncio.Queue, progress: Progress, task: TaskID, item_hashes: list): + +async def fetch_data(session: aiohttp.ClientSession, node_info: NodeInfo, queue: asyncio.Queue[MachineUsage], progress: Progress, task: TaskID, item_hashes: list): tasks = [fetch_and_queue(session, node, queue, progress, task, item_hashes) for node in node_info.nodes] await asyncio.gather(*tasks) await queue.put(None) async def fetch_and_queue(session: aiohttp.ClientSession, node: NodeInfo, queue: asyncio.Queue, progress: Progress, task: TaskID, item_hashes: list): + url: str = node["address"].rstrip('/') + '/status/check/ipv6' + try: system_info, version = await asyncio.gather( fetch_crn_system(session, node), get_crn_version(session, node) ) + # print(system_info.architecture) async with async_timeout.timeout(settings.HTTP_REQUEST_TIMEOUT + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()): - url: str = node["address"].rstrip('/') + '/status/check/ipv6' async with session.get(url) as resp: resp.raise_for_status() node_stream = node["stream_reward"] @@ -274,13 +340,20 @@ async def fetch_and_queue(session: aiohttp.ClientSession, node: NodeInfo, queue: node_name = _remove_ansi_escape(node_name) node_address = node["address"] score = _format_score(node["score"]) - cpu = f"{system_info['cpu']['count']} {system_info['properties']['cpu']['architecture']}" - hdd = f"{system_info['disk']['available_kB'] / 1024 / 1024:.2f} GB" - ram = f"{system_info['mem']['available_kB'] / 1024 / 1024:.2f} GB" + # cpu = f"{system_info['cpu']['count']} {system_info['properties']['cpu']['architecture']}" + # hdd = f"{system_info['disk']['available_kB'] / 1024 / 1024:.2f} GB" + # ram = f"{system_info['mem']['available_kB'] / 1024 / 1024:.2f} GB" + cpu = "TESTING" + hdd = "TESTING" + ram = "TESTING" await queue.put((score, node_name, cpu, ram, hdd, version, node_stream, node_address)) item_hashes.append(node_stream) + except TimeoutError: + logger.debug(f'Timeout while fetching: {url}') + except aiohttp.client_exceptions.ClientConnectionError: + logger.debug(f'Error on connection: {url}') except Exception as e: - pass + logger.debug(f'This error occured: {e}') finally: progress.update(task, advance=1) @@ -291,28 +364,42 @@ async def update_table(queue: asyncio.Queue, table: Table): break table.add_row(*data) -async def fetch_crn_system(session: aiohttp.ClientSession, node: NodeInfo) -> str: +async def fetch_crn_system(session: aiohttp.ClientSession, node: NodeInfo) -> CpuProperties: + data = None + try: async with async_timeout.timeout(settings.HTTP_REQUEST_TIMEOUT + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()): url: str = node["address"].rstrip('/') + '/about/usage/system' async with session.get(url) as resp: resp.raise_for_status() - data = await resp.json() + data_raw = await resp.json() + data = MachineUsage.parse_obj(data_raw) + # print("data:", data) + except TimeoutError: + logger.debug(f'Timeout while fetching: {url}') + except aiohttp.client_exceptions.ClientConnectionError: + logger.debug(f'Error on connection: {url}') except Exception as e: - data = None + logger.debug(f'This error occured: {e}') return data async def get_crn_version(session: aiohttp.ClientSession, node: NodeInfo) -> str: - version = None + url = node["address"] + version = "Can't fetch the version" + try: async with async_timeout.timeout(3 * settings.HTTP_REQUEST_TIMEOUT + 3 * settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()): - async with session.get(node["address"]) as resp: + async with session.get(url) as resp: resp.raise_for_status() if "Server" in resp.headers: for server in resp.headers.getall("Server"): version_match = re.findall(r"^aleph-vm/(.*)$", server) if version_match and version_match[0]: - return version_match[0] - except (aiohttp.ClientError, asyncio.TimeoutError) as e: - return "Can't fetch the version" - return version or "Can't fetch the version" \ No newline at end of file + version = version_match[0] + except (asyncio.TimeoutError): + logger.debug(f'Timeout while fetching: {url}') + except aiohttp.client_exceptions.ClientConnectionError: + logger.debug(f'Error on connection: {url}') + except Exception as e: + logger.debug(f'This error occured: {e}') + return version \ No newline at end of file