Skip to content

Commit

Permalink
chore: remove pydantic in legacy consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
frodehk committed Dec 18, 2024
1 parent 92f87cb commit 56c0263
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,15 @@ def evaluate_rate_ps_pd(
"""
# subtract an epsilon to make robust comparison.
if rate is not None:
# Ensure rate is a NumPy array
rate = np.array(rate, dtype=np.float64)
# To avoid bringing rate below zero.
rate = np.where(rate > 0, rate - EPSILON, rate)
if suction_pressure is not None:
suction_pressure = np.array(suction_pressure, dtype=np.float64)

if discharge_pressure is not None:
discharge_pressure = np.array(discharge_pressure, dtype=np.float64)

number_of_data_points = 0
if rate is not None:
Expand Down
7 changes: 7 additions & 0 deletions src/libecalc/core/models/model_input_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ def validate_model_input(
NDArray[np.float64],
list[ModelInputFailureStatus],
]:
# Ensure input is a NumPy array
rate = np.array(rate, dtype=np.float64)
suction_pressure = np.array(suction_pressure, dtype=np.float64)
discharge_pressure = np.array(discharge_pressure, dtype=np.float64)

indices_to_validate = _find_indices_to_validate(rate=rate)
validated_failure_status = [ModelInputFailureStatus.NO_FAILURE] * len(suction_pressure)
validated_rate = rate.copy()
validated_suction_pressure = suction_pressure.copy()
validated_discharge_pressure = discharge_pressure.copy()
if intermediate_pressure is not None:
intermediate_pressure = np.array(intermediate_pressure, dtype=np.float64)
validated_intermediate_pressure = intermediate_pressure
if len(indices_to_validate) >= 1:
(
Expand Down Expand Up @@ -82,6 +88,7 @@ def _find_indices_to_validate(rate: NDArray[np.float64]) -> list[int]:
For a 1D array, this means returning the indices where rate is positive.
For a 2D array, this means returning the indices where at least one rate is positive (along 0-axis).
"""
rate = np.atleast_1d(rate) # Ensure rate is at least 1D
return np.where(np.any(rate != 0, axis=0) if np.ndim(rate) == 2 else rate != 0)[0].tolist()


Expand Down
10 changes: 9 additions & 1 deletion src/libecalc/core/models/pump/pump.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ def evaluate_streams(

@staticmethod
def _calculate_head(
ps: NDArray[np.float64], pd: NDArray[np.float64], density: Union[NDArray[np.float64], float]
ps: Union[NDArray[np.float64], list[float]],
pd: Union[NDArray[np.float64], list[float]],
density: Union[NDArray[np.float64], float],
) -> NDArray[np.float64]:
""":return: Head in joule per kg [J/kg]"""
ps = np.array(ps, dtype=np.float64)
pd = np.array(pd, dtype=np.float64)
return np.array(Unit.BARA.to(Unit.PASCAL)(pd - ps) / density)

@staticmethod
Expand Down Expand Up @@ -288,6 +292,10 @@ def evaluate_rate_ps_pd_density(
:param discharge_pressures:
:param fluid_density:
"""
# Ensure rate is a NumPy array
rate = np.array(rate, dtype=np.float64)
fluid_density = np.array(fluid_density, dtype=np.float64)

# Ensure that the pump does not run when rate is <= 0.
stream_day_rate = np.where(rate > 0, rate, 0)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,93 @@
from __future__ import annotations

import copy
from abc import abstractmethod
from enum import Enum
from typing import Literal, Optional, Union

import numpy as np
from pydantic import BaseModel, ConfigDict
from numpy.typing import NDArray

from libecalc.common.logger import logger
from libecalc.common.time_utils import Periods
from libecalc.core.models.results.base import EnergyFunctionResult
from libecalc.core.utils.array_type import PydanticNDArray
from libecalc.domain.infrastructure.energy_components.legacy_consumer.consumer_function.types import (
ConsumerFunctionType,
)


class ConsumerFunctionResultBase(BaseModel):
class ConsumerFunctionResultBase:
"""Result object for ConsumerFunction.
Units:
energy_usage [MW]
"""

typ: ConsumerFunctionType

periods: Periods
is_valid: PydanticNDArray
energy_usage: PydanticNDArray
energy_usage_before_power_loss_factor: Optional[PydanticNDArray] = None
condition: Optional[PydanticNDArray] = None
power_loss_factor: Optional[PydanticNDArray] = None
energy_function_result: Optional[Union[EnergyFunctionResult, list[EnergyFunctionResult]]] = None

# New! to support fuel to power rate...for e.g. compressors emulating turbine
power: Optional[PydanticNDArray] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(
self,
typ: ConsumerFunctionType,
periods: Periods,
is_valid: NDArray,
energy_usage: NDArray,
energy_usage_before_power_loss_factor: Optional[NDArray] = None,
condition: Optional[NDArray] = None,
power_loss_factor: Optional[NDArray] = None,
energy_function_result: Optional[Union[EnergyFunctionResult, list[EnergyFunctionResult]]] = None,
# New! to support fuel to power rate...for e.g. compressors emulating turbine
power: Optional[NDArray] = None,
):
self.typ = typ
self.periods = periods
self.is_valid = is_valid
self.energy_usage = energy_usage
self.energy_usage_before_power_loss_factor = energy_usage_before_power_loss_factor
self.condition = condition
self.power_loss_factor = power_loss_factor
self.energy_function_result = energy_function_result
self.power = power

@abstractmethod
def extend(self, other: object) -> ConsumerFunctionResultBase: ...

def model_copy(self, deep: bool = False) -> ConsumerFunctionResultBase:
if deep:
return copy.deepcopy(self)
return copy.copy(self)


class ConsumerFunctionResult(ConsumerFunctionResultBase):
typ: Literal[ConsumerFunctionType.SINGLE] = ConsumerFunctionType.SINGLE # type: ignore[valid-type]
def __init__(
self,
periods: Periods,
is_valid: NDArray,
energy_usage: NDArray,
typ: Literal[ConsumerFunctionType.SINGLE] = ConsumerFunctionType.SINGLE, # type: ignore[valid-type]
energy_usage_before_power_loss_factor: Optional[NDArray] = None,
condition: Optional[NDArray] = None,
power_loss_factor: Optional[NDArray] = None,
energy_function_result: Optional[Union[EnergyFunctionResult, list[EnergyFunctionResult]]] = None,
power: Optional[NDArray] = None,
):
super().__init__(
typ,
periods,
is_valid,
energy_usage,
energy_usage_before_power_loss_factor,
condition,
power_loss_factor,
energy_function_result,
power,
)
self.typ = typ
self.periods = periods
self.is_valid = is_valid
self.energy_usage = energy_usage
self.energy_usage_before_power_loss_factor = energy_usage_before_power_loss_factor
self.condition = condition
self.power_loss_factor = power_loss_factor
self.energy_function_result = energy_function_result
self.power = power

def extend(self, other) -> ConsumerFunctionResult:
"""This is used when merging different time slots when the energy function of a consumer changes over time."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@

import numpy as np
from numpy.typing import NDArray
from pydantic import BaseModel, ConfigDict, model_validator

from libecalc.common.errors.exceptions import EcalcError, IncompatibleDataError
from libecalc.common.logger import logger
from libecalc.common.utils.rates import Rates
from libecalc.core.utils.array_type import PydanticNDArray
from libecalc.expression import Expression


class ConsumerSystemOperationalSettingExpressions(BaseModel):
class ConsumerSystemOperationalSettingExpressions:
"""Each index of a setting is aligned with a consumer. The first consumer has rate self.rates[0], etc.
cross_overs: Defines what consumer to send exceeding rates to (Warning! index starts at 1!).
Expand All @@ -28,17 +26,25 @@ class ConsumerSystemOperationalSettingExpressions(BaseModel):
Note that circular references is not possible.
"""

rates: list[Expression]
suction_pressures: list[Expression]
discharge_pressures: list[Expression]
cross_overs: Optional[list[int]] = None
fluid_densities: Optional[list[Expression]] = None
def __init__(
self,
rates: list[Expression],
suction_pressures: list[Expression],
discharge_pressures: list[Expression],
cross_overs: Optional[list[int]] = None,
fluid_densities: Optional[list[Expression]] = None,
):
self.rates = rates
self.suction_pressures = suction_pressures
self.discharge_pressures = discharge_pressures
self.cross_overs = cross_overs
self.fluid_densities = fluid_densities
self.check_list_length()

@property
def number_of_consumers(self):
return len(self.rates)

@model_validator(mode="after")
def check_list_length(self):
def _log_error(field: str, field_values: list[Any], n_rates) -> None:
msg = (
Expand Down Expand Up @@ -68,20 +74,36 @@ class CompressorSystemOperationalSettingExpressions(ConsumerSystemOperationalSet


class PumpSystemOperationalSettingExpressions(ConsumerSystemOperationalSettingExpressions):
fluid_densities: list[Expression]
def __init__(
self,
rates: list[Expression],
suction_pressures: list[Expression],
fluid_densities: list[Expression],
discharge_pressures: list[Expression],
cross_overs: Optional[list[int]] = None,
):
super().__init__(rates, suction_pressures, discharge_pressures, cross_overs)
self.fluid_densities = fluid_densities


class ConsumerSystemOperationalSetting(BaseModel):
class ConsumerSystemOperationalSetting:
"""Warning! The methods below are fragile to changes in attribute names and types."""

rates: list[PydanticNDArray]
suction_pressures: list[PydanticNDArray]
discharge_pressures: list[PydanticNDArray]
cross_overs: Optional[list[int]] = None
fluid_densities: Optional[list[PydanticNDArray]] = None
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
def __init__(
self,
rates: list[NDArray[np.float64]],
suction_pressures: list[NDArray[np.float64]],
discharge_pressures: list[NDArray[np.float64]],
cross_overs: Optional[list[int]] = None,
fluid_densities: Optional[list[NDArray[np.float64]]] = None,
):
self.rates = rates
self.suction_pressures = suction_pressures
self.discharge_pressures = discharge_pressures
self.cross_overs = cross_overs
self.fluid_densities = fluid_densities
self.check_list_length()

@model_validator(mode="after")
def check_list_length(self):
def _log_error(field: str, field_values: list[Any], n_rates: int) -> None:
error_message = (
Expand Down Expand Up @@ -142,9 +164,25 @@ def set_rates_after_cross_over(
data.update({"rates": rates_after_cross_over})
return self.__class__(**data)

def model_copy(self, update: dict) -> ConsumerSystemOperationalSetting:
"""Create a copy of the current model with updates."""
new_model = deepcopy(self)
for key, value in update.items():
setattr(new_model, key, value)
return new_model


class CompressorSystemOperationalSetting(ConsumerSystemOperationalSetting): ...


class PumpSystemOperationalSetting(ConsumerSystemOperationalSetting):
fluid_densities: list[PydanticNDArray]
def __init__(
self,
rates: list[NDArray[np.float64]],
suction_pressures: list[NDArray[np.float64]],
discharge_pressures: list[NDArray[np.float64]],
fluid_densities: list[NDArray[np.float64]],
cross_overs: Optional[list[int]] = None,
):
super().__init__(rates, suction_pressures, discharge_pressures, cross_overs, fluid_densities)
self.fluid_densities = fluid_densities
Loading

0 comments on commit 56c0263

Please sign in to comment.