From 0046f747f043b3dcad6eca92a0d8bfac9d27b51f Mon Sep 17 00:00:00 2001 From: Stefan Ruedisuehli Date: Wed, 8 Jun 2022 15:27:15 +0200 Subject: [PATCH] fix linting issues and clean up Note: pydocstyle complaints due to missing docstrings in overloaded magic methods ignored as they should be fixed in the forseeable future (see https://github.com/PyCQA/pydocstyle/issues/525) --- src/atmcirclib/simulations.py | 180 +++++++++++++++++++--------------- 1 file changed, 99 insertions(+), 81 deletions(-) diff --git a/src/atmcirclib/simulations.py b/src/atmcirclib/simulations.py index 5513055..0aba7bc 100644 --- a/src/atmcirclib/simulations.py +++ b/src/atmcirclib/simulations.py @@ -25,12 +25,13 @@ # Third-party import numpy as np +import numpy.typing as npt import pandas as pd -# Generic type aliases +# First-party +from atmcirclib.typing import PathLike_T -# Misc. -RawPathT = Union[Path, str] +# Generic type aliases # Time step notations DDHH = Tuple[int, int] @@ -38,18 +39,18 @@ YYYYMMDDHH = Tuple[int, int, int, int] # Raw time interval(s) -RawIntervalT = Union[pd.Interval, Tuple[Union[DDHH, DDHHMM], Union[DDHH, DDHHMM]]] -RawIntervalsT = Sequence[RawIntervalT] -RawIntervalST = Union[RawIntervalT, RawIntervalsT] +IntervalLike_T = Union[pd.Interval, Tuple[Union[DDHH, DDHHMM], Union[DDHH, DDHHMM]]] +IntervalsLike_T = Sequence[IntervalLike_T] +IntervalSLike_T = Union[IntervalLike_T, IntervalsLike_T] # Raw output stream type(s) -RawOutputStreamTypeT = Union["OutputStreamType", str] -RawOutputStreamTypesT = Sequence[RawOutputStreamTypeT] -RawOutputStreamTypeST = Union[RawOutputStreamTypeT, RawOutputStreamTypesT] +OutputStreamTypeLike_T = Union["OutputStreamType", str] +OutputStreamTypesLike_T = Sequence[OutputStreamTypeLike_T] +OutputStreamTypeSLike_T = Union[OutputStreamTypeLike_T, OutputStreamTypesLike_T] # Raw output streams dict -RawOutputStreamsDictT = Dict[RawOutputStreamTypeST, RawIntervalST] -RawOutputStreamsT = Union["OutputStreams", RawOutputStreamsDictT] +OutputStreamsDictLike_T = Dict[OutputStreamTypeSLike_T, IntervalSLike_T] +OutputStreamsLike_T = Union["OutputStreams", OutputStreamsDictLike_T] class NamedObj(Protocol): @@ -104,6 +105,7 @@ def __getitem__( return obj def __repr__(self) -> str: + """Return string representation.""" return f"{type(self).__name__}([\n " + "\n ".join(map(str, self)) + "\n])" @@ -185,6 +187,7 @@ class OutputStream: intervals: list[pd.Interval] def __post_init__(self) -> None: + """Finalize initialization.""" self._run: Optional[SimulationRun] = None self.check_intervals() @@ -304,8 +307,8 @@ def check_intervals(self, steps: Optional[pd.DatetimeIndex] = None) -> None: @classmethod def create( cls, - stream_type: RawOutputStreamTypeT, - intervals: RawIntervalST, + stream_type: OutputStreamTypeLike_T, + intervals: IntervalSLike_T, *, output_stream_types: Optional[OutputStreamTypes] = None, start: Optional[pd.Timestamp] = None, @@ -371,6 +374,17 @@ def get_all_steps(self) -> pd.DatetimeIndex: stream.check_intervals(all_steps) return all_steps + def count_per_step(self) -> tuple[npt.NDArray[np.int_], pd.DatetimeIndex]: + """Count the number of streams overing each step.""" + steps = self.get_all_steps() + counts = np.zeros(steps.size, np.int32) + for stream in self: + for interval in stream.intervals: + i0 = steps.get_loc(interval.left) + i1 = steps.get_loc(interval.right) + counts[i0 : i1 + 1] += 1 + return counts, steps + def set_run(self, run: SimulationRun) -> None: """Associate all output streams with a simulation run.""" for stream in self: @@ -379,7 +393,7 @@ def set_run(self, run: SimulationRun) -> None: @classmethod def create( cls, - output: RawOutputStreamsT, + output: OutputStreamsLike_T, *, output_stream_types: Optional[OutputStreamTypes] = None, start: Optional[pd.Timestamp] = None, @@ -388,9 +402,9 @@ def create( if isinstance(output, OutputStreams): return copy(output) streams = cls() - stream_type_s: RawOutputStreamTypeST - stream_type: RawOutputStreamTypeT - interval_s: RawIntervalST + stream_type_s: OutputStreamTypeSLike_T + stream_type: OutputStreamTypeLike_T + interval_s: IntervalSLike_T for stream_type_s, interval_s in output.items(): if isinstance(stream_type_s, str) or not isinstance( stream_type_s, Collection @@ -444,61 +458,14 @@ def __init__( self, *, start: Union[pd.Timestamp, YYYYMMDDHH], - path: Optional[RawPathT] = None, - rel_path: Optional[RawPathT] = None, - output: Optional[RawOutputStreamsT] = None, + path: Optional[PathLike_T] = None, + rel_path: Optional[PathLike_T] = None, + output: Optional[OutputStreamsLike_T] = None, end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]] = None, end_type: Optional[Union[SimulationRunEndType, str]] = None, simulation_run_end_types: Optional[SimulationRunEndTypes] = None, ) -> None: """Create an instance of ``SimulationRun``.""" - - def init_end_rel( - raw_end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]], - output: OutputStreams, - start: pd.Timestamp, - ) -> pd.Timedelta: - """Initialize ``end_rel``.""" - if raw_end_rel is None: - if not output: - raw_end_rel = pd.Timedelta(0) - else: - last_output = max( - interval.right - for stream in output - for interval in stream.intervals - ) - raw_end_rel = last_output - start - return init_timedelta(raw_end_rel) - - def init_end_type( - raw_end_type: Optional[Union[SimulationRunEndType, str]] - ) -> SimulationRunEndType: - """Initialize ``end_type``.""" - if isinstance(raw_end_type, SimulationRunEndType): - return raw_end_type - if simulation_run_end_types is None: - raise ValueError( - "must pass simulation_run_end_types if end_type is not of type" - " SimulationRunEndType" - ) - return simulation_run_end_types[raw_end_type or "success"] - - def init_label(paths: Collection[Optional[Path]]) -> str: - label = "" - for path in paths: - if path is not None: - if not label: - label = path.name - elif label != path.name: - raise ValueError( - f"inconsistent labels: {label} != {path.name}; paths: " - + ", ".join(map(str, paths)) - ) - if not label: - raise ValueError(f"could not derive label from paths: {paths}") - return label - self.abs_path: Optional[Path] = None if path is None else Path(path) self.rel_path: Optional[Path] = None if rel_path is None else Path(rel_path) self.start: pd.Timestamp = init_timestamp(start) @@ -508,11 +475,15 @@ def init_label(paths: Collection[Optional[Path]]) -> str: path = self.abs_path or self.rel_path raise Exception(f"error creating output streams for run at {path}") from e self.output: OutputStreams = streams - self.end_rel: pd.Timedelta = init_end_rel(end_rel, self.output, self.start) - self.end_type: SimulationRunEndType = init_end_type(end_type) + self.end_rel: pd.Timedelta = self._init_end_rel( + end_rel, self.output, self.start + ) + self.end_type: SimulationRunEndType = self._init_end_type( + end_type, simulation_run_end_types + ) self.output.set_run(self) - self.label: str = init_label([self.abs_path, self.rel_path]) + self.label: str = self._init_label([self.abs_path, self.rel_path]) self.end: pd.Timestamp = self.start + self.end_rel self.write_start: pd.Timestamp = min( [interval.left for stream in self.output for interval in stream.intervals] @@ -554,7 +525,9 @@ def get_full_path(self) -> Path: ) return path - def init_path(path: Optional[RawPathT], rel_path: Optional[RawPathT]) -> Path: + def init_path( + path: Optional[PathLike_T], rel_path: Optional[PathLike_T] + ) -> Path: if path is None and rel_path is None: raise ValueError("path and rel_path are both None") elif rel_path is not None: @@ -569,16 +542,65 @@ def exists(self) -> bool: return self.get_full_path().exists() def __repr__(self) -> str: + """Return a string representation.""" path = self.get_full_path() return f"{type(self).__name__}('{path}', {self.start})" + @staticmethod + def _init_end_rel( + raw_end_rel: Optional[Union[pd.Timedelta, DDHH, DDHHMM]], + output: OutputStreams, + start: pd.Timestamp, + ) -> pd.Timedelta: + """Initialize ``end_rel``.""" + if raw_end_rel is None: + if not output: + raw_end_rel = pd.Timedelta(0) + else: + last_output = max( + interval.right for stream in output for interval in stream.intervals + ) + raw_end_rel = last_output - start + return init_timedelta(raw_end_rel) + + @staticmethod + def _init_end_type( + raw_end_type: Optional[Union[SimulationRunEndType, str]], + simulation_run_end_types: Optional[SimulationRunEndTypes], + ) -> SimulationRunEndType: + """Initialize ``end_type``.""" + if isinstance(raw_end_type, SimulationRunEndType): + return raw_end_type + if simulation_run_end_types is None: + raise ValueError( + "must pass simulation_run_end_types if end_type is not of type" + " SimulationRunEndType" + ) + return simulation_run_end_types[raw_end_type or "success"] + + @staticmethod + def _init_label(paths: Collection[Optional[Path]]) -> str: + label = "" + for path in paths: + if path is not None: + if not label: + label = path.name + elif label != path.name: + raise ValueError( + f"inconsistent labels: {label} != {path.name}; paths: " + + ", ".join(map(str, paths)) + ) + if not label: + raise ValueError(f"could not derive label from paths: {paths}") + return label + class Simulation: """A simulation comprised of one or more simulation runs.""" def __init__( self, - path: RawPathT, + path: PathLike_T, runs: Optional[Sequence[SimulationRun]] = None, ) -> None: """Create an instance of ``Simulation``.""" @@ -632,16 +654,10 @@ def find_redundant_output( if stream_type.removed_files: continue try: - steps = streams.get_all_steps() + counts, steps = streams.count_per_step() except EmptyOutputStreamError: continue - coverage = np.zeros(steps.size) - for stream in streams: - for interval in stream.intervals: - i0 = steps.get_loc(interval.left) - i1 = steps.get_loc(interval.right) - coverage[i0 : i1 + 1] += 1 - idcs = np.where(steps[coverage > 1])[0] + idcs = np.where(steps[counts > 1])[0] if idcs.size == 0: continue multi_steps[stream_type] = {} @@ -657,6 +673,7 @@ def find_redundant_output( return multi_steps def __repr__(self) -> str: + """Return a string representation.""" return f"{type(self).__name__}([" + ", ".join(map(str, self.get_runs())) + "])" @@ -734,6 +751,7 @@ def collect_redundant_output_files(self) -> list[list[Path]]: return paths_sims def __repr__(self) -> str: + """Return a string representation.""" return f"{type(self).__name__}([\n " + "\n ".join(map(str, self)) + "\n])" @classmethod @@ -790,7 +808,7 @@ def init_timedelta(val: Union[pd.Timedelta, DDHH, DDHHMM]) -> pd.Timedelta: def init_interval( - val: RawIntervalT, + val: IntervalLike_T, start: Optional[pd.Timestamp] = None, ) -> pd.Interval: """Initialize a time interval object."""