Skip to content

Commit

Permalink
fix linting issues and clean up
Browse files Browse the repository at this point in the history
Note: pydocstyle complaints due to missing docstrings in overloaded magic
methods ignored as they should be fixed in the forseeable future (see
PyCQA/pydocstyle#525)
  • Loading branch information
ruestefa committed Jun 8, 2022
1 parent 011e866 commit 0046f74
Showing 1 changed file with 99 additions and 81 deletions.
180 changes: 99 additions & 81 deletions src/atmcirclib/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,32 @@

# Third-party
import numpy as np
import numpy.typing as npt
import pandas as pd

# Generic type aliases
# First-party
from atmcirclib.typing import PathLike_T

# Misc.
RawPathT = Union[Path, str]
# Generic type aliases

# Time step notations
DDHH = Tuple[int, int]
DDHHMM = Tuple[int, int, int]
YYYYMMDDHH = Tuple[int, int, int, int]

# Raw time interval(s)
RawIntervalT = Union[pd.Interval, Tuple[Union[DDHH, DDHHMM], Union[DDHH, DDHHMM]]]
RawIntervalsT = Sequence[RawIntervalT]
RawIntervalST = Union[RawIntervalT, RawIntervalsT]
IntervalLike_T = Union[pd.Interval, Tuple[Union[DDHH, DDHHMM], Union[DDHH, DDHHMM]]]
IntervalsLike_T = Sequence[IntervalLike_T]
IntervalSLike_T = Union[IntervalLike_T, IntervalsLike_T]

# Raw output stream type(s)
RawOutputStreamTypeT = Union["OutputStreamType", str]
RawOutputStreamTypesT = Sequence[RawOutputStreamTypeT]
RawOutputStreamTypeST = Union[RawOutputStreamTypeT, RawOutputStreamTypesT]
OutputStreamTypeLike_T = Union["OutputStreamType", str]
OutputStreamTypesLike_T = Sequence[OutputStreamTypeLike_T]
OutputStreamTypeSLike_T = Union[OutputStreamTypeLike_T, OutputStreamTypesLike_T]

# Raw output streams dict
RawOutputStreamsDictT = Dict[RawOutputStreamTypeST, RawIntervalST]
RawOutputStreamsT = Union["OutputStreams", RawOutputStreamsDictT]
OutputStreamsDictLike_T = Dict[OutputStreamTypeSLike_T, IntervalSLike_T]
OutputStreamsLike_T = Union["OutputStreams", OutputStreamsDictLike_T]


class NamedObj(Protocol):
Expand Down Expand Up @@ -104,6 +105,7 @@ def __getitem__(
return obj

def __repr__(self) -> str:
"""Return string representation."""
return f"{type(self).__name__}([\n " + "\n ".join(map(str, self)) + "\n])"


Expand Down Expand Up @@ -185,6 +187,7 @@ class OutputStream:
intervals: list[pd.Interval]

def __post_init__(self) -> None:
"""Finalize initialization."""
self._run: Optional[SimulationRun] = None
self.check_intervals()

Expand Down Expand Up @@ -304,8 +307,8 @@ def check_intervals(self, steps: Optional[pd.DatetimeIndex] = None) -> None:
@classmethod
def create(
cls,
stream_type: RawOutputStreamTypeT,
intervals: RawIntervalST,
stream_type: OutputStreamTypeLike_T,
intervals: IntervalSLike_T,
*,
output_stream_types: Optional[OutputStreamTypes] = None,
start: Optional[pd.Timestamp] = None,
Expand Down Expand Up @@ -371,6 +374,17 @@ def get_all_steps(self) -> pd.DatetimeIndex:
stream.check_intervals(all_steps)
return all_steps

def count_per_step(self) -> tuple[npt.NDArray[np.int_], pd.DatetimeIndex]:
"""Count the number of streams overing each step."""
steps = self.get_all_steps()
counts = np.zeros(steps.size, np.int32)
for stream in self:
for interval in stream.intervals:
i0 = steps.get_loc(interval.left)
i1 = steps.get_loc(interval.right)
counts[i0 : i1 + 1] += 1
return counts, steps

def set_run(self, run: SimulationRun) -> None:
"""Associate all output streams with a simulation run."""
for stream in self:
Expand All @@ -379,7 +393,7 @@ def set_run(self, run: SimulationRun) -> None:
@classmethod
def create(
cls,
output: RawOutputStreamsT,
output: OutputStreamsLike_T,
*,
output_stream_types: Optional[OutputStreamTypes] = None,
start: Optional[pd.Timestamp] = None,
Expand All @@ -388,9 +402,9 @@ def create(
if isinstance(output, OutputStreams):
return copy(output)
streams = cls()
stream_type_s: RawOutputStreamTypeST
stream_type: RawOutputStreamTypeT
interval_s: RawIntervalST
stream_type_s: OutputStreamTypeSLike_T
stream_type: OutputStreamTypeLike_T
interval_s: IntervalSLike_T
for stream_type_s, interval_s in output.items():
if isinstance(stream_type_s, str) or not isinstance(
stream_type_s, Collection
Expand Down Expand Up @@ -444,61 +458,14 @@ def __init__(
self,
*,
start: Union[pd.Timestamp, YYYYMMDDHH],
path: Optional[RawPathT] = None,
rel_path: Optional[RawPathT] = None,
output: Optional[RawOutputStreamsT] = None,
path: Optional[PathLike_T] = None,
rel_path: Optional[PathLike_T] = None,
output: Optional[OutputStreamsLike_T] = None,
end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]] = None,
end_type: Optional[Union[SimulationRunEndType, str]] = None,
simulation_run_end_types: Optional[SimulationRunEndTypes] = None,
) -> None:
"""Create an instance of ``SimulationRun``."""

def init_end_rel(
raw_end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]],
output: OutputStreams,
start: pd.Timestamp,
) -> pd.Timedelta:
"""Initialize ``end_rel``."""
if raw_end_rel is None:
if not output:
raw_end_rel = pd.Timedelta(0)
else:
last_output = max(
interval.right
for stream in output
for interval in stream.intervals
)
raw_end_rel = last_output - start
return init_timedelta(raw_end_rel)

def init_end_type(
raw_end_type: Optional[Union[SimulationRunEndType, str]]
) -> SimulationRunEndType:
"""Initialize ``end_type``."""
if isinstance(raw_end_type, SimulationRunEndType):
return raw_end_type
if simulation_run_end_types is None:
raise ValueError(
"must pass simulation_run_end_types if end_type is not of type"
" SimulationRunEndType"
)
return simulation_run_end_types[raw_end_type or "success"]

def init_label(paths: Collection[Optional[Path]]) -> str:
label = ""
for path in paths:
if path is not None:
if not label:
label = path.name
elif label != path.name:
raise ValueError(
f"inconsistent labels: {label} != {path.name}; paths: "
+ ", ".join(map(str, paths))
)
if not label:
raise ValueError(f"could not derive label from paths: {paths}")
return label

self.abs_path: Optional[Path] = None if path is None else Path(path)
self.rel_path: Optional[Path] = None if rel_path is None else Path(rel_path)
self.start: pd.Timestamp = init_timestamp(start)
Expand All @@ -508,11 +475,15 @@ def init_label(paths: Collection[Optional[Path]]) -> str:
path = self.abs_path or self.rel_path
raise Exception(f"error creating output streams for run at {path}") from e
self.output: OutputStreams = streams
self.end_rel: pd.Timedelta = init_end_rel(end_rel, self.output, self.start)
self.end_type: SimulationRunEndType = init_end_type(end_type)
self.end_rel: pd.Timedelta = self._init_end_rel(
end_rel, self.output, self.start
)
self.end_type: SimulationRunEndType = self._init_end_type(
end_type, simulation_run_end_types
)

self.output.set_run(self)
self.label: str = init_label([self.abs_path, self.rel_path])
self.label: str = self._init_label([self.abs_path, self.rel_path])
self.end: pd.Timestamp = self.start + self.end_rel
self.write_start: pd.Timestamp = min(
[interval.left for stream in self.output for interval in stream.intervals]
Expand Down Expand Up @@ -554,7 +525,9 @@ def get_full_path(self) -> Path:
)
return path

def init_path(path: Optional[RawPathT], rel_path: Optional[RawPathT]) -> Path:
def init_path(
path: Optional[PathLike_T], rel_path: Optional[PathLike_T]
) -> Path:
if path is None and rel_path is None:
raise ValueError("path and rel_path are both None")
elif rel_path is not None:
Expand All @@ -569,16 +542,65 @@ def exists(self) -> bool:
return self.get_full_path().exists()

def __repr__(self) -> str:
"""Return a string representation."""
path = self.get_full_path()
return f"{type(self).__name__}('{path}', {self.start})"

@staticmethod
def _init_end_rel(
raw_end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]],
output: OutputStreams,
start: pd.Timestamp,
) -> pd.Timedelta:
"""Initialize ``end_rel``."""
if raw_end_rel is None:
if not output:
raw_end_rel = pd.Timedelta(0)
else:
last_output = max(
interval.right for stream in output for interval in stream.intervals
)
raw_end_rel = last_output - start
return init_timedelta(raw_end_rel)

@staticmethod
def _init_end_type(
raw_end_type: Optional[Union[SimulationRunEndType, str]],
simulation_run_end_types: Optional[SimulationRunEndTypes],
) -> SimulationRunEndType:
"""Initialize ``end_type``."""
if isinstance(raw_end_type, SimulationRunEndType):
return raw_end_type
if simulation_run_end_types is None:
raise ValueError(
"must pass simulation_run_end_types if end_type is not of type"
" SimulationRunEndType"
)
return simulation_run_end_types[raw_end_type or "success"]

@staticmethod
def _init_label(paths: Collection[Optional[Path]]) -> str:
label = ""
for path in paths:
if path is not None:
if not label:
label = path.name
elif label != path.name:
raise ValueError(
f"inconsistent labels: {label} != {path.name}; paths: "
+ ", ".join(map(str, paths))
)
if not label:
raise ValueError(f"could not derive label from paths: {paths}")
return label


class Simulation:
"""A simulation comprised of one or more simulation runs."""

def __init__(
self,
path: RawPathT,
path: PathLike_T,
runs: Optional[Sequence[SimulationRun]] = None,
) -> None:
"""Create an instance of ``Simulation``."""
Expand Down Expand Up @@ -632,16 +654,10 @@ def find_redundant_output(
if stream_type.removed_files:
continue
try:
steps = streams.get_all_steps()
counts, steps = streams.count_per_step()
except EmptyOutputStreamError:
continue
coverage = np.zeros(steps.size)
for stream in streams:
for interval in stream.intervals:
i0 = steps.get_loc(interval.left)
i1 = steps.get_loc(interval.right)
coverage[i0 : i1 + 1] += 1
idcs = np.where(steps[coverage > 1])[0]
idcs = np.where(steps[counts > 1])[0]
if idcs.size == 0:
continue
multi_steps[stream_type] = {}
Expand All @@ -657,6 +673,7 @@ def find_redundant_output(
return multi_steps

def __repr__(self) -> str:
"""Return a string representation."""
return f"{type(self).__name__}([" + ", ".join(map(str, self.get_runs())) + "])"


Expand Down Expand Up @@ -734,6 +751,7 @@ def collect_redundant_output_files(self) -> list[list[Path]]:
return paths_sims

def __repr__(self) -> str:
"""Return a string representation."""
return f"{type(self).__name__}([\n " + "\n ".join(map(str, self)) + "\n])"

@classmethod
Expand Down Expand Up @@ -790,7 +808,7 @@ def init_timedelta(val: Union[pd.Timedelta, DDHH, DDHHMM]) -> pd.Timedelta:


def init_interval(
val: RawIntervalT,
val: IntervalLike_T,
start: Optional[pd.Timestamp] = None,
) -> pd.Interval:
"""Initialize a time interval object."""
Expand Down

0 comments on commit 0046f74

Please sign in to comment.