fix(parsers, CO): adopt use_proxy, fix parsing, add tests #7690

Open · wants to merge 5 commits into master
1 change: 1 addition & 0 deletions config/zones/CO.yaml
@@ -50,6 +50,7 @@ capacity:
contributors:
- lav-julien
- alixunderplatz
- consideRatio
country: CO
delays:
production: 48
280 changes: 161 additions & 119 deletions parsers/CO.py
@@ -14,20 +14,29 @@
TotalConsumptionList,
)
from electricitymap.contrib.lib.models.events import ProductionMix
from parsers.lib.config import refetch_frequency
from parsers.lib.config import refetch_frequency, use_proxy
from parsers.lib.exceptions import ParserException

# FIXME: Issues as of 2024-12-29
# The production, price, and historical consumption parsers make use of a
# third-party library called pydataxm; for details on what data is available,
# refer to
# https://github.com/EquipoAnaliticaXM/API_XM/?tab=readme-ov-file#variables-disponibles-para-consumir-en-la-api-xm.
#
# 1. The API isn't accessible from everywhere; for example, Sweden can't
#    access it.
#
# 2. There is a use of `replace(hour=int(col[-2:]) - 1)`, but should it really
#    be -1 there?
# Note:
# - Currently there is a delay of two days before data is made available via
#   this library.
# - Sometimes, fetching data with the start date and end date set to the same
#   day can return a full month of days.
#
# Delay live production and price data fetches, as data isn't made available
# that quickly via the pydataxm library.
XM_DELAY_MIN = 2
XM_DELAY_MAX = 5
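
# An illustrative sketch (not part of this diff): the delay constants above
# lend themselves to building the candidate dates scanned by the fetchers,
# walking back from XM_DELAY_MIN to XM_DELAY_MAX days before now. How the
# live path actually builds its range is outside the hunks shown here.
#
#     _today = datetime.now(ZONE_INFO).date()
#     _candidate_dates = [
#         _today - timedelta(days=d)
#         for d in range(XM_DELAY_MIN, XM_DELAY_MAX + 1)
#     ]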

# PARSER FOR COLOMBIA / DEMAND-ONLY as of 2023-02-11 / 5-minute-granularity / returns demand data of the recent day
# MAIN_WEBSITE = https://www.xm.com.co/consumo/demanda-en-tiempo-real
colombia_demand_URL = "https://serviciosfacturacion.xm.com.co/XM.Portal.Indicadores/api/Operacion/DemandaTiempoReal"
# Fetching live consumption is currently done using another API, which as of
# 2023-02-11 provides demand data at 5-minute granularity for the current day.
# For details, refer to https://www.xm.com.co/consumo/demanda-en-tiempo-real.
#
CO_DEMAND_URL = "https://serviciosfacturacion.xm.com.co/XM.Portal.Indicadores/api/Operacion/DemandaTiempoReal"

ZONE_INFO = ZoneInfo("America/Bogota") # UTC-5

@@ -45,13 +54,11 @@
"JET-A1": "oil",
}

XM_DELAY_MIN = 2
XM_DELAY_MAX = 5


@refetch_frequency(timedelta(days=1))
@use_proxy(country_code="CO", monkeypatch_for_pydataxm=True)
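# An inference from the decorator's name and keyword (its implementation is
# not shown in this diff): use_proxy presumably routes requests through an
# in-country proxy, and monkeypatch_for_pydataxm=True additionally patches
# pydataxm's internal HTTP calls, since the library creates its own session.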
def fetch_consumption(
zone_key: ZoneKey,
zone_key: ZoneKey = ZoneKey("CO"),
Comment on lines +59 to +61 (Member):

While I think this makes sense, I need to double-check a few things internally before we can merge this, specifically how much bandwidth/data we have available, as they don't yet support pay-as-you-go billing, and whether we need to increase it so that not all parsers that use the proxy go down.

Hopefully I can get this done tomorrow or on Friday; otherwise it will be early next week.

session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
@@ -65,22 +72,23 @@ def fetch_consumption(


def _fetch_live_consumption(
zone_key: ZoneKey, session: Session, logger: Logger
zone_key: ZoneKey,
session: Session,
logger: Logger,
) -> list[dict[str, Any]]:
response: Response = session.get(colombia_demand_URL, verify=False)
if not response.ok:
raise ParserException(
parser="CO",
message=f"Error fetching data: {response.status_code} {response.reason}",
zone_key=zone_key,
)
response: Response = session.get(CO_DEMAND_URL, verify=False)
response.raise_for_status()

demand_data = response.json()["Variables"][0]["Datos"]
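# Illustrative response shape, inferred from the access pattern above and the
# archived snapshot linked below (field names other than "Variables", "Datos"
# and "Fecha" are not shown in this hunk):
# {"Variables": [{"Datos": [{"Fecha": "2023-09-14T00:00:00", ...}, ...]}]}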
demand_list = TotalConsumptionList(logger)
for datapoint in demand_data:
# Date strings are formatted like 2023-09-14T00:00:00, see
# https://web.archive.org/web/20230914161535/https://serviciosfacturacion.xm.com.co/XM.Portal.Indicadores/api/Operacion/DemandaTiempoReal
dt = datetime.fromisoformat(datapoint["Fecha"]).replace(tzinfo=ZONE_INFO)
dt_string = datapoint["Fecha"]
# TODO: Remove the truncation of sub-seconds when we run on Python 3.11
# or above and fromisoformat can parse such strings
if dt_string.find(".") != -1:
dt_string = dt_string[: dt_string.find(".")]
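# For example, a timestamp like "2023-09-14T00:05:00.1234567" (an
# illustrative value, not taken from the API) is truncated to
# "2023-09-14T00:05:00", which fromisoformat on Python <= 3.10 can parse.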
dt = datetime.fromisoformat(dt_string).replace(tzinfo=ZONE_INFO)

demand_list.append(
zoneKey=zone_key,
datetime=dt,
@@ -95,38 +103,46 @@ def _fetch_historical_consumption(
target_datetime: datetime,
logger: Logger,
) -> list[dict[str, Any]]:
# API request consumption
objetoAPI = pydataxm.ReadDB()
df_consumption = objetoAPI.request_data(
"DemaReal",
"Sistema",
target_datetime.date(),
target_datetime.date(),
)
if df_consumption.empty:
api_client = pydataxm.ReadDB()
target_date = target_datetime.date()

# get system demand
df_cons = api_client.request_data("DemaReal", "Sistema", target_date, target_date)
if df_cons.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime} : no consumption data available",
zone_key=zone_key,
)

demand_list = TotalConsumptionList(logger)
hour_columns = [col for col in df_consumption.columns if "Hour" in col]
for col in hour_columns:
consumption = float(df_consumption[col]) / 1000
dt = target_datetime.replace(hour=int(col[-2:]) - 1)
demand_list.append(
zoneKey=zone_key,
datetime=dt,
consumption=consumption,
source="xm.com.co",
)
return demand_list.to_list()
# preserve the date column as the index
df_cons = df_cons.set_index("Date")
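# Note (inferred from the frame pydataxm returns, not shown in this diff):
# the index entries are pandas Timestamps, which is why the loop below calls
# to_pydatetime() before attaching the hour and timezone.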

# keep only the columns of relevance: the hourly values
df_cons = df_cons[[c for c in df_cons.columns if "Hour" in c]]

consumption_list = TotalConsumptionList(logger)
for date, df in df_cons.groupby("Date"):
for hour_col in df.columns:
consumption = float(df[hour_col]) / 1000

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)
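# e.g. "Values_Hour01" covers 00:00-01:00 local time and maps to hour 0,
# while "Values_Hour24" maps to hour 23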

consumption_list.append(
zoneKey=zone_key,
datetime=dt,
consumption=consumption,
source="xm.com.co",
)
return consumption_list.to_list()


@refetch_frequency(timedelta(days=1))
@use_proxy(country_code="CO", monkeypatch_for_pydataxm=True)
def fetch_production(
zone_key: ZoneKey,
zone_key: ZoneKey = ZoneKey("CO"),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
@@ -141,71 +157,80 @@ def fetch_production(
target_datetime = target_datetime.astimezone(ZONE_INFO)
target_date_range = [target_datetime.date()]

acquired_datetime = None
objetoAPI = pydataxm.ReadDB()
api_client = pydataxm.ReadDB()
for date in target_date_range:
# API request list of power plants with ID (column 1) and type (column 7)
df_recursos = objetoAPI.request_data("ListadoRecursos", "Sistema", date, date)
# API request generation per power plant
df_generation = objetoAPI.request_data("Gene", "Recurso", date, date)
if not df_generation.empty and not df_recursos.empty:
acquired_datetime = datetime.combine(date, datetime.time.min).replace(
tzinfo=ZONE_INFO
)
break
if not acquired_datetime:
# get power production resources, where we are interested in a mapping
# from resource id (the Values_Code column) to power production type
# (the Values_EnerSource column)
df_res = api_client.request_data("ListadoRecursos", "Sistema", date, date)
if df_res.empty:
continue

# get power production per resource
df_prod = api_client.request_data("Gene", "Recurso", date, date)
if df_prod.empty:
continue

break
if df_prod.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime}: no production data available",
zone_key=zone_key,
)

df_units = (
df_recursos[["Values_Code", "Values_EnerSource"]]
.copy()
.set_index("Values_Code")
)
df_generation = df_generation.set_index("Values_code")
df_generation_mapped = df_generation.join(df_units, how="left").reset_index()
df_generation_mapped["Values_EnerSource"] = df_generation_mapped[
"Values_EnerSource"
].map(PRODUCTION_MAPPING)

filtered_columns = [
col
for col in df_generation_mapped.columns
if "Hour" in col or "Values_EnerSource" in col
# join the Values_EnerSource column from df_res onto df_prod using the Values_code column
df_res = df_res[["Values_Code", "Values_EnerSource"]].set_index("Values_Code")
df_prod = df_prod.set_index("Values_code")
df_prod = df_prod.join(df_res, how="left").reset_index()

# map power production type values to our terminology
df_prod["Values_EnerSource"] = df_prod["Values_EnerSource"].map(PRODUCTION_MAPPING)

# keep only the columns of relevance: date, hour, and power production type
df_cols = [
c
for c in df_prod.columns
if "Date" in c or "Hour" in c or "Values_EnerSource" in c
]
df_generation_aggregated = (
df_generation_mapped[filtered_columns].groupby("Values_EnerSource").sum()
)
df_prod = df_prod[df_cols]

# sum all values for each combination of date and power production type,
# and build a multi-index of those combinations
df_prod = df_prod.groupby(["Date", "Values_EnerSource"]).sum()
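# e.g. the resulting multi-index holds pairs like (Timestamp("..."), "hydro"),
# with the summed kW values per Values_HourNN column as the row (the mode
# name here is illustrative, as only "oil" is visible in this hunk)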

# loop over each date in the multi index
production_list = ProductionBreakdownList(logger)
for col in df_generation_aggregated.columns:
production_kw = df_generation_aggregated[col].to_dict()
# convert to MW
production_mw = {
mode: round(production_kw[mode] / 1000, 3) for mode in production_kw
}
# convert to ProductionMix
production_mix = ProductionMix()

for mode, value in production_mw.items():
production_mix.add_value(mode, value)

dt = acquired_datetime.replace(hour=int(col[-2:]) - 1)
production_list.append(
zoneKey=zone_key,
datetime=dt,
production=production_mix,
source="xm.com.co",
)
for date, df in df_prod.groupby(level=0):
# in the date-specific df, drop the date level from the index, leaving the
# power production type as the index
df = df.droplevel(0)

for hour_col in df.columns:
prod_kw = df[hour_col].to_dict()
prod_mw = {mode: round(kw / 1000, 3) for mode, kw in prod_kw.items()}

production_mix = ProductionMix()
for mode, value in prod_mw.items():
production_mix.add_value(mode, value)

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)

production_list.append(
zoneKey=zone_key,
datetime=dt,
production=production_mix,
source="xm.com.co",
)
return production_list.to_list()


@refetch_frequency(timedelta(days=1))
@use_proxy(country_code="CO", monkeypatch_for_pydataxm=True)
def fetch_price(
zone_key: ZoneKey,
zone_key: ZoneKey = ZoneKey("CO"),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
@@ -220,43 +245,60 @@ def fetch_price(
target_datetime = target_datetime.astimezone(ZONE_INFO)
target_date_range = [target_datetime.date()]

acquired_datetime = None
objetoAPI = pydataxm.ReadDB()
api_client = pydataxm.ReadDB()
for date in target_date_range:
df_price = objetoAPI.request_data("PrecBolsNaci", "Sistema", date, date)
if not df_price.empty:
acquired_datetime = datetime.combine(date, datetime.time.min).replace(
tzinfo=ZONE_INFO
)
break
if not acquired_datetime:
df_price = api_client.request_data("PrecBolsNaci", "Sistema", date, date)
if df_price.empty:
continue
break
if df_price.empty:
raise ParserException(
parser="CO",
message=f"{target_datetime}: no price data available",
zone_key=zone_key,
)

# preserve the date column as the index
df_price = df_price.set_index("Date")

# keep only the columns of relevance: the hourly values
df_price = df_price[[c for c in df_price.columns if "Hour" in c]]

price_list = PriceList(logger)
hour_columns = [col for col in df_price.columns if "Hour" in col]
for col in hour_columns:
price = float(df_price[col])
dt = acquired_datetime.replace(hour=int(col[-2:]) - 1)
price_list.append(
zoneKey=zone_key,
datetime=dt,
currency="COP",
price=price,
source="xm.com.co",
)
for date, df in df_price.groupby("Date"):
for hour_col in df.columns:
price = float(df[hour_col])

# hour_col is "Values_Hour01" to "Values_Hour24"
hour = int(hour_col[-2:]) - 1
dt = date.to_pydatetime().replace(hour=hour).replace(tzinfo=ZONE_INFO)

price_list.append(
zoneKey=zone_key,
datetime=dt,
currency="COP",
price=price,
source="xm.com.co",
)
return price_list.to_list()


if __name__ == "__main__":
print("fetch_consumption() ->")
print(fetch_consumption(ZoneKey("CO")))
print(fetch_consumption())

print("fetch_production() ->")
print(fetch_production(ZoneKey("CO")))
print(fetch_production())

print("fetch_price() ->")
print(fetch_price(ZoneKey("CO")))
print(fetch_price())

dt = datetime.fromisoformat("2025-01-01T00:00:00-05:00")
print("fetch_production(target_datetime=...) ->")
print(fetch_production(target_datetime=dt))

print("fetch_price(target_datetime=...) ->")
print(fetch_price(target_datetime=dt))

print("fetch_consumption(target_datetime=...) ->")
print(fetch_consumption(target_datetime=dt))
6 changes: 3 additions & 3 deletions parsers/ESTADISTICO_UT.py
@@ -123,9 +123,9 @@ def _parse_data(data: dict) -> list[dict]:

mwh_production = mwh_production_dict["0"]

# Python <=3.10 fromisoformat can't parse strings with sub-seconds,
# which the day string includes
day = day[: -len(".0000000")]
# TODO: Remove the truncation of sub-seconds when we run on Python 3.11
# or above and fromisoformat can parse such strings
day = day[: day.find(".")]
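# e.g. "2023-02-11T00:00:00.0000000" (the 7-digit fractional form the old
# code stripped) becomes "2023-02-11T00:00:00"; note this assumes the string
# always contains a ".", since find() returning -1 would drop the last char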
dt = datetime.fromisoformat(day).replace(hour=int(hour), tzinfo=ZONE_INFO)

data_points.append(