Skip to content

Commit

Permalink
fix(parsers, CO): fix historical fetches and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
consideRatio committed Jan 7, 2025
1 parent de89d68 commit c5f8eb7
Show file tree
Hide file tree
Showing 11 changed files with 28,260 additions and 104 deletions.
247 changes: 143 additions & 104 deletions parsers/CO.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,25 @@
from parsers.lib.config import refetch_frequency, use_proxy
from parsers.lib.exceptions import ParserException

# PARSER FOR COLOMBIA / DEMAND-ONLY as of 2023-02-11 / 5-minute-granularity / returns demand data of the recent day
# MAIN_WEBSITE = https://www.xm.com.co/consumo/demanda-en-tiempo-real
# The production, price, and historical consumption parser makes use of a third
# party library called pydataxm, for details on what data is available, refer to
# https://github.com/EquipoAnaliticaXM/API_XM/?tab=readme-ov-file#variables-disponibles-para-consumir-en-la-api-xm.
#
# Note:
# - Currently there is a delay of two days for data to be made available via
# this library.
# - Sometimes, when fetching data with the start date and end date set to the
#   same day, you can get a full month of data back.
#
# Delay live production and price data fetches, as data isn't made available
# that quickly when using the pydataxm library.
XM_DELAY_MIN = 2
XM_DELAY_MAX = 5

# Fetching live consumption is done using another API currently, which as of
# 2023-02-11 provides demand data with 5-minute-granularity of the current day.
# For details, refer to https://www.xm.com.co/consumo/demanda-en-tiempo-real.
#
CO_DEMAND_URL = "https://serviciosfacturacion.xm.com.co/XM.Portal.Indicadores/api/Operacion/DemandaTiempoReal"

ZONE_INFO = ZoneInfo("America/Bogota") # UTC-5
Expand All @@ -38,12 +55,6 @@
}


# settle with fetching production and price data from past days, as data doesn't
# seem to be available for very recent days
XM_DELAY_MIN = 2
XM_DELAY_MAX = 5


@refetch_frequency(timedelta(days=1))
@use_proxy(country_code="CO", monkeypatch_for_pydataxm=True)
def fetch_consumption(
Expand All @@ -66,12 +77,7 @@ def _fetch_live_consumption(
logger: Logger,
) -> list[dict[str, Any]]:
response: Response = session.get(CO_DEMAND_URL, verify=False)
if not response.ok:
raise ParserException(
parser="CO",
message=f"Error fetching data: {response.status_code} {response.reason}",
zone_key=zone_key,
)
response.raise_for_status()

demand_data = response.json()["Variables"][0]["Datos"]
demand_list = TotalConsumptionList(logger)
Expand All @@ -82,6 +88,7 @@ def _fetch_live_consumption(
if dt_string.find(".") != -1:
dt_string = dt_string[: dt_string.find(".")]
dt = datetime.fromisoformat(dt_string).replace(tzinfo=ZONE_INFO)

demand_list.append(
zoneKey=zone_key,
datetime=dt,
Expand All @@ -96,33 +103,40 @@ def _fetch_historical_consumption(
target_datetime: datetime,
logger: Logger,
) -> list[dict[str, Any]]:
# API request consumption
objetoAPI = pydataxm.ReadDB()
df_consumption = objetoAPI.request_data(
"DemaReal",
"Sistema",
target_datetime.date(),
target_datetime.date(),
)
if df_consumption.empty:
api_client = pydataxm.ReadDB()
target_date = target_datetime.date()

# get system demand
df_cons = api_client.request_data("DemaReal", "Sistema", target_date, target_date)
if df_cons.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime} : no consumption data available",
zone_key=zone_key,
)

demand_list = TotalConsumptionList(logger)
hour_columns = [col for col in df_consumption.columns if "Hour" in col]
for col in hour_columns:
consumption = float(df_consumption[col]) / 1000
dt = target_datetime.replace(hour=int(col[-2:]) - 1)
demand_list.append(
zoneKey=zone_key,
datetime=dt,
consumption=consumption,
source="xm.com.co",
)
return demand_list.to_list()
# preserve the date column as the index
df_cons = df_cons.set_index("Date")

# filter out column of relevance: hour
df_cons = df_cons[[c for c in df_cons.columns if "Hour" in c]]

consumption_list = TotalConsumptionList(logger)
for date, df in df_cons.groupby("Date"):
for hour_col in df.columns:
consumption = float(df[hour_col]) / 1000

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)

consumption_list.append(
zoneKey=zone_key,
datetime=dt,
consumption=consumption,
source="xm.com.co",
)
return consumption_list.to_list()


@refetch_frequency(timedelta(days=1))
Expand All @@ -143,65 +157,73 @@ def fetch_production(
target_datetime = target_datetime.astimezone(ZONE_INFO)
target_date_range = [target_datetime.date()]

acquired_datetime = None
objetoAPI = pydataxm.ReadDB()
api_client = pydataxm.ReadDB()
for date in target_date_range:
# API request list of power plants with ID (column 1) and type (column 7)
df_recursos = objetoAPI.request_data("ListadoRecursos", "Sistema", date, date)
# API request generation per power plant
df_generation = objetoAPI.request_data("Gene", "Recurso", date, date)
if not df_generation.empty and not df_recursos.empty:
acquired_datetime = datetime.combine(date, datetime.min.time()).replace(
tzinfo=ZONE_INFO
)
break
if not acquired_datetime:
# get power production resources, where we are interested in a mapping
# between a resource id (Values_Code column) to power production type
# (Values_EnerSource column)
df_res = api_client.request_data("ListadoRecursos", "Sistema", date, date)
if df_res.empty:
continue

# get power production per resource
df_prod = api_client.request_data("Gene", "Recurso", date, date)
if df_prod.empty:
continue

break
if df_prod.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime}: no production data available",
zone_key=zone_key,
)

df_units = (
df_recursos[["Values_Code", "Values_EnerSource"]]
.copy()
.set_index("Values_Code")
)
df_generation = df_generation.set_index("Values_code")
df_generation_mapped = df_generation.join(df_units, how="left").reset_index()
df_generation_mapped["Values_EnerSource"] = df_generation_mapped[
"Values_EnerSource"
].map(PRODUCTION_MAPPING)

filtered_columns = [
col
for col in df_generation_mapped.columns
if "Hour" in col or "Values_EnerSource" in col
# join the Values_EnerSource column from df_res to df_prod using Values_code column
df_res = df_res[["Values_Code", "Values_EnerSource"]].set_index("Values_Code")
df_prod = df_prod.set_index("Values_code")
df_prod = df_prod.join(df_res, how="left").reset_index()

# map power production type values to our terminology
df_prod["Values_EnerSource"] = df_prod["Values_EnerSource"].map(PRODUCTION_MAPPING)

# filter out columns of relevance: date, hour, and power production type
df_cols = [
c
for c in df_prod.columns
if "Date" in c or "Hour" in c or "Values_EnerSource" in c
]
df_generation_aggregated = (
df_generation_mapped[filtered_columns].groupby("Values_EnerSource").sum()
)
df_prod = df_prod[df_cols]

# sum all values for given combination of date and power production type,
# and make a multi index of those combinations
df_prod = df_prod.groupby(["Date", "Values_EnerSource"]).sum()

# loop over each date in the multi index
production_list = ProductionBreakdownList(logger)
for col in df_generation_aggregated.columns:
production_kw = df_generation_aggregated[col].to_dict()
# convert to MW
production_mw = {
mode: round(production_kw[mode] / 1000, 3) for mode in production_kw
}
# convert to ProductionMix
production_mix = ProductionMix()

for mode, value in production_mw.items():
production_mix.add_value(mode, value)

dt = acquired_datetime.replace(hour=int(col[-2:]) - 1)
production_list.append(
zoneKey=zone_key,
datetime=dt,
production=production_mix,
source="xm.com.co",
)
for date, df in df_prod.groupby(level=0):
# in the date specific df, drop the date from the index, leaving the
# power production type as index
df = df.droplevel(0)

for hour_col in df.columns:
prod_kw = df[hour_col].to_dict()
prod_mw = {mode: round(kw / 1000, 3) for mode, kw in prod_kw.items()}

production_mix = ProductionMix()
for mode, value in prod_mw.items():
production_mix.add_value(mode, value)

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)

production_list.append(
zoneKey=zone_key,
datetime=dt,
production=production_mix,
source="xm.com.co",
)
return production_list.to_list()


Expand All @@ -223,34 +245,41 @@ def fetch_price(
target_datetime = target_datetime.astimezone(ZONE_INFO)
target_date_range = [target_datetime.date()]

acquired_datetime = None
objetoAPI = pydataxm.ReadDB()
api_client = pydataxm.ReadDB()
for date in target_date_range:
df_price = objetoAPI.request_data("PrecBolsNaci", "Sistema", date, date)
if not df_price.empty:
acquired_datetime = datetime.combine(date, datetime.min.time()).replace(
tzinfo=ZONE_INFO
)
break
if not acquired_datetime:
df_price = api_client.request_data("PrecBolsNaci", "Sistema", date, date)
if df_price.empty:
continue
break
if df_price.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime}: no price data available",
zone_key=zone_key,
)

# preserve the date column as the index
df_price = df_price.set_index("Date")

# filter out column of relevance: hour
df_price = df_price[[c for c in df_price.columns if "Hour" in c]]

price_list = PriceList(logger)
hour_columns = [col for col in df_price.columns if "Hour" in col]
for col in hour_columns:
price = float(df_price[col])
dt = acquired_datetime.replace(hour=int(col[-2:]) - 1)
price_list.append(
zoneKey=zone_key,
datetime=dt,
currency="COP",
price=price,
source="xm.com.co",
)
for date, df in df_price.groupby("Date"):
for hour_col in df.columns:
price = float(df[hour_col])

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)

price_list.append(
zoneKey=zone_key,
datetime=dt,
currency="COP",
price=price,
source="xm.com.co",
)
return price_list.to_list()


Expand All @@ -263,3 +292,13 @@ def fetch_price(

print("fetch_price() ->")
print(fetch_price())

dt = datetime.fromisoformat("2025-01-01T00:00:00-05:00")
print("fetch_production(target_datetime=...) ->")
print(fetch_production(target_datetime=dt))

print("fetch_price(target_datetime=...) ->")
print(fetch_price(target_datetime=dt))

print("fetch_consumption(target_datetime=...) ->")
print(fetch_consumption(target_datetime=dt))
Loading

0 comments on commit c5f8eb7

Please sign in to comment.