Skip to content

Commit

Permalink
Fix code style issues with Black
Browse files Browse the repository at this point in the history
  • Loading branch information
lint-action committed May 6, 2024
1 parent 928be75 commit f2d8610
Show file tree
Hide file tree
Showing 13 changed files with 2,809 additions and 1,313 deletions.
85 changes: 54 additions & 31 deletions co2intensity.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,64 @@
import pandas as pd


def load_generation_history(filename):
power_df = pd.read_csv(filename, sep=";", thousands=",", decimal=".")
hour_deltas = pd.to_datetime(power_df["Start"]).dt.hour.apply(lambda x: pd.Timedelta(hours=x))
power_df["Date"] = pd.to_datetime(power_df["Date"]) + hour_deltas
power_df = power_df.drop(columns=["Start", "End"])
power_df = pd.read_csv(filename, sep=";", thousands=",", decimal=".")
hour_deltas = pd.to_datetime(power_df["Start"]).dt.hour.apply(
lambda x: pd.Timedelta(hours=x)
)
power_df["Date"] = pd.to_datetime(power_df["Date"]) + hour_deltas
power_df = power_df.drop(columns=["Start", "End"])

return power_df

return power_df

def add_intensity_column(power_df, intensity_df):
intensity_lookup = intensity_df.set_index("Emissions [g CO2eq/kWh]")
power_df["MWh sum"] = sum([power_df[col] for col in power_df.columns if "[MWh] Calculated resolutions" in col])
power_df["Intensity [g CO2eq/kWh]"] = 0.0

for energy_type in power_df.columns:
if (intensity_df["Emissions [g CO2eq/kWh]"] == energy_type).any():
intensity_name = energy_type.replace("[MWh] Calculated resolutions", "[%]")
power_df[intensity_name] = power_df[energy_type] / power_df["MWh sum"] * 100
power_df["Intensity [g CO2eq/kWh]"] += power_df[intensity_name] * 1e-2 * intensity_lookup.loc[energy_type, "Med"]
else:
pass
# print(energy_type)
power_df.drop(columns=[col for col in power_df.columns if "[MWh] Calculated resolutions" in col], inplace=True)
power_df.drop(columns=["MWh sum"], inplace=True)

return power_df.set_index("Date")

def load_all(generation_path="data/co2intensity/Actual_generation_201501010000_202512312359_Hour.csv",
intensity_path="data/co2intensity/co2intensities.csv"):
intensity_df = pd.read_csv(intensity_path, sep=";")
power_df = load_generation_history(generation_path)
return add_intensity_column(power_df, intensity_df)
intensity_lookup = intensity_df.set_index("Emissions [g CO2eq/kWh]")
power_df["MWh sum"] = sum(
[
power_df[col]
for col in power_df.columns
if "[MWh] Calculated resolutions" in col
]
)
power_df["Intensity [g CO2eq/kWh]"] = 0.0

for energy_type in power_df.columns:
if (intensity_df["Emissions [g CO2eq/kWh]"] == energy_type).any():
intensity_name = energy_type.replace("[MWh] Calculated resolutions", "[%]")
power_df[intensity_name] = power_df[energy_type] / power_df["MWh sum"] * 100
power_df["Intensity [g CO2eq/kWh]"] += (
power_df[intensity_name]
* 1e-2
* intensity_lookup.loc[energy_type, "Med"]
)
else:
pass
# print(energy_type)
power_df.drop(
columns=[
col for col in power_df.columns if "[MWh] Calculated resolutions" in col
],
inplace=True,
)
power_df.drop(columns=["MWh sum"], inplace=True)

return power_df.set_index("Date")


def load_all(
generation_path="data/co2intensity/Actual_generation_201501010000_202512312359_Hour.csv",
intensity_path="data/co2intensity/co2intensities.csv",
):
intensity_df = pd.read_csv(intensity_path, sep=";")
power_df = load_generation_history(generation_path)
return add_intensity_column(power_df, intensity_df)


if __name__ == "__main__":
import plotly.express as px
import plotly.express as px

power_df = load_all()
# power_df["Intensity"] is the column of interest
power_df = load_all()
# power_df["Intensity"] is the column of interest

px.line(power_df, y="Intensity [g CO2eq/kWh]").show()
px.line(power_df, y="Intensity [g CO2eq/kWh]").show()
111 changes: 62 additions & 49 deletions datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,71 @@
import os
import requests
from joblib import Memory

memory = Memory("cache", verbose=0)


@memory.cache
def geopos_from_zipcode(country_code, zip_code):
nomi = pgeocode.Nominatim(country_code)
response_df = nomi.query_postal_code(str(zip_code))

if not response_df.empty:
return response_df["latitude"], response_df["longitude"]
return None, None

def load_TRY(selection:str):
"""the selection parameter should be one of : ["Wint", "Jahr", "Somm"]"""
if selection is not None:
if selection not in ["Wint", "Jahr", "Somm"]:
raise ValueError("TRY_dataset must be one of ['Wint', 'Jahr', 'Somm']")

filename = f"data/weather/DWD/TRY_488641091042/TRY2045_488641091042_{selection}.dat"
trydf = pd.read_csv(filename, delim_whitespace=True, skiprows=34).dropna()
trydf["year"] = 2045
trydf["Date"] = pd.to_datetime(dict(year=trydf["year"], month=trydf["MM"], day=trydf["DD"], hour=trydf["HH"]))
trydf = trydf.rename(columns={"t":"T_outside [°C]"})
trydf = trydf.set_index("Date")
return trydf
return None

def fetch_all(country_code, zip_code, start, end, TRY_dataset=None)->pd.DataFrame:
if start and end and zip_code:
if isinstance(start, str):
start = datetime.fromisoformat(start)
if isinstance(end, str):
end = datetime.fromisoformat(end)

tmpdf = co2intensity.load_all()
lat, lon = geopos_from_zipcode(country_code, zip_code)

if isinstance(TRY_dataset, str):
# TODO: All the years of other data should be shifted to 2045
df = temperatures.load_TRY(TRY_dataset).rename(columns={"T":"T_outside [°C]", "G":"solar [W/m2]"})
delta = df.index[0] - pd.to_datetime(start)
delta = pd.Timedelta(delta.year, 1, 1)
else:
df = temperatures.fetch_all(lat, lon, start, end)
df_sol = solar_heat.fetch_all(lat, lon, start, end)
df["p_solar south [kW/m2]"] = df_sol["p_solar south [kW/m2]"]
df["p_solar east [kW/m2]"] = df_sol["p_solar east [kW/m2]"]
df["p_solar west [kW/m2]"] = df_sol["p_solar west [kW/m2]"]

df = df.join(tmpdf, how="inner")

return df
nomi = pgeocode.Nominatim(country_code)
response_df = nomi.query_postal_code(str(zip_code))

if not response_df.empty:
return response_df["latitude"], response_df["longitude"]
return None, None


def load_TRY(selection: str):
"""the selection parameter should be one of : ["Wint", "Jahr", "Somm"]"""
if selection is not None:
if selection not in ["Wint", "Jahr", "Somm"]:
raise ValueError("TRY_dataset must be one of ['Wint', 'Jahr', 'Somm']")

filename = (
f"data/weather/DWD/TRY_488641091042/TRY2045_488641091042_{selection}.dat"
)
trydf = pd.read_csv(filename, delim_whitespace=True, skiprows=34).dropna()
trydf["year"] = 2045
trydf["Date"] = pd.to_datetime(
dict(
year=trydf["year"], month=trydf["MM"], day=trydf["DD"], hour=trydf["HH"]
)
)
trydf = trydf.rename(columns={"t": "T_outside [°C]"})
trydf = trydf.set_index("Date")
return trydf
return None


def fetch_all(country_code, zip_code, start, end, TRY_dataset=None) -> pd.DataFrame:
if start and end and zip_code:
if isinstance(start, str):
start = datetime.fromisoformat(start)
if isinstance(end, str):
end = datetime.fromisoformat(end)

tmpdf = co2intensity.load_all()
lat, lon = geopos_from_zipcode(country_code, zip_code)

if isinstance(TRY_dataset, str):
# TODO: All the years of other data should be shifted to 2045
df = temperatures.load_TRY(TRY_dataset).rename(
columns={"T": "T_outside [°C]", "G": "solar [W/m2]"}
)
delta = df.index[0] - pd.to_datetime(start)
delta = pd.Timedelta(delta.year, 1, 1)
else:
df = temperatures.fetch_all(lat, lon, start, end)
df_sol = solar_heat.fetch_all(lat, lon, start, end)
df["p_solar south [kW/m2]"] = df_sol["p_solar south [kW/m2]"]
df["p_solar east [kW/m2]"] = df_sol["p_solar east [kW/m2]"]
df["p_solar west [kW/m2]"] = df_sol["p_solar west [kW/m2]"]

df = df.join(tmpdf, how="inner")

return df


if __name__ == "__main__":
print(fetch_all("DE", 81829, "2020-01-01", "2022-01-02"))
print(fetch_all("DE", 81829, "2020-01-01", "2022-01-02", "Somm"))
print(fetch_all("DE", 81829, "2020-01-01", "2022-01-02"))
print(fetch_all("DE", 81829, "2020-01-01", "2022-01-02", "Somm"))
46 changes: 31 additions & 15 deletions electricity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,42 @@
import glob
import datetime


def list_readable_electricity_profiles():
return [elm.replace("data/heatingload/electric/synPRO_el_","")[:-4] for elm in list_electricity_profiles()]
return [
elm.replace("data/heatingload/electric/synPRO_el_", "")[:-4]
for elm in list_electricity_profiles()
]


def list_electricity_profiles():
return glob.glob("data/heatingload/electric/synPRO_el_*.dat")
return glob.glob("data/heatingload/electric/synPRO_el_*.dat")


name_to_file = {
path.replace("data/heatingload/electric/synPRO_el_", "")[:-4]: path
for path in glob.glob("data/heatingload/electric/synPRO_el_*.dat")
}


name_to_file = {path.replace("data/heatingload/electric/synPRO_el_","")[:-4] : path for path in glob.glob("data/heatingload/electric/synPRO_el_*.dat")}
def load_el_profile(df: pd.DataFrame, path):
df_el = (
pd.read_csv(path, comment="#", sep=";")
.rename(columns={"P_el": "P_el appliances [kW]"})
.drop(columns=["YYYYMMDD", "hhmmss"])
)
df_el["unixtimestamp"] = pd.to_datetime(
df_el["unixtimestamp"], unit="s"
) + datetime.timedelta(hours=1)
df_el["P_el appliances [kW]"] *= 1e-3

def load_el_profile(df:pd.DataFrame, path):
df_el = pd.read_csv(path, comment="#", sep=";").rename(columns={"P_el":"P_el appliances [kW]"}).drop(columns=["YYYYMMDD", "hhmmss"])
df_el["unixtimestamp"] = pd.to_datetime(df_el["unixtimestamp"],unit="s") + datetime.timedelta(hours=1)
df_el["P_el appliances [kW]"] *= 1e-3
df["P_el appliances [kW]"] = 0.0

df["P_el appliances [kW]"] = 0.0
for year in df.index.year.unique():
el_year = df_el["unixtimestamp"].dt.year[0]
df_el["unixtimestamp"] -= pd.Timedelta(days=(el_year - year) * 365)

for year in df.index.year.unique():
el_year = df_el["unixtimestamp"].dt.year[0]
df_el["unixtimestamp"] -= pd.Timedelta(days=(el_year - year)*365)

df.update(df_el.set_index("unixtimestamp"))
df.update(df_el.set_index("unixtimestamp"))

df["P_el appliances [kW]"] *= 0.95 # larissas presentation
return df
df["P_el appliances [kW]"] *= 0.95 # larissas presentation
return df
Loading

0 comments on commit f2d8610

Please sign in to comment.