Fix: Script crash because of unhandled errors
Some of the requests returned errors that were not checked,
which caused the script to stop executing.

Solution: Add exception handling for these requests and log
the errors at debug level via the logger
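A minimal sketch of the pattern this commit applies, assuming a generic fetch helper (the function name, signature, and URL handling below are illustrative, not the actual code):

import asyncio
import logging
from typing import Optional

import aiohttp

logger = logging.getLogger(__name__)

async def fetch_example(session: aiohttp.ClientSession, url: str) -> Optional[dict]:
    # Instead of letting an unhandled exception end the whole run,
    # each failure is logged at debug level and None is returned.
    try:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.json()
    except asyncio.TimeoutError:
        logger.debug(f"Timeout while fetching: {url}")
    except aiohttp.ClientConnectionError:
        logger.debug(f"Error on connection: {url}")
    except Exception as e:
        logger.debug(f"This error occurred: {e}")
    return None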
Antonyjin committed Jun 3, 2024
1 parent 6215eaf commit 60a9a9c
Showing 2 changed files with 103 additions and 15 deletions.
1 change: 1 addition & 0 deletions setup.cfg
@@ -42,6 +42,7 @@ install_requires =
pygments==2.16.1
rich==13.6.0
aiodns==3.1.1
psutil==5.9.8
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
117 changes: 102 additions & 15 deletions src/aleph_client/commands/utils.py
@@ -8,6 +8,7 @@
import aiohttp
import asyncio

import aiohttp.client_exceptions
import async_timeout
import typer
from aleph.sdk.types import GenericMessage
@@ -21,6 +22,9 @@
from rich.progress import Progress, TaskID
from typer import echo
from aleph_client.conf import settings
from aleph_message.models.execution.environment import CpuProperties
from pydantic import BaseModel
import psutil

def colorful_json(obj: str):
"""Render a JSON string with colors."""
@@ -210,6 +214,7 @@ def is_environment_interactive() -> bool:
)
)

logger = logging.getLogger(__name__)

from aleph_client.commands.node import NodeInfo, _fetch_nodes, _escape_and_normalize, _remove_ansi_escape, _format_score

@@ -223,6 +228,64 @@ def __rich_console__(self, console, options):
yield self.progress
yield self.table

# This is a copy from aleph-vm

class LoadAverage(BaseModel):
load1: float
load5: float
load15: float

@classmethod
def from_psutil(cls, psutil_loadavg: tuple[float, float, float]):
return cls(
load1=psutil_loadavg[0],
load5=psutil_loadavg[1],
load15=psutil_loadavg[2],
)

class CoreFrequencies(BaseModel):
min: float
max: float

@classmethod
def from_psutil(cls, psutil_freq: psutil._common.scpufreq):
min_ = psutil_freq.min or psutil_freq.current
max_ = psutil_freq.max or psutil_freq.current
return cls(min=min_, max=max_)

class CpuUsage(BaseModel):
count: int
load_average: LoadAverage
core_frequencies: CoreFrequencies


class MemoryUsage(BaseModel):
total_kB: int
available_kB: int


class DiskUsage(BaseModel):
total_kB: int
available_kB: int


class UsagePeriod(BaseModel):
start_timestamp: datetime
duration_seconds: float


class MachineProperties(BaseModel):
cpu: CpuProperties


class MachineUsage(BaseModel):
cpu: CpuUsage
mem: MemoryUsage
disk: DiskUsage
period: UsagePeriod
properties: MachineProperties
active: bool = True


async def fetch_crn_info():
node_info = await _fetch_nodes()
@@ -253,19 +316,22 @@ async def fetch_crn_info():

return item_hashes

async def fetch_data(session: aiohttp.ClientSession, node_info: NodeInfo, queue: asyncio.Queue, progress: Progress, task: TaskID, item_hashes: list):

async def fetch_data(session: aiohttp.ClientSession, node_info: NodeInfo, queue: asyncio.Queue[MachineUsage], progress: Progress, task: TaskID, item_hashes: list):
tasks = [fetch_and_queue(session, node, queue, progress, task, item_hashes) for node in node_info.nodes]
await asyncio.gather(*tasks)
await queue.put(None)

async def fetch_and_queue(session: aiohttp.ClientSession, node: NodeInfo, queue: asyncio.Queue, progress: Progress, task: TaskID, item_hashes: list):
url: str = node["address"].rstrip('/') + '/status/check/ipv6'

try:
system_info, version = await asyncio.gather(
fetch_crn_system(session, node),
get_crn_version(session, node)
)
# print(system_info.architecture)
async with async_timeout.timeout(settings.HTTP_REQUEST_TIMEOUT + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()):
url: str = node["address"].rstrip('/') + '/status/check/ipv6'
async with session.get(url) as resp:
resp.raise_for_status()
node_stream = node["stream_reward"]
@@ -274,13 +340,20 @@ async def fetch_and_queue(session: aiohttp.ClientSession, node: NodeInfo, queue:
node_name = _remove_ansi_escape(node_name)
node_address = node["address"]
score = _format_score(node["score"])
cpu = f"{system_info['cpu']['count']} {system_info['properties']['cpu']['architecture']}"
hdd = f"{system_info['disk']['available_kB'] / 1024 / 1024:.2f} GB"
ram = f"{system_info['mem']['available_kB'] / 1024 / 1024:.2f} GB"
# cpu = f"{system_info['cpu']['count']} {system_info['properties']['cpu']['architecture']}"
# hdd = f"{system_info['disk']['available_kB'] / 1024 / 1024:.2f} GB"
# ram = f"{system_info['mem']['available_kB'] / 1024 / 1024:.2f} GB"
cpu = "TESTING"
hdd = "TESTING"
ram = "TESTING"
await queue.put((score, node_name, cpu, ram, hdd, version, node_stream, node_address))
item_hashes.append(node_stream)
except TimeoutError:
logger.debug(f'Timeout while fetching: {url}')
except aiohttp.client_exceptions.ClientConnectionError:
logger.debug(f'Error on connection: {url}')
except Exception as e:
pass
logger.debug(f'This error occurred: {e}')
finally:
progress.update(task, advance=1)

@@ -291,28 +364,42 @@ async def update_table(queue: asyncio.Queue, table: Table):
break
table.add_row(*data)

async def fetch_crn_system(session: aiohttp.ClientSession, node: NodeInfo) -> str:
async def fetch_crn_system(session: aiohttp.ClientSession, node: NodeInfo) -> CpuProperties:
data = None

try:
async with async_timeout.timeout(settings.HTTP_REQUEST_TIMEOUT + settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()):
url: str = node["address"].rstrip('/') + '/about/usage/system'
async with session.get(url) as resp:
resp.raise_for_status()
data = await resp.json()
data_raw = await resp.json()
data = MachineUsage.parse_obj(data_raw)
# print("data:", data)
except TimeoutError:
logger.debug(f'Timeout while fetching: {url}')
except aiohttp.client_exceptions.ClientConnectionError:
logger.debug(f'Error on connection: {url}')
except Exception as e:
data = None
logger.debug(f'This error occurred: {e}')
return data

async def get_crn_version(session: aiohttp.ClientSession, node: NodeInfo) -> str:
version = None
url = node["address"]
version = "Can't fetch the version"

try:
async with async_timeout.timeout(3 * settings.HTTP_REQUEST_TIMEOUT + 3 * settings.HTTP_REQUEST_TIMEOUT * 0.3 * random()):
async with session.get(node["address"]) as resp:
async with session.get(url) as resp:
resp.raise_for_status()
if "Server" in resp.headers:
for server in resp.headers.getall("Server"):
version_match = re.findall(r"^aleph-vm/(.*)$", server)
if version_match and version_match[0]:
return version_match[0]
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
return "Can't fetch the version"
return version or "Can't fetch the version"
version = version_match[0]
except asyncio.TimeoutError:
logger.debug(f'Timeout while fetching: {url}')
except aiohttp.client_exceptions.ClientConnectionError:
logger.debug(f'Error on connection: {url}')
except Exception as e:
logger.debug(f'This error occurred: {e}')
return version

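For context, a rough sketch of how the MachineUsage models introduced above could parse a CRN's /about/usage/system payload. The sample values, the vendor field, and the exact field requirements of CpuProperties are assumptions and may vary with the installed aleph_message version:

from datetime import datetime, timezone

# Models defined in src/aleph_client/commands/utils.py by this commit
from aleph_client.commands.utils import MachineUsage

# Hypothetical payload shaped like the /about/usage/system response;
# the numbers are made up for illustration.
sample = {
    "cpu": {
        "count": 8,
        "load_average": {"load1": 0.5, "load5": 0.4, "load15": 0.3},
        "core_frequencies": {"min": 800.0, "max": 3400.0},
    },
    "mem": {"total_kB": 32_000_000, "available_kB": 28_000_000},
    "disk": {"total_kB": 1_000_000_000, "available_kB": 700_000_000},
    "period": {
        "start_timestamp": datetime(2024, 6, 3, tzinfo=timezone.utc),
        "duration_seconds": 60.0,
    },
    "properties": {"cpu": {"architecture": "x86_64", "vendor": "GenuineIntel"}},
    "active": True,
}

# parse_obj validates the raw dict into typed fields, as fetch_crn_system does
usage = MachineUsage.parse_obj(sample)
print(usage.cpu.count, usage.mem.available_kB)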