Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: emission intensity and results #773

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/libecalc/application/energy/emitter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import abc
from typing import Optional

from pydantic import Field

from libecalc.application.energy.component_energy_context import ComponentEnergyContext
from libecalc.application.energy.energy_model import EnergyModel
from libecalc.common.variables import ExpressionEvaluator
Expand All @@ -12,6 +14,9 @@ class Emitter(abc.ABC):
Something that emits something.
"""

expression_evaluator: Optional[ExpressionEvaluator] = Field(default=None, exclude=True)
emission_results: Optional[dict[str, EmissionResult]] = Field(default={}, exclude=True)

@property
@abc.abstractmethod
def id(self) -> str: ...
Expand All @@ -21,5 +26,7 @@ def evaluate_emissions(
self,
energy_context: ComponentEnergyContext,
energy_model: EnergyModel,
expression_evaluator: ExpressionEvaluator,
) -> Optional[dict[str, EmissionResult]]: ...

def set_expression_evaluator(self, expression_evaluator: ExpressionEvaluator):
self.expression_evaluator = expression_evaluator
7 changes: 1 addition & 6 deletions src/libecalc/application/energy_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ def evaluate_energy_usage(self) -> dict[str, EcalcModelResult]:
for energy_component in energy_components:
if hasattr(energy_component, "evaluate_energy_usage"):
context = self._get_context(energy_component.id)
self._consumer_results.update(
energy_component.evaluate_energy_usage(
context=context, expression_evaluator=self._expression_evaluator
)
)
self._consumer_results.update(energy_component.evaluate_energy_usage(context=context))

self._consumer_results = Numbers.format_results_to_precision(self._consumer_results, precision=6)
return self._consumer_results
Expand All @@ -91,7 +87,6 @@ def evaluate_emissions(self) -> dict[str, dict[str, EmissionResult]]:
emission_result = energy_component.evaluate_emissions(
energy_context=self._get_context(energy_component.id),
energy_model=self._energy_model,
expression_evaluator=self._expression_evaluator,
)

if emission_result is not None:
Expand Down
138 changes: 138 additions & 0 deletions src/libecalc/common/utils/emission_intensity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd

from libecalc.common.time_utils import Period, Periods
from libecalc.common.units import Unit
from libecalc.common.utils.rates import (
TimeSeriesCalendarDayRate,
TimeSeriesIntensity,
TimeSeriesVolumesCumulative,
)


class EmissionIntensity:
def __init__(
self,
emission_cumulative: TimeSeriesVolumesCumulative,
hydrocarbon_export_cumulative: TimeSeriesVolumesCumulative,
unit: Unit = Unit.KG_SM3,
):
self.emission_cumulative = emission_cumulative
self.hydrocarbon_export_cumulative = hydrocarbon_export_cumulative
self.unit = unit
self.periods = emission_cumulative.periods
self.yearly_periods = self._create_yearly_periods()
self.time_vector = self.periods.all_dates
self.start_years = [period.start.year for period in self.periods]

def _create_yearly_periods(self) -> Periods:
yearly_periods = []
added_periods = set()

for period in self.periods:
for year in range(period.start.year, period.end.year + 1):
start_date = datetime(year, 1, 1)
end_date = datetime(year + 1, 1, 1)
period_tuple = (start_date, end_date)

if period_tuple not in added_periods:
yearly_periods.append(Period(start=start_date, end=end_date))
added_periods.add(period_tuple)

return Periods(yearly_periods)

def _calculate_yearly_old(self) -> list[float]:
"""Standard emission intensity at time k, is the sum of emissions from startup until time k
divided by the sum of export from startup until time k.
Thus, intensity_k = ( sum_t=1:k emission(t) ) / ( sum_t=1:k export(t) ).
The yearly emission intensity for year k is the sum of emissions in year k divided by
the sum of export in year k (and thus independent of the years before year k)
I.e. intensity_yearly_k = emission_year_k / export_year_k
emission_year_k may be computed as emission_cumulative(1. january year=k+1) - emission_cumulative(1. january year=k)
hcexport_year_k may be computed as hcexport_cumulative(1. january year=k+1) - hcexport_cumulative(1. january year=k)
To be able to evaluate cumulative emission and hydrocarbon_export at 1. january each year, a linear interpolation function
is created between the time vector and the cumulative function. To be able to treat time as the x-variable, this is
first converted to number of seconds from the beginning of the time vector
"""

df = pd.DataFrame(
data={
"emission_cumulative": self.emission_cumulative.values,
"hydrocarbon_cumulative": self.hydrocarbon_export_cumulative.values,
},
index=self.emission_cumulative.end_dates, # Assuming dates are aligned with cumulative values
)

# Reindex the DataFrame to match the time_vector and fill missing values
df = df.reindex(self.time_vector).ffill().fillna(0)

# df = pd.DataFrame(
# index=self.time_vector,
# data=list(zip(self.emission_cumulative.values, self.hydrocarbon_export_cumulative.values)),
# columns=["emission_cumulative", "hydrocarbon_cumulative"],
# )

# Extending the time vector back and forth 1 year used as padding when calculating yearly buckets.
time_vector_interpolated = pd.date_range(
start=datetime(self.time_vector[0].year - 1, 1, 1),
end=datetime(self.time_vector[-1].year + 1, 1, 1),
freq="YS",
)

# Linearly interpolating by time using the built-in functionality in Pandas.
cumulative_interpolated: Optional[pd.DataFrame] = df.reindex(
sorted(set(self.periods.all_dates).union(time_vector_interpolated))
).interpolate("time")

if cumulative_interpolated is None:
raise ValueError("Time interpolation of cumulative yearly emission intensity failed")

cumulative_yearly = cumulative_interpolated.bfill().loc[time_vector_interpolated]

# Remove the extrapolated timesteps
emissions_per_year = np.diff(cumulative_yearly.emission_cumulative[1:])
hcexport_per_year = np.diff(cumulative_yearly.hydrocarbon_cumulative[1:])

yearly_emission_intensity = np.divide(
emissions_per_year,
hcexport_per_year,
out=np.full_like(emissions_per_year, fill_value=np.nan),
where=hcexport_per_year != 0,
)

return yearly_emission_intensity.tolist()

def calculate_old(self) -> TimeSeriesCalendarDayRate:
"""Legacy code that computes yearly intensity and casts the results back to the original time-vector."""
yearly_buckets = range(self.time_vector[0].year, self.time_vector[-1].year + 1)
yearly_intensity = self._calculate_yearly()
return TimeSeriesCalendarDayRate(
periods=self.periods,
values=[yearly_intensity[yearly_buckets.index(period.start.year)] for period in self.periods],
unit=Unit.KG_SM3,
)

def calculate_periods(self):
emission_volumes = self.emission_cumulative.to_volumes()
hydrocarbon_export_volumes = self.hydrocarbon_export_cumulative.to_volumes()

intensity = emission_volumes / hydrocarbon_export_volumes

return TimeSeriesIntensity(
periods=self.periods,
values=intensity.values,
unit=self.unit,
)

def calculate(self) -> TimeSeriesIntensity:
"""Write description here"""
intensity = self.emission_cumulative / self.hydrocarbon_export_cumulative

return TimeSeriesIntensity(
periods=self.periods,
values=intensity.values,
unit=self.unit,
)
75 changes: 75 additions & 0 deletions src/libecalc/common/utils/rates.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,27 @@ def to_rate(self, regularity: Optional[list[float]] = None) -> TimeSeriesRate:
rate_type=RateType.CALENDAR_DAY,
)

def __truediv__(self, other: object) -> TimeSeriesCalendarDayRate:
if not isinstance(other, TimeSeriesVolumes):
raise TypeError(f"Dividing TimeSeriesVolumes by '{str(other.__class__)}' is not supported.")

if self.unit == Unit.KILO and other.unit == Unit.STANDARD_CUBIC_METER:
unit = Unit.KG_SM3
else:
raise ProgrammingError(
f"Unable to divide unit '{self.unit}' by unit '{other.unit}'. Please add unit conversion."
)
return TimeSeriesCalendarDayRate(
periods=self.periods,
values=np.divide(
self.values,
other.values,
out=np.full_like(self.values, fill_value=np.nan),
where=np.asarray(other.values) != 0.0,
).tolist(),
unit=unit,
)


class TimeSeriesStreamDayRate(TimeSeriesFloat):
"""
Expand Down Expand Up @@ -1061,3 +1082,57 @@ def to_stream_day_timeseries(self) -> TimeSeriesStreamDayRate:
return TimeSeriesStreamDayRate(
periods=stream_day_rate.periods, values=stream_day_rate.values, unit=stream_day_rate.unit
)


class TimeSeriesIntensity(TimeSeries[float]):
@field_validator("values", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
return v

def resample(
self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True
) -> TimeSeriesIntensity:
"""
Resample emission intensity according to given frequency.
Slinear is used in order to only interpolate, not extrapolate.
Args:
freq: The frequency the time series should be resampled to
Returns:
TimeSeriesIntensity resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=self.all_dates, data=[0] + self.values)

new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)

new_dates = new_periods.all_dates
if ds.index[-1] not in new_dates:
logger.warning(
f"The final date in the rate input ({ds.index[-1].strftime('%m/%d/%Y')}) does not "
f"correspond to the end of a period with the requested output frequency. There is a "
f"possibility that the resampling will drop intensities."
)
ds_interpolated = ds.reindex(ds.index.union(new_dates)).interpolate("time")

# New resampled pd.Series
resampled: list[float] = ds_interpolated.reindex(new_dates).values.tolist()

if not include_start_date:
dropped_intensity = resampled[0]
resampled = [value - dropped_intensity for value in resampled[1:]]
else:
resampled = resampled[1:]

return TimeSeriesIntensity(
periods=new_periods,
values=resampled,
unit=self.unit,
)
5 changes: 5 additions & 0 deletions src/libecalc/common/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import numpy as np
from numpy.typing import NDArray
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic_core import CoreSchema, core_schema

from libecalc.common.errors.exceptions import ProgrammingError
from libecalc.common.temporal_model import TemporalModel
Expand Down Expand Up @@ -176,3 +177,7 @@ def get_subset_for_period(self, period: Period) -> ExpressionEvaluator: ...
def evaluate(
self, expression: Union[Expression, TemporalModel, dict[Period, Expression]]
) -> NDArray[np.float64]: ...

@classmethod
def __get_pydantic_core_schema__(cls, source, handler) -> CoreSchema:
return core_schema.any_schema()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
ComponentType.COMPRESSOR_SYSTEM,
],
energy_usage_model: dict[Period, ElectricEnergyUsageModel],
expression_evaluator: ExpressionEvaluator,
consumes: Literal[ConsumptionType.ELECTRICITY] = ConsumptionType.ELECTRICITY,
):
self.name = name
Expand All @@ -47,6 +48,7 @@ def __init__(
self.energy_usage_model = self.check_energy_usage_model(energy_usage_model)
self._validate_el_consumer_temporal_model(self.energy_usage_model)
self._check_model_energy_usage(self.energy_usage_model)
self.expression_evaluator = expression_evaluator
self.consumes = consumes
self.component_type = component_type

Expand Down Expand Up @@ -78,9 +80,10 @@ def get_component_process_type(self) -> ComponentType:
def get_name(self) -> str:
return self.name

def evaluate_energy_usage(
self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext
) -> dict[str, EcalcModelResult]:
def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]):
self.consumer_results = consumer_results

def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]:
consumer_results: dict[str, EcalcModelResult] = {}
consumer = ConsumerEnergyComponent(
id=self.id,
Expand All @@ -95,7 +98,8 @@ def evaluate_energy_usage(
}
),
)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator)
self.set_consumer_results(consumer_results)

return consumer_results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def __init__(
],
fuel: dict[Period, FuelType],
energy_usage_model: dict[Period, FuelEnergyUsageModel],
expression_evaluator: ExpressionEvaluator,
consumes: Literal[ConsumptionType.FUEL] = ConsumptionType.FUEL,
):
self.name = name
self.regularity = self.check_regularity(regularity)
validate_temporal_model(self.regularity)
self.user_defined_category = user_defined_category
self.energy_usage_model = self.check_energy_usage_model(energy_usage_model)
self.expression_evaluator = expression_evaluator
self.fuel = self.validate_fuel_exist(name=self.name, fuel=fuel, consumes=consumes)
self._validate_fuel_consumer_temporal_models(self.energy_usage_model, self.fuel)
self._check_model_energy_usage(self.energy_usage_model)
Expand Down Expand Up @@ -88,9 +90,13 @@ def get_component_process_type(self) -> ComponentType:
def get_name(self) -> str:
return self.name

def evaluate_energy_usage(
self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext
) -> dict[str, EcalcModelResult]:
def set_emission_results(self, emission_results: dict[str, EmissionResult]):
self.emission_results = emission_results

def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]):
self.consumer_results = consumer_results

def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]:
consumer_results: dict[str, EcalcModelResult] = {}
consumer = ConsumerEnergyComponent(
id=self.id,
Expand All @@ -105,25 +111,27 @@ def evaluate_energy_usage(
}
),
)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator)
self.set_consumer_results(consumer_results)

return consumer_results

def evaluate_emissions(
self,
energy_context: ComponentEnergyContext,
energy_model: EnergyModel,
expression_evaluator: ExpressionEvaluator,
) -> Optional[dict[str, EmissionResult]]:
fuel_model = FuelModel(self.fuel)
fuel_usage = energy_context.get_fuel_usage()

assert fuel_usage is not None

return fuel_model.evaluate_emissions(
expression_evaluator=expression_evaluator,
emissions = fuel_model.evaluate_emissions(
expression_evaluator=self.expression_evaluator,
fuel_rate=fuel_usage.values,
)
self.set_emission_results(emissions)
return emissions

@staticmethod
def check_energy_usage_model(energy_usage_model: dict[Period, FuelEnergyUsageModel]):
Expand Down
Loading
Loading