Merge branch 'feature/solve_list_timeout' into 'develop'
Feature/solve list timeout

See merge request gmv-bda/upm/inesdata-mov/data-generation!29
Javier Fontecha Guadaño committed Apr 1, 2024
2 parents 7ebdcc1 + e14a7f8 commit 55679b9
Showing 5 changed files with 134 additions and 37 deletions.
1 change: 1 addition & 0 deletions inesdata_mov_datasets/sources/create/emt.py
@@ -302,6 +302,7 @@ def create_eta_emt(settings: Settings, date: str) -> pd.DataFrame:
storage_config = settings.storage.config
storage_path = storage_config.local.path # tmpdirname
if settings.storage.default != "local":

async_download(
bucket=storage_config.minio.bucket,
prefix=f"raw/emt/{date}/eta/",
6 changes: 5 additions & 1 deletion inesdata_mov_datasets/sources/extract/aemet.py
@@ -4,6 +4,7 @@
import json
import traceback
from pathlib import Path
import pytz

import requests
from loguru import logger
@@ -55,7 +56,10 @@ async def save_aemet(config: Settings, data: json):
config (Settings): Object with the config file.
data (json): Data with weather in json format.
"""
current_datetime = datetime.datetime.now().replace(second=0) # current date without seconds

# Get the Madrid timezone and format the dates for the object_name of the files
europe_timezone = pytz.timezone("Europe/Madrid")
current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)
formatted_date_day = current_datetime.strftime(
"%Y%m%d"
) # formatted date year|month|day all together
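For reference, a minimal standalone sketch of the timezone-aware timestamp pattern this hunk introduces (assuming pytz is installed; the printed values are illustrative):

import datetime

import pytz

# Timezone-aware "now" in Madrid time, truncated to the minute as the extractor does
europe_timezone = pytz.timezone("Europe/Madrid")
current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)

formatted_date_day = current_datetime.strftime("%Y%m%d")  # e.g. 20240401
formatted_date_slash = current_datetime.strftime("%Y/%m/%d")  # e.g. 2024/04/01
print(formatted_date_day, formatted_date_slash)

Passing the tzinfo directly to datetime.datetime.now() yields an aware datetime in one step, which is why the extractors no longer call datetime.datetime.now() bare.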
27 changes: 25 additions & 2 deletions inesdata_mov_datasets/sources/extract/emt.py
@@ -18,6 +18,7 @@
check_s3_file_exists,
read_obj,
upload_objs,
upload_metadata,
)


@@ -528,21 +529,33 @@ async def get_emt(config: Settings):

# Upload the dict to s3 asynchronously if the dict contains something (this means the minio flag in config was enabled)
if eta_dict_upload:
# List of str names of objects uploaded into s3
list_keys_str = [str(key.parent) + '/' + str(key.name) for key in eta_dict_upload]
logger.debug(f"Uploading {len(list_keys_str)} files")

await upload_objs(
config.storage.config.minio.bucket,
config.storage.config.minio.endpoint,
config.storage.config.minio.access_key,
config.storage.config.minio.secret_key,
eta_dict_upload,
)

upload_metadata(
config.storage.config.minio.bucket,
config.storage.config.minio.endpoint,
config.storage.config.minio.access_key,
config.storage.config.minio.secret_key,
list_keys_str
)


logger.error(f"{errors_ld} errors in Line Detail")
logger.error(f"{errors_eta} errors in ETA, list of stops erroring: {list_stops_error}")
eta_dict_upload = {}

# Retry the failed requests
if errors_eta > 0:
eta_dict_upload = {}
list_stops_error_retry = []
eta_tasks2 = []
errors_eta_retry = 0
@@ -602,13 +615,23 @@ async def get_emt(config: Settings):

# Upload the dict to s3 asynchronously if the dict contains something (this means the minio flag in config was enabled)
if eta_dict_upload:
# List of str names of objects uploaded into s3
list_keys_str = [str(key.parent) + '/' + str(key.name) for key in eta_dict_upload]
logger.debug(f"Uploading {len(list_keys_str)} files")
await upload_objs(
config.storage.config.minio.bucket,
config.storage.config.minio.endpoint,
config.storage.config.minio.access_key,
config.storage.config.minio.secret_key,
eta_dict_upload,
)
upload_metadata(
config.storage.config.minio.bucket,
config.storage.config.minio.endpoint,
config.storage.config.minio.access_key,
config.storage.config.minio.secret_key,
list_keys_str
)

end = datetime.datetime.now()
logger.debug(f"Time duration of EMT extraction {end - now}")
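To summarize the bookkeeping pattern added in both upload paths above: the upload dict built by get_emt() is keyed by pathlib.Path objects, those keys are flattened to plain strings, and the same list is handed to upload_metadata so a per-prefix metadata.txt records every object written. A minimal sketch with an illustrative key (the real keys come from the EMT responses):

from pathlib import Path

# Illustrative upload dict: Path key -> serialized response body
eta_dict_upload = {
    Path("raw/emt/2024/04/01/eta/eta_example.json"): '{"data": []}',
}

# Flatten Path keys to "parent/name" strings, as the diff does
list_keys_str = [str(key.parent) + "/" + str(key.name) for key in eta_dict_upload]
print(list_keys_str)  # ['raw/emt/2024/04/01/eta/eta_example.json']

This is what lets download_objs later fetch a day's ETA files from metadata.txt instead of paginating a ListObjects call over the prefix.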
8 changes: 6 additions & 2 deletions inesdata_mov_datasets/sources/extract/informo.py
@@ -3,6 +3,7 @@
import json
import traceback
from pathlib import Path
import pytz

import requests
import xmltodict
@@ -50,11 +51,14 @@ async def save_informo(config: Settings, data: json):
# Get the last update date from the response
date_from_file = data["pms"]["fecha_hora"]
dt = datetime.datetime.strptime(date_from_file, "%d/%m/%Y %H:%M:%S")


# Format the datetime object in the desired format
# Format date
formated_date = dt.strftime("%Y-%m-%dT%H%M")

current_datetime = datetime.datetime.now().replace(second=0) # current date without seconds
# Get the Madrid timezone and format the dates for the object_name of the files
europe_timezone = pytz.timezone("Europe/Madrid")
current_datetime = datetime.datetime.now(europe_timezone).replace(second=0)

formatted_date_slash = current_datetime.strftime(
"%Y/%m/%d"
129 changes: 97 additions & 32 deletions inesdata_mov_datasets/utils.py
@@ -2,13 +2,44 @@
import asyncio
import os
from pathlib import Path

import botocore.session
from botocore.client import Config as BotoConfig
import aiofiles.os
import yaml
from aiobotocore.session import ClientCreatorContext, get_session
from loguru import logger

from inesdata_mov_datasets.settings import Settings

def list_objs(bucket: str, prefix: str, endpoint_url: str, aws_secret_access_key: str, aws_access_key_id: str) -> list:
"""List objects from s3 bucket.
Args:
bucket (str): Name of the bucket.
prefix (str): Prefix to list.
endpoint_url (str): URL of the minio bucket.
aws_secret_access_key (str): minio password.
aws_access_key_id (str): minio user.
Returns:
list: List of the objects listed.
"""

session = botocore.session.get_session()
client = session.create_client(
"s3",
endpoint_url=endpoint_url,
aws_secret_access_key=aws_secret_access_key,
aws_access_key_id=aws_access_key_id,
)

paginator = client.get_paginator("list_objects_v2")
keys = []
for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get("Contents", []):
keys.append(c.get("Key"))

return keys
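
A hypothetical call to the new synchronous helper (bucket, prefix, endpoint, and credentials below are placeholders):

keys = list_objs(
    bucket="inesdata-mov",  # placeholder bucket name
    prefix="raw/informo/2024/04/01/",  # placeholder prefix
    endpoint_url="http://localhost:9000",  # placeholder minio endpoint
    aws_secret_access_key="minio-password",  # placeholder credentials
    aws_access_key_id="minio-user",
)
print(f"Listed {len(keys)} objects")

Unlike the async version it replaces (deleted below), this one uses the list_objects_v2 paginator and a plain botocore client, so it can be called from synchronous code without an event loop.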

def async_download(
bucket: str,
@@ -36,26 +67,6 @@ def async_download(
)


async def list_objs(client: ClientCreatorContext, bucket: str, prefix: str) -> list:
"""List objects from s3 bucket.
Args:
client (ClientCreatorContext): Client with s3 connection.
bucket (str): Name of the bucket.
prefix (str): Prefix to list.
Returns:
list: List of the objects listed.
"""
paginator = client.get_paginator("list_objects")
keys = []
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get("Contents", []):
keys.append(c.get("Key"))

return keys


async def get_obj(client: ClientCreatorContext, bucket: str, key: str) -> str:
"""Get an object from s3.
@@ -72,7 +83,9 @@ async def get_obj(client: ClientCreatorContext, bucket: str, key: str) -> str:
return obj


async def download_obj(client: ClientCreatorContext, bucket: str, key: str, output_path: str):
async def download_obj(
client: ClientCreatorContext, bucket: str, key: str, output_path: str, semaphore=None
):
"""Download object from s3.
Args:
@@ -81,11 +94,12 @@ async def download_obj(client: ClientCreatorContext, bucket: str, key: str, outp
key (str): Object to request.
output_path (str): Local path to store output from minio.
semaphore (asyncio.Semaphore): Semaphore bounding concurrent downloads.
"""
await aiofiles.os.makedirs(os.path.dirname(os.path.join(output_path, key)), exist_ok=True)
obj = await get_obj(client, bucket, key)
async with semaphore:
await aiofiles.os.makedirs(os.path.dirname(os.path.join(output_path, key)), exist_ok=True)
obj = await get_obj(client, bucket, key)

async with aiofiles.open(os.path.join(output_path, key), "w") as out:
await out.write(obj.decode())
async with aiofiles.open(os.path.join(output_path, key), "w") as out:
await out.write(obj.decode())
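
The semaphore threaded through download_obj is a standard bounded-concurrency pattern: acquire before doing I/O, release on exit of the async with block. A self-contained sketch of the same idea (the limit and the sleep are stand-ins for the real S3 calls):

import asyncio

async def fetch(i: int, semaphore: asyncio.BoundedSemaphore) -> int:
    async with semaphore:  # at most N coroutines inside this block at once
        await asyncio.sleep(0.01)  # stand-in for an S3 GET plus file write
        return i

async def main() -> None:
    semaphore = asyncio.BoundedSemaphore(10)  # illustrative limit; the diff uses 10000
    results = await asyncio.gather(*(fetch(i, semaphore) for i in range(100)))
    print(len(results))

asyncio.run(main())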


async def download_objs(
@@ -113,11 +127,31 @@ async def download_objs(
aws_secret_access_key=aws_secret_access_key,
aws_access_key_id=aws_access_key_id,
) as client:
keys = await list_objs(client, bucket, prefix)

tasks = [download_obj(client, bucket, key, output_path) for key in keys]

await asyncio.gather(*tasks)

logger.debug("Downloading files from s3")

if "/eta" in prefix:
metadata_path = prefix + "metadata.txt"

response = await client.get_object(Bucket=bucket, Key=metadata_path)
async with response["Body"] as stream:
keys = await stream.read()
keys = keys.decode("utf-8")

semaphore = asyncio.BoundedSemaphore(10000)
keys_list = keys.split("\n")
logger.debug(f"Downloading {len(keys_list)} files from emt endpoint")
tasks = [download_obj(client, bucket, key, output_path, semaphore) for key in keys_list]

await asyncio.gather(*tasks)

else:
keys = list_objs(bucket, prefix, endpoint_url, aws_secret_access_key, aws_access_key_id)
semaphore = asyncio.BoundedSemaphore(10000)
tasks = [download_obj(client, bucket, key, output_path, semaphore) for key in keys]

await asyncio.gather(*tasks)
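
For the /eta branch above, metadata.txt is simply a newline-separated list of object keys, so parsing is a split (the content below is illustrative):

keys = "raw/emt/2024/04/01/eta/eta_1.json\nraw/emt/2024/04/01/eta/eta_2.json"
keys_list = keys.split("\n")
print(keys_list)  # ['raw/emt/2024/04/01/eta/eta_1.json', 'raw/emt/2024/04/01/eta/eta_2.json']

Note that upload_metadata joins keys without a trailing newline; if the file ever ended with one, split would yield an empty key and the corresponding download task would fail.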


async def read_obj(
@@ -163,7 +197,38 @@ async def upload_obj(client: ClientCreatorContext, bucket: str, key: str, object
"""
await client.put_object(Bucket=bucket, Key=str(key), Body=object_value.encode("utf-8"))


def upload_metadata(
bucket: str,
endpoint_url: str,
aws_access_key_id: str,
aws_secret_access_key: str,
keys: list,
):
"""Append the names of uploaded objects to the metadata.txt file of their prefix.
Args:
bucket (str): Name of the bucket.
endpoint_url (str): URL of the minio bucket.
aws_access_key_id (str): minio user.
aws_secret_access_key (str): minio password.
keys (list): Names of the objects uploaded into s3.
"""
session = botocore.session.get_session()
client = session.create_client(
"s3",
endpoint_url=endpoint_url,
aws_secret_access_key=aws_secret_access_key,
aws_access_key_id=aws_access_key_id,
use_ssl=False,  # TODO: change all s3 connections to manage ssl by a config parameter
)

# Build the metadata key from the prefix of the first object in the keys list
prefix = "/".join(keys[0].split("/")[:-1]) + "/metadata.txt"
try:
# The file already exists in the bucket: get its previous content
response = client.get_object(Bucket=bucket, Key=prefix)
content = response["Body"].read().decode("utf-8")
# Append the names of the newly written files
new_content = content + "\n" + "\n".join(keys)
except client.exceptions.NoSuchKey:
# The metadata file does not exist yet (first execution of the day)
new_content = "\n".join(keys)

# Upload the updated metadata file to s3
client.put_object(Bucket=bucket, Key=prefix, Body=new_content.encode("utf-8"))
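
A hypothetical invocation of upload_metadata right after a batch upload (bucket, endpoint, and credentials are placeholders):

upload_metadata(
    bucket="inesdata-mov",  # placeholder
    endpoint_url="http://localhost:9000",  # placeholder minio endpoint
    aws_access_key_id="minio-user",  # placeholder credentials
    aws_secret_access_key="minio-password",
    keys=["raw/emt/2024/04/01/eta/eta_example.json"],
)
# Appends the key to raw/emt/2024/04/01/eta/metadata.txt, creating the file on
# the first execution of the day.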

async def upload_objs(
bucket: str,
endpoint_url: str,
