From e14a7f8289031080cc702d24b63e9435a28a9a3c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Javier=20Fontecha=20Guada=C3=B1o?=
Date: Mon, 1 Apr 2024 08:05:57 +0000
Subject: [PATCH] Feature/solve list timeout

---
 inesdata_mov_datasets/sources/create/emt.py  |   1 +
 .../sources/extract/aemet.py                 |   6 +-
 inesdata_mov_datasets/sources/extract/emt.py |  27 +++-
 .../sources/extract/informo.py               |   8 +-
 inesdata_mov_datasets/utils.py               | 129 +++++++++++++-----
 5 files changed, 134 insertions(+), 37 deletions(-)

diff --git a/inesdata_mov_datasets/sources/create/emt.py b/inesdata_mov_datasets/sources/create/emt.py
index cd7412f..9fa4e12 100644
--- a/inesdata_mov_datasets/sources/create/emt.py
+++ b/inesdata_mov_datasets/sources/create/emt.py
@@ -302,6 +302,7 @@ def create_eta_emt(settings: Settings, date: str) -> pd.DataFrame:
     storage_config = settings.storage.config
     storage_path = storage_config.local.path  # tmpdirname
     if settings.storage.default != "local":
+
         async_download(
             bucket=storage_config.minio.bucket,
             prefix=f"raw/emt/{date}/eta/",
diff --git a/inesdata_mov_datasets/sources/extract/aemet.py b/inesdata_mov_datasets/sources/extract/aemet.py
index eb4d736..d59e0e9 100644
--- a/inesdata_mov_datasets/sources/extract/aemet.py
+++ b/inesdata_mov_datasets/sources/extract/aemet.py
@@ -4,6 +4,7 @@
 import json
 import traceback
 from pathlib import Path
+import pytz
 
 import requests
 from loguru import logger
@@ -55,7 +56,10 @@ async def save_aemet(config: Settings, data: json):
         config (Settings): Object with the config file.
         data (json): Data with weather in json format.
     """
-    current_datetime = datetime.datetime.now().replace(second=0)  # current date without seconds
+
+    # Get the Madrid timezone and format the dates for the object_name of the files
+    europe_timezone = pytz.timezone("Europe/Madrid")
+    current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)
     formatted_date_day = current_datetime.strftime(
         "%Y%m%d"
     )  # formatted date year|month|day all together
diff --git a/inesdata_mov_datasets/sources/extract/emt.py b/inesdata_mov_datasets/sources/extract/emt.py
index 4f99116..1cf1ed7 100644
--- a/inesdata_mov_datasets/sources/extract/emt.py
+++ b/inesdata_mov_datasets/sources/extract/emt.py
@@ -18,6 +18,7 @@
     check_s3_file_exists,
     read_obj,
     upload_objs,
+    upload_metadata,
 )
 
 
@@ -528,6 +529,10 @@ async def get_emt(config: Settings):
 
     # Upload the dict to s3 asynchronously if the dict contains something (this means the minio flag in config was enabled)
     if eta_dict_upload:
+        # List of str names of objects uploaded into s3
+        list_keys_str = [str(key.parent) + "/" + str(key.name) for key in eta_dict_upload]
+        logger.debug(f"Uploading {len(list_keys_str)} files")
+
         await upload_objs(
             config.storage.config.minio.bucket,
             config.storage.config.minio.endpoint,
@@ -535,14 +540,22 @@ async def get_emt(config: Settings):
             config.storage.config.minio.secret_key,
             eta_dict_upload,
         )
+
+        upload_metadata(
+            config.storage.config.minio.bucket,
+            config.storage.config.minio.endpoint,
+            config.storage.config.minio.access_key,
+            config.storage.config.minio.secret_key,
+            list_keys_str,
+        )
+
     logger.error(f"{errors_ld} errors in Line Detail")
     logger.error(f"{errors_eta} errors in ETA, list of stops erroring: {list_stops_error}")
     eta_dict_upload = {}
-
+
     # Retry the failed petitions
     if errors_eta > 0:
-        eta_dict_upload = {}
         list_stops_error_retry = []
         eta_tasks2 = []
         errors_eta_retry = 0
@@ -602,6 +615,9 @@ async def get_emt(config: Settings):
 
     # Upload the dict to s3 asynchronously if the dict contains something (this means the minio flag in config was enabled)
     if eta_dict_upload:
+        # List of str names of objects uploaded into s3
+        list_keys_str = [str(key.parent) + "/" + str(key.name) for key in eta_dict_upload]
+        logger.debug(f"Uploading {len(list_keys_str)} files")
         await upload_objs(
             config.storage.config.minio.bucket,
             config.storage.config.minio.endpoint,
@@ -609,6 +625,13 @@ async def get_emt(config: Settings):
             config.storage.config.minio.secret_key,
             eta_dict_upload,
         )
+        upload_metadata(
+            config.storage.config.minio.bucket,
+            config.storage.config.minio.endpoint,
+            config.storage.config.minio.access_key,
+            config.storage.config.minio.secret_key,
+            list_keys_str,
+        )
 
     end = datetime.datetime.now()
     logger.debug(f"Time duration of EMT extraction {end - now}")
diff --git a/inesdata_mov_datasets/sources/extract/informo.py b/inesdata_mov_datasets/sources/extract/informo.py
index 149b70d..b83b5e7 100644
--- a/inesdata_mov_datasets/sources/extract/informo.py
+++ b/inesdata_mov_datasets/sources/extract/informo.py
@@ -3,6 +3,7 @@
 import json
 import traceback
 from pathlib import Path
+import pytz
 
 import requests
 import xmltodict
@@ -50,11 +51,14 @@ async def save_informo(config: Settings, data: json):
     # Get the last update date from the response
     date_from_file = data["pms"]["fecha_hora"]
     dt = datetime.datetime.strptime(date_from_file, "%d/%m/%Y %H:%M:%S")
-    # Formatear el objeto datetime en el formato deseado
+
+    # Format date
     formated_date = dt.strftime("%Y-%m-%dT%H%M")
-    current_datetime = datetime.datetime.now().replace(second=0)  # current date without seconds
+    # Get the Madrid timezone and format the dates for the object_name of the files
+    europe_timezone = pytz.timezone("Europe/Madrid")
+    current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)
 
     formatted_date_slash = current_datetime.strftime(
         "%Y/%m/%d"
diff --git a/inesdata_mov_datasets/utils.py b/inesdata_mov_datasets/utils.py
index b80906b..6cb9a8a 100644
--- a/inesdata_mov_datasets/utils.py
+++ b/inesdata_mov_datasets/utils.py
@@ -2,13 +2,44 @@
 import asyncio
 import os
 from pathlib import Path
-
+import botocore
+from botocore.client import Config as BotoConfig
 import aiofiles.os
 import yaml
 from aiobotocore.session import ClientCreatorContext, get_session
+from loguru import logger
 
 from inesdata_mov_datasets.settings import Settings
 
+def list_objs(bucket: str, prefix: str, endpoint_url: str, aws_secret_access_key: str, aws_access_key_id: str) -> list:
+    """List objects from s3 bucket.
+
+    Args:
+        bucket (str): Name of the bucket.
+        prefix (str): Prefix to list.
+        endpoint_url (str): URL of the minio bucket.
+        aws_access_key_id (str): minio user.
+        aws_secret_access_key (str): minio password.
+
+    Returns:
+        list: List of the objects listed.
+    """
+
+    session = botocore.session.get_session()
+    client = session.create_client(
+        "s3",
+        endpoint_url=endpoint_url,
+        aws_secret_access_key=aws_secret_access_key,
+        aws_access_key_id=aws_access_key_id,
+    )
+
+    paginator = client.get_paginator("list_objects_v2")
+    keys = []
+    for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
+        for c in result.get("Contents", []):
+            keys.append(c.get("Key"))
+
+    return keys
 
 def async_download(
     bucket: str,
@@ -36,26 +67,6 @@ def async_download(
     )
 
 
-async def list_objs(client: ClientCreatorContext, bucket: str, prefix: str) -> list:
-    """List objects from s3 bucket.
-
-    Args:
-        client (ClientCreatorContext): Client with s3 connection.
-        bucket (str): Name of the bucket.
-        prefix (str): Prefix to list.
-
-    Returns:
-        list: List of the objects listed.
- """ - paginator = client.get_paginator("list_objects") - keys = [] - async for result in paginator.paginate(Bucket=bucket, Prefix=prefix): - for c in result.get("Contents", []): - keys.append(c.get("Key")) - - return keys - - async def get_obj(client: ClientCreatorContext, bucket: str, key: str) -> str: """Get an object from s3. @@ -72,7 +83,9 @@ async def get_obj(client: ClientCreatorContext, bucket: str, key: str) -> str: return obj -async def download_obj(client: ClientCreatorContext, bucket: str, key: str, output_path: str): +async def download_obj( + client: ClientCreatorContext, bucket: str, key: str, output_path: str, semaphore=None +): """Download object from s3. Args: @@ -81,11 +94,12 @@ async def download_obj(client: ClientCreatorContext, bucket: str, key: str, outp key (str): Object to request. output_path (str): Local path to store output from minio. """ - await aiofiles.os.makedirs(os.path.dirname(os.path.join(output_path, key)), exist_ok=True) - obj = await get_obj(client, bucket, key) + async with semaphore: + await aiofiles.os.makedirs(os.path.dirname(os.path.join(output_path, key)), exist_ok=True) + obj = await get_obj(client, bucket, key) - async with aiofiles.open(os.path.join(output_path, key), "w") as out: - await out.write(obj.decode()) + async with aiofiles.open(os.path.join(output_path, key), "w") as out: + await out.write(obj.decode()) async def download_objs( @@ -113,11 +127,31 @@ async def download_objs( aws_secret_access_key=aws_secret_access_key, aws_access_key_id=aws_access_key_id, ) as client: - keys = await list_objs(client, bucket, prefix) - - tasks = [download_obj(client, bucket, key, output_path) for key in keys] - - await asyncio.gather(*tasks) + + logger.debug("Downloading files from s3") + + if "/eta" in prefix: + metadata_path = prefix + "metadata.txt" + + response = await client.get_object(Bucket = bucket, Key = metadata_path) + async with response['Body'] as stream: + keys = await stream.read() + keys = keys.decode('utf-8') + + semaphore = asyncio.BoundedSemaphore(10000) + keys_list = keys.split('\n') + logger.debug(f"Downloading {len(keys_list)} files from emt endpoint") + tasks = [download_obj(client, bucket, key, output_path, semaphore) for key in keys_list] + + + await asyncio.gather(*tasks) + + else: + keys = list_objs(bucket, prefix, endpoint_url, aws_secret_access_key, aws_access_key_id) + semaphore = asyncio.BoundedSemaphore(10000) + tasks = [download_obj(client, bucket, key, output_path, semaphore) for key in keys] + + await asyncio.gather(*tasks) async def read_obj( @@ -163,7 +197,38 @@ async def upload_obj(client: ClientCreatorContext, bucket: str, key: str, object """ await client.put_object(Bucket=bucket, Key=str(key), Body=object_value.encode("utf-8")) - +def upload_metadata( + bucket: str, + endpoint_url: str, + aws_access_key_id: str, + aws_secret_access_key: str, + keys: list +): + session = botocore.session.get_session() + client = session.create_client( + "s3", + endpoint_url=endpoint_url, + aws_secret_access_key=aws_secret_access_key, + aws_access_key_id=aws_access_key_id, + use_ssl=False, #TODO Change all s3 function connections to manage the ssl by config parameter + ) + + #Get the prefix of the metadata from the first name of the object from the keys list + prefix = "/".join(keys[0].split('/')[:-1]) + '/metadata.txt' + try: + #If file exists in the bucket + response = client.get_object(Bucket=bucket, Key=prefix) + #Get the previous content of the file + content = response['Body'].read().decode('utf-8') + #add the new names 
+        new_content = content + "\n" + "\n".join(keys)
+    except client.exceptions.NoSuchKey:
+        # If the metadata file does not exist yet (first execution of the day)
+        new_content = "\n".join(keys)
+
+    # Upload the updated metadata file to s3
+    client.put_object(Bucket=bucket, Key=prefix, Body=new_content.encode("utf-8"))
+
 async def upload_objs(
     bucket: str,
     endpoint_url: str,
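
A minimal sketch of the metadata round-trip this patch introduces, for context (not part of the patch; the bucket name, endpoint, credentials, and object names below are hypothetical, and download_objs is called with the keyword names visible in its body above):

    import asyncio

    from inesdata_mov_datasets.utils import download_objs, upload_metadata

    # Hypothetical MinIO connection values; the real ones come from Settings.
    BUCKET = "inesdata-mov"
    ENDPOINT = "http://localhost:9000"
    ACCESS_KEY = "minio-user"
    SECRET_KEY = "minio-password"

    # After a batch of ETA responses is uploaded, their keys are appended to the
    # daily metadata.txt under the same prefix, so later runs can read that file
    # instead of paginating ListObjects over thousands of keys (the listing
    # timeout this patch works around).
    uploaded_keys = [
        "raw/emt/2024/04/01/eta/eta_20240401_0800.json",  # hypothetical names
        "raw/emt/2024/04/01/eta/eta_20240401_0801.json",
    ]
    upload_metadata(BUCKET, ENDPOINT, ACCESS_KEY, SECRET_KEY, uploaded_keys)

    # Because the prefix contains "/eta", download_objs reads metadata.txt and
    # downloads exactly the keys listed there instead of listing the bucket.
    asyncio.run(
        download_objs(
            bucket=BUCKET,
            prefix="raw/emt/2024/04/01/eta/",
            endpoint_url=ENDPOINT,
            aws_secret_access_key=SECRET_KEY,
            aws_access_key_id=ACCESS_KEY,
            output_path="/tmp/inesdata",
        )
    )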