From aaf083362188e4692dac3a61e187e93575c4e813 Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Tue, 12 Nov 2024 23:29:19 -0700
Subject: [PATCH] add rep_time support

---
 src/chronify/duckdb/functions.py              |  23 ---
 src/chronify/sqlalchemy/functions.py          |   7 +-
 src/chronify/time.py                          |  27 ++-
 src/chronify/time_configs.py                  | 113 ++++++++---
 src/chronify/time_series_checker.py           |   6 +-
 src/chronify/time_series_mapper.py            | 145 ++---------
 src/chronify/time_series_mapper_base.py       |  13 ++
 .../time_series_mapper_representative.py      | 166 ++++++++++++++
 tests/test_duckdb_functions.py                |  74 -------
 tests/test_time_series_mapper.py              | 151 +++++++++++++
 10 files changed, 469 insertions(+), 256 deletions(-)
 create mode 100644 src/chronify/time_series_mapper_base.py
 create mode 100644 src/chronify/time_series_mapper_representative.py
 delete mode 100644 tests/test_duckdb_functions.py
 create mode 100644 tests/test_time_series_mapper.py

diff --git a/src/chronify/duckdb/functions.py b/src/chronify/duckdb/functions.py
index 5f461f3..7c1cff3 100644
--- a/src/chronify/duckdb/functions.py
+++ b/src/chronify/duckdb/functions.py
@@ -57,26 +57,3 @@ def unpivot(
         )
     """
     return duckdb.sql(query)
-
-
-def join(
-    left_rel: DuckDBPyRelation,
-    right_rel: DuckDBPyRelation,
-    on: list[str],
-    how: str = "inner",
-) -> DuckDBPyRelation:
-    def get_join_statement(left_df, right_df, keys: list):
-        stmts = [f"{left_df.alias}.{key}={right_df.alias}.{key}" for key in keys]
-        return " and ".join(stmts)
-
-    def get_select_after_join_statement(left_df, right_df, keys: list):
-        left_cols = [f"{left_df.alias}.{x}" for x in left_df.columns]
-        right_cols = [x for x in right_df.columns if x not in keys]
-        return ", ".join(left_cols + right_cols)
-
-    join_stmt = get_join_statement(left_rel, right_rel, on)
-    select_stmt = get_select_after_join_statement(left_rel, right_rel, on)
-    query = f"SELECT {select_stmt} from {left_rel.alias} {how.upper()} JOIN {right_rel.alias} ON {join_stmt}"
-    breakpoint()
-    # return left_rel.join(right_rel, join_stmt).select(select_stmt)
-    return duckdb.sql(query)
diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py
index 5c2e453..c6c74b9 100644
--- a/src/chronify/sqlalchemy/functions.py
+++ b/src/chronify/sqlalchemy/functions.py
@@ -18,6 +18,9 @@ def read_database(query: Selectable | str, conn: Connection, schema: TableSchema
             df[config.time_column] = pd.to_datetime(df[config.time_column], utc=True)
         else:
             df[config.time_column] = df[config.time_column].dt.tz_localize("UTC")
+    elif conn.engine.name == "sqlite" and isinstance(config, DatetimeRange):
+        if isinstance(df[config.time_column].dtype, ObjectDType):
+            df[config.time_column] = pd.to_datetime(df[config.time_column], utc=False)
     return df
 
 
@@ -26,8 +29,8 @@ def write_database(df: pd.DataFrame, conn: Connection, schema: TableSchema) -> N
     config = schema.time_config
     if config.needs_utc_conversion(conn.engine.name):
         assert isinstance(config, DatetimeRange)
-        if isinstance(df.timestamp.dtype, DatetimeTZDtype):
+        if isinstance(df[config.time_column].dtype, DatetimeTZDtype):
             df[config.time_column] = df[config.time_column].dt.tz_convert("UTC")
         else:
             df[config.time_column] = df[config.time_column].dt.tz_localize("UTC")
-    pl.DataFrame(df).write_database(schema.name, connection=conn, if_table_exists="append")
+    pl.DataFrame(df).write_database(schema.name, connection=conn, if_table_exists="replace")
diff --git a/src/chronify/time.py b/src/chronify/time.py
index c189cff..180a4df 100644
--- a/src/chronify/time.py
+++ b/src/chronify/time.py
@@ -36,6 +36,29 @@ class RepresentativePeriodFormat(StrEnum):
     )
 
 
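+# For each representative-period format: the column that encodes weekday
+# information, and the full set of time columns for that format.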
+representative_weekday_column = {
+    RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR: "day_of_week",
+    RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR: "is_weekday",
+}
+
+representative_period_columns = {
+    RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR: [
+        "month",
+        representative_weekday_column[RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR],
+        "hour",
+    ],
+    RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR: [
+        "month",
+        representative_weekday_column[
+            RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR
+        ],
+        "hour",
+    ],
+}
+
+
 class LeapDayAdjustmentType(StrEnum):
     """Leap day adjustment enum types"""
 
@@ -149,7 +172,7 @@ def get_time_zone_offset(tz: TimeZone) -> str:
     return offset
 
 
-def get_standard_timezone(tz: TimeZone) -> TimeZone:
+def get_standard_time(tz: TimeZone) -> TimeZone:
     """Return the equivalent standard time zone."""
     match tz:
         case TimeZone.UTC:
@@ -173,7 +196,7 @@ def get_standard_timezone(tz: TimeZone) -> TimeZone:
         raise NotImplementedError(msg)
 
 
-def get_prevailing_timezone(tz: TimeZone) -> TimeZone:
+def get_prevailing_time(tz: TimeZone) -> TimeZone:
     """Return the equivalent prevailing time zone."""
     match tz:
         case TimeZone.UTC:
diff --git a/src/chronify/time_configs.py b/src/chronify/time_configs.py
index 5f6798a..9bc4ab3 100644
--- a/src/chronify/time_configs.py
+++ b/src/chronify/time_configs.py
@@ -8,12 +8,10 @@
 import pandas as pd
 from pydantic import (
     Field,
-    field_validator,
 )
 from typing_extensions import Annotated
 
 from chronify.base_models import ChronifyBaseModel
-from chronify.exceptions import InvalidParameter
 from chronify.time import (
     DatetimeFormat,
     DaylightSavingFallBackType,
@@ -23,6 +21,8 @@
     TimeIntervalType,
     TimeType,
     TimeZone,
+    RepresentativePeriodFormat,
+    representative_period_columns,
 )
 
 # from chronify.time_utils import (
@@ -131,8 +131,6 @@ class TimeBasedDataAdjustment(ChronifyBaseModel):
 class TimeBaseModel(ChronifyBaseModel, abc.ABC):
     """Defines a base model common to all time dimensions."""
 
-    length: int
-
     def list_timestamps(self) -> list[Any]:
         """Return a list of timestamps for a time range.
         Type of the timestamps depends on the class.
@@ -177,6 +175,9 @@ class DatetimeRange(TimeBaseModel):
         description="Start time of the range. If it includes a time zone, the timestamps in "
        "the data must also include time zones."
     )
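+    # Length is now declared on each concrete time config; representative
+    # formats derive their timestamps from the format definition instead.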
+    length: int
     resolution: timedelta
     time_based_data_adjustment: TimeBasedDataAdjustment = TimeBasedDataAdjustment()
     interval_type: TimeIntervalType = TimeIntervalType.PERIOD_ENDING
@@ -235,6 +236,8 @@ class AnnualTimeRange(TimeBaseModel):
 
     time_column: str = Field(description="Column in the table that represents time.")
     time_type: Literal[TimeType.ANNUAL] = TimeType.ANNUAL
     start: int
+    length: int
+    measurement_type: MeasurementType = MeasurementType.TOTAL  # TODO: measurement_type must be TOTAL
 
     def iter_timestamps(self) -> Generator[int, None, None]:
@@ -248,11 +251,12 @@ def list_time_columns(self) -> list[str]:
 
 class IndexTimeRange(TimeBaseModel):
     time_type: Literal[TimeType.INDEX] = TimeType.INDEX
     start: int
+    length: int
     resolution: timedelta
     time_zone: TimeZone
     time_based_data_adjustment: TimeBasedDataAdjustment
-    interval_type: TimeIntervalType
-    measurement_type: MeasurementType
+    interval_type: TimeIntervalType = TimeIntervalType.PERIOD_ENDING
+    measurement_type: MeasurementType = MeasurementType.TOTAL
 
     # TODO DT: totally wrong
     # def iter_timestamps(self) -> Generator[datetime, None, None]:
@@ -293,25 +297,90 @@ class IndexTimeRange(TimeBaseModel):
 class RepresentativePeriodTimeRange(TimeBaseModel):
     """Defines a representative time dimension."""
 
-    time_columns: list[str] = Field(description="Columns in the table that represent time.")
     time_type: Literal[TimeType.REPRESENTATIVE_PERIOD] = TimeType.REPRESENTATIVE_PERIOD
-    measurement_type: MeasurementType
-    time_interval_type: TimeIntervalType
-    # TODO
-
-    @field_validator("time_columns")
-    @classmethod
-    def check_columns(cls, columns: list[str]) -> list[str]:
-        type_1_columns = {"month", "day_of_week", "hour"}
-        type_2_columns = {"month", "is_weekday", "hour"}
-        if set(columns) != type_1_columns:
-            if set(columns) != type_2_columns:
-                msg = f"Unsupported {columns} for RepresentativePeriodTimeRange, expecting either {type_1_columns} or {type_2_columns}"
-                raise InvalidParameter(msg)
-        return columns
+    time_format: RepresentativePeriodFormat
+    # time_columns: list[str] = Field(description="Columns in the table that represent time.")
+    measurement_type: MeasurementType = MeasurementType.TOTAL
+    interval_type: TimeIntervalType = TimeIntervalType.PERIOD_ENDING
+
+    # @model_validator(mode="after")
+    # def check_columns(self) -> "RepresentativePeriodTimeRange":
+    #     expected = representative_period_columns[self.time_format]
+
+    #     if set(self.time_columns) != set(expected):
+    #         msg = f"Incorrect {self.time_columns=} for {self.time_format=}, {expected=}"
+    #         raise InvalidParameter(msg)
+    #     return self
 
     def list_time_columns(self) -> list[str]:
-        return self.time_columns
+        match self.time_format:
+            case RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR:
+                return OneWeekPerMonthByHourHandler().list_time_columns()
+            case RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR:
+                return OneWeekdayDayAndWeekendDayPerMonthByHourHandler().list_time_columns()
+
+    def iter_timestamps(self) -> Generator[Any, None, None]:
+        match self.time_format:
+            case RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR:
+                return OneWeekPerMonthByHourHandler().iter_timestamps()
+            case RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR:
+                return OneWeekdayDayAndWeekendDayPerMonthByHourHandler().iter_timestamps()
+
+    def list_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]:
+        return df[self.list_time_columns()].drop_duplicates().apply(tuple, axis=1).to_list()
+
+
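+# Handler classes below define, for each representative-period format, the
+# expected time columns and the enumeration of all valid
+# (month, weekday, hour) combinations.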
+class RepresentativeTimeFormatHandlerBase(abc.ABC):
+    """Provides implementations for different representative time formats."""
+
+    @staticmethod
+    @abc.abstractmethod
+    def list_time_columns() -> list[str]:
+        """Return the columns in the table that represent time."""
+
+    @staticmethod
+    @abc.abstractmethod
+    def iter_timestamps() -> Generator[Any, None, None]:
+        """Return an iterator over all time indexes in the table.
+        Type of the time is dependent on the class.
+        """
+
+
+class OneWeekPerMonthByHourHandler(RepresentativeTimeFormatHandlerBase):
+    """Handler for format with hourly data that includes one week per month."""
+
+    @staticmethod
+    def list_time_columns() -> list[str]:
+        return representative_period_columns[RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR]
+
+    @staticmethod
+    def iter_timestamps() -> Generator[Any, None, None]:
+        for month in range(1, 13):
+            for dow in range(7):
+                for hour in range(24):
+                    yield (month, dow, hour)
+
+
+class OneWeekdayDayAndWeekendDayPerMonthByHourHandler(RepresentativeTimeFormatHandlerBase):
+    """Handler for format with hourly data that includes one weekday day and one weekend day
+    per month.
+    """
+
+    @staticmethod
+    def list_time_columns() -> list[str]:
+        return representative_period_columns[
+            RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR
+        ]
+
+    @staticmethod
+    def iter_timestamps() -> Generator[Any, None, None]:
+        for month in range(1, 13):
+            for is_weekday in sorted([False, True]):
+                for hour in range(24):
+                    yield (month, is_weekday, hour)
 
 
 TimeConfig = Annotated[
diff --git a/src/chronify/time_series_checker.py b/src/chronify/time_series_checker.py
index 4a3ba76..6834b76 100644
--- a/src/chronify/time_series_checker.py
+++ b/src/chronify/time_series_checker.py
@@ -74,6 +74,8 @@ def _run_timestamp_checks_on_tmp_table(self, table_name: str) -> None:
         result3 = self._conn.execute(text(query3)).fetchone()
         assert result3 is not None
         actual_count = result3[0]
-        if actual_count != self._schema.time_config.length:
-            msg = f"Time arrays must have length={self._schema.time_config.length}. Actual = {actual_count}"
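+        # Compute the expected count from the config's timestamps; representative formats have no stored length.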
+        expected_count = len(self._schema.time_config.list_timestamps())
+        if actual_count != expected_count:
+            msg = f"Time arrays must have length={expected_count}. Actual = {actual_count}"
             raise InvalidTable(msg)
diff --git a/src/chronify/time_series_mapper.py b/src/chronify/time_series_mapper.py
index 2fcc396..0202369 100644
--- a/src/chronify/time_series_mapper.py
+++ b/src/chronify/time_series_mapper.py
@@ -1,135 +1,18 @@
-from sqlalchemy import Engine, MetaData, Table, select
+from chronify.time_series_mapper_representative import MapperRepresentativeTimeToDatetime
+from chronify.time_configs import RepresentativePeriodTimeRange, DatetimeRange
 
-import pandas as pd
-import polars as pl
 
-from chronify.sqlalchemy.functions import read_database
-from chronify.models import TableSchema
-from chronify.exceptions import (
-    InvalidParameter,
-    MissingParameter,
-    InvalidTable,
-)
-from chronify.time_configs import DatetimeRange, RepresentativePeriodTimeRange
+def map_time(engine, metadata, from_schema, to_schema):
+    """Factory function to map time using the appropriate TimeSeriesMapper model."""
 
-
-class TimeSeriesMapper:
-    """Maps time series data from one configuration to another."""
-
-    def __init__(self, engine: Engine, metadata: MetaData) -> None:
-        self._engine = engine
-        self._metadata = metadata
-
-    def map_time_series(
-        self,
-        from_schema: TableSchema,
-        to_schema: TableSchema,
-    ) -> None:
-        pass
-
-    def map_representative_time(
-        self,
-        from_schema: TableSchema,
-        to_schema: TableSchema,
+    # TODO: Different mapper based on from_schema only or from_ and to_schema?
+    if isinstance(from_schema.time_config, RepresentativePeriodTimeRange) and isinstance(
+        to_schema.time_config, DatetimeRange
     ):
-        if isinstance(from_schema.time_config) != RepresentativePeriodTimeRange:
-            msg = f"{from_schema=} needs to be RepresentativePeriodTimeRange"
-            raise InvalidParameter(msg)
-
-        if isinstance(to_schema.time_config) != DatetimeRange:
-            msg = f"{to_schema=} needs to be DatetimeRange"
-            raise InvalidParameter(msg)
-
-        # Destination time
-        to_time_col = to_schema.time_config.time_column
-        dft = pd.Series(to_schema.time_config.list_timestamps()).rename(to_time_col).to_frame()
-
-        # Check source table has the right data
-        # [1] from_schema can produce to_schema
-        available_cols = from_schema.list_columns + [to_time_col]
-        final_cols = to_schema.list_columns
-        if diff := final_cols - available_cols:
-            msg = f"source table {from_schema.time_config.name} cannot produce the destination table columns {diff}"
-            raise InvalidTable(msg)
-
-        # [2] src_table has time_zone
-        if "time_zone" not in from_schema.time_config.time_array_id_columns:
-            msg = f"time_zone is required for representative time mapping and it is missing from source table: {from_schema.time_config.name}"
-            raise MissingParameter(msg)  # TODO does this belong in checker?
-
-        # [3] src_table weekday column has the expected records
-        from_time_cols = from_schema.time_config.time_columns
-        week_col = [x for x in from_time_cols if x not in ["month", "hour"]]
-        assert len(week_col) == 1, f"Unexpected {week_col=}"
-        week_col = week_col[0]
-
-        with self._engine.connect() as conn:
-            table = Table(from_schema.name, self._metadata)
-            stmt = select(table.c["time_zone"]).distinct().where(table.c["time_zone"].is_not(None))
-            df_tz = read_database(stmt, conn)
-
-            stmt2 = select(table.c[week_col]).distinct().where(table.c[week_col].is_not(None))
-            week_records = read_database(stmt2, conn).to_list()
-
-        self.check_week_col_data(week_col, week_records, from_schema.time_config.name)
-
-        # Create mapping and ingest into db
-        dfm = []
-        for idx, row in df_tz.iterrows():
-            dfgt = dft.copy()
-            dfgt["timestamp_tmp"] = dfgt[to_time_col].dt.tz_convert(row["timezone"])
-            dfgt["month"] = dfgt["timestamp_tmp"].dt.month
-
-            dow = dfgt["timestamp_tmp"].dt.day_of_week
-            if week_col == "day_of_week":
-                dfgt["day_of_week"] = dow
-            elif week_col == "is_weekday":
-                dfgt["is_weekday"] = False
-                dfgt.loc[dow < 5, "is_weekday"] = True  # TODO do these need to be in str format?
-            else:
-                msg = f"No representative time mapping support for time columns: {from_time_cols}"
-                raise NotImplementedError(msg)
-
-            dfgt["hour"] = dfgt["timestamp_tmp"].dt.hour
-            dfgt["timezone"] = row["timezone"]
-            dfm.append(dfgt.drop(columns=["timestamp_tmp"]))
-        dfm = pd.concat(dfm, axis=0, ignore_index=True)
-
-        with self._engine.connect() as conn:
-            pl.DataFrame(dfm).write_database(
-                "map_table", connection=conn, if_table_exists="append"
-            )
-            conn.commit()
-        self.update_table_schema()
-
-        # Apply mapping and downselect to final cols
-        keys = from_time_cols + ["timezone"]
-        with self._engine.connect() as conn:
-            left_table = Table(from_schema.name, self._metadata)
-            right_table = Table("map_table", self._metadata)
-            left_cols = [x for x in left_table.columns if x in final_cols]
-            right_cols = [x for x in right_table.columns if x in final_cols and x not in left_cols]
-            assert set(left_cols + right_cols) == set(
-                final_cols
-            ), f"table join does not produce the {final_cols=}"
-            select_stmt = [left_table.c[x] for x in left_cols]
-            select_stmt += [right_table.c[x] for x in right_cols]
-            on_stmt = [left_table.c[x] == right_table.c[x] for x in keys]
-            stmt = select(*select_stmt).where(*on_stmt)
-            df = read_database(stmt, conn, to_schema)  # TODO create as new db table?
-
-        return df
-
-    @staticmethod
-    def check_week_col_data(week_col, week_records, table_name):
-        msg = f"Unexpected values in column: {week_col} of source table: {table_name}"
-        if week_col == "day_of_week":
-            if set(week_records) != set(range(7)):
-                msg2 = msg + f"\n{week_records}"
-                raise InvalidTable(msg2)
-        elif week_col == "is_weekday":
-            if set(week_records) != {True, False}:
-                msg2 = msg + f"\n{week_records}"
-                raise InvalidTable(msg2)  # TODO does this belong in checker?
-        else:
-            pass
+        return MapperRepresentativeTimeToDatetime(
+            engine, metadata, from_schema, to_schema
+        ).map_time()
+    else:
+        msg = f"No mapping function for {from_schema.time_config.__class__=} >> {to_schema.time_config.__class__=}"
+        raise NotImplementedError(msg)
+    # TODO use class if more than one method, func > use class as needed
diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py
new file mode 100644
index 0000000..312cdc0
--- /dev/null
+++ b/src/chronify/time_series_mapper_base.py
@@ -0,0 +1,13 @@
+import abc
+
+
+class TimeSeriesMapperBase(abc.ABC):
+    """Maps time series data from one configuration to another."""
+
+    @abc.abstractmethod
+    def check_schema_consistency(self) -> None:
+        """Check that from_schema can produce to_schema."""
+
+    @abc.abstractmethod
+    def map_time(self) -> None:
+        """Convert time columns with from_schema to to_schema configuration."""
diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py
new file mode 100644
index 0000000..c7b1143
--- /dev/null
+++ b/src/chronify/time_series_mapper_representative.py
@@ -0,0 +1,166 @@
+from sqlalchemy import Engine, MetaData, Table, select
+import time
+
+import pandas as pd
+
+from chronify.sqlalchemy.functions import read_database, write_database
+from chronify.models import TableSchema
+from chronify.exceptions import (
+    MissingParameter,
+    InvalidTable,
+)
+from chronify.time_series_mapper_base import TimeSeriesMapperBase
+from chronify.time import (
+    representative_weekday_column,
+)
+from chronify.utils.sqlalchemy_view import create_view
+
+
+class MapperRepresentativeTimeToDatetime(TimeSeriesMapperBase):
+    def __init__(
+        self, engine: Engine, metadata: MetaData, from_schema: TableSchema, to_schema: TableSchema
+    ) -> None:
+        self._engine = engine
+        self._metadata = metadata
+        self.from_schema = from_schema
+        self.to_schema = to_schema
+        self.weekday_column = representative_weekday_column[from_schema.time_config.time_format]
+
+    def check_schema_consistency(self) -> None:
+        available_cols = self.from_schema.list_columns() + [self.to_schema.time_config.time_column]
+        final_cols = self.to_schema.list_columns()
+        if diff := set(final_cols) - set(available_cols):
+            msg = f"source table {self.from_schema.time_config.name} cannot produce the destination table columns {diff}"
+            raise InvalidTable(msg)
+
+    def _check_source_table_has_time_zone(self) -> None:
+        if "time_zone" not in self.from_schema.time_array_id_columns:
+            msg = f"time_zone is required for representative time mapping and it is missing from source table: {self.from_schema.time_config.name}"
+            raise MissingParameter(msg)
+
+    def map_time(self) -> None:
+        # Destination time
+        to_time_col = self.to_schema.time_config.time_column
+        dft = (
+            pd.Series(self.to_schema.time_config.list_timestamps()).rename(to_time_col).to_frame()
+        )
+
+        # Apply checks
+        self.check_schema_consistency()
+        if not self.to_schema.time_config.is_time_zone_naive():
+            self._check_source_table_has_time_zone()
+        # TODO: check interval type (note annual has no interval type)
+        # TODO: check measurement type
+
+        # Create mapping
+        if self.to_schema.time_config.is_time_zone_naive():
+            dfm = self._create_mapping_dataframe_tz_naive(dft, to_time_col)
+        else:
+            dfm = self._create_mapping_dataframe_tz_aware(dft, to_time_col)
+
+        # Ingest mapping into db
+        time_array_id_columns = [
+            x for x in self.from_schema.time_config.list_time_columns() if x != "hour"
+        ]
+        if not self.to_schema.time_config.is_time_zone_naive():
+            time_array_id_columns += ["time_zone"]
+
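+        # Suffix the name with the current epoch second so repeated mappings do not collide.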
+        map_table_schema = TableSchema(
+            name="map_table" + str(int(time.time())),
+            time_config=self.to_schema.time_config,
+            time_array_id_columns=time_array_id_columns,
+            value_column="hour",  # this is a workaround
+        )
+        with self._engine.connect() as conn:
+            write_database(dfm, conn, map_table_schema)
+            conn.commit()
+        self._metadata.reflect(self._engine, views=True)
+
+        # Apply mapping and downselect to final cols
+        self._apply_mapping(map_table_schema)
+
+    def _create_mapping_dataframe_tz_naive(
+        self, dft: pd.DataFrame, to_time_col: str
+    ) -> pd.DataFrame:
+        dfm = dft.copy()
+        dfm["month"] = dfm[to_time_col].dt.month
+        dow = dfm[to_time_col].dt.day_of_week
+        if self.weekday_column == "day_of_week":
+            dfm[self.weekday_column] = dow
+        elif self.weekday_column == "is_weekday":
+            dfm[self.weekday_column] = False
+            dfm.loc[dow < 5, self.weekday_column] = True
+        else:
+            msg = f"No representative time mapping support for time columns: {self.from_schema.time_config.list_time_columns()}"
+            raise NotImplementedError(msg)
+
+        dfm["hour"] = dfm[to_time_col].dt.hour
+        return dfm
+
+    def _create_mapping_dataframe_tz_aware(
+        self, dft: pd.DataFrame, to_time_col: str
+    ) -> pd.DataFrame:
+        with self._engine.connect() as conn:
+            table = Table(self.from_schema.name, self._metadata)
+            stmt = select(table.c["time_zone"]).distinct().where(table.c["time_zone"].is_not(None))
+            df_tz = read_database(stmt, conn, self.from_schema)
+
+        dfm = []
+        for _, row in df_tz.iterrows():
+            dfgt = dft.copy()
+            dfgt["timestamp_tmp"] = dfgt[to_time_col].dt.tz_convert(row["time_zone"])
+            dfgt["month"] = dfgt["timestamp_tmp"].dt.month
+
+            dow = dfgt["timestamp_tmp"].dt.day_of_week
+            if self.weekday_column == "day_of_week":
+                dfgt[self.weekday_column] = dow
+            elif self.weekday_column == "is_weekday":
+                dfgt[self.weekday_column] = False
+                dfgt.loc[dow < 5, self.weekday_column] = True
+            else:
+                msg = f"No representative time mapping support for time columns: {self.from_schema.time_config.list_time_columns()}"
+                raise NotImplementedError(msg)
+
+            dfgt["hour"] = dfgt["timestamp_tmp"].dt.hour
+            dfgt["time_zone"] = row["time_zone"]
+            dfm.append(dfgt.drop(columns=["timestamp_tmp"]))
+        dfm = pd.concat(dfm, axis=0, ignore_index=True)
+        return dfm
+
+    def _apply_mapping(self, map_table_schema: TableSchema) -> None:
+        left_table = Table(self.from_schema.name, self._metadata)
+        right_table = Table(map_table_schema.name, self._metadata)
+        left_table_columns = [x.name for x in left_table.columns]
+        right_table_columns = [x.name for x in right_table.columns]
+
+        final_cols = self.to_schema.list_columns()
+        left_cols = [x for x in left_table_columns if x in final_cols]
+        right_cols = [x for x in right_table_columns if x in final_cols and x not in left_cols]
+        assert set(left_cols + right_cols) == set(
+            final_cols
+        ), f"table join does not produce the {final_cols=}"
+
+        select_stmt = [left_table.c[x] for x in left_cols]
+        select_stmt += [right_table.c[x] for x in right_cols]
+
+        keys = (
+            self.from_schema.time_config.list_time_columns().copy()
+        )  # TODO copy is required here not sure why
+        if not self.to_schema.time_config.is_time_zone_naive():
+            keys += ["time_zone"]
+            assert (
+                "time_zone" in left_table_columns
+            ), f"time_zone not in table={self.from_schema.name}"
+            assert (
+                "time_zone" in right_table_columns
+            ), f"time_zone not in table={map_table_schema.name}"
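+        # AND together one equality predicate per key column to form the join condition.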
+        on_stmt = ()
+        for i, x in enumerate(keys):
+            if i == 0:
+                on_stmt = left_table.c[x] == right_table.c[x]
+            else:
+                on_stmt &= left_table.c[x] == right_table.c[x]
+        query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
+        create_view(self.to_schema.name, query, self._engine, self._metadata)
diff --git a/tests/test_duckdb_functions.py b/tests/test_duckdb_functions.py
deleted file mode 100644
index e063588..0000000
--- a/tests/test_duckdb_functions.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import pandas as pd
-import numpy as np
-import pytest
-import duckdb
-
-import chronify.duckdb.functions as ddbf
-
-conn = duckdb.connect(config={"TimeZone": "US/Mountain"})  # US/Mountain, UTC
-conn_tz = conn.execute("select * from duckdb_settings() where name='TimeZone';").fetchall()[0][1]
-print(f"Duckdb connection TimeZone = {conn_tz}")
-
-
-@pytest.fixture
-def generate_data():
-    ## [1] Create data
-    # load data
-    dfd = pd.DataFrame(
-        {
-            "id": np.repeat(1, 12 * 24 * 7),
-            "month": np.repeat(range(1, 13), 24 * 7),
-            "dow": np.tile(
-                np.repeat(range(7), 24), 12
-            ),  # 0: Monday, 6: Sunday, ~ pyspark.weekday(), duckdb.isodow()-1, pd.day_of_week
-            "hour": np.tile(range(24), 12 * 7),
-        }
-    )
-    dfd["is_weekday"] = False
-    dfd.loc[dfd["dow"] < 5, "is_weekday"] = True
-    dfd["value"] = dfd["month"] * 1000 + dfd["dow"] * 100 + dfd["hour"]
-
-    dfd = pd.concat([dfd, dfd.assign(id=2).assign(value=dfd.value * 2)], axis=0)
-
-    # project time
-    dft = pd.DataFrame(
-        {
-            "tid": range(8760),
-            "timestamp": pd.date_range(
-                start="2018-01-01", periods=8760, freq="h", tz="US/Eastern"
-            ),
-        }
-    )
-
-    # mapping data
-    dfg = pd.DataFrame(
-        {
-            "id": [1, 2],
-            "geography": ["IL", "CO"],
-            "timezone": ["US/Central", "US/Mountain"],
-        }
-    )
-
-    dfm = []
-    for idx, row in dfg.iterrows():
-        dfgt = dft.copy()
-        dfgt["timestamp_tmp"] = dfgt["timestamp"].dt.tz_convert(row["timezone"])
-        dfgt["month"] = dfgt["timestamp_tmp"].dt.month
-        dfgt["dow"] = dfgt["timestamp_tmp"].dt.day_of_week
-        dfgt["hour"] = dfgt["timestamp_tmp"].dt.hour
-        dfgt["timezone"] = row["timezone"]
-        dfm.append(dfgt.drop(columns=["timestamp_tmp"]))
-    dfm = pd.concat(dfm, axis=0, ignore_index=True)
-
-    ## [3] Convert to DUCKDB
-    ddfd = duckdb.sql("SELECT * FROM dfd").set_alias("ddfd")
-    ddfg = duckdb.sql("SELECT * FROM dfg").set_alias("ddfg")
-    ddfm = duckdb.sql("SELECT * FROM dfm").set_alias("ddfm")
-
-    return ddfd, ddfg, ddfm
-
-
-def test_join(generate_data):
-    breakpoint()
-    df = ddbf.join(generate_data[0], generate_data[1], ["id"])
-    ddbf.join(df, generate_data[2], ["month", "dow", "hour", "timezone"])
diff --git a/tests/test_time_series_mapper.py b/tests/test_time_series_mapper.py
new file mode 100644
index 0000000..602231f
--- /dev/null
+++ b/tests/test_time_series_mapper.py
@@ -0,0 +1,151 @@
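+# Tests for mapping representative-period time formats to datetime, tz-aware and tz-naive.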
+import pandas as pd
+import numpy as np
+from sqlalchemy import Engine, MetaData
+from datetime import datetime, timedelta
+from zoneinfo import ZoneInfo
+import pytest
+
+from chronify.models import TableSchema
+from chronify.time import RepresentativePeriodFormat, TimeIntervalType
+from chronify.time_configs import RepresentativePeriodTimeRange, DatetimeRange
+from chronify.sqlalchemy.functions import read_database, write_database
+from chronify.time_series_mapper import map_time
+
+
+def generate_data(one_week: bool = False) -> pd.DataFrame:
+    def generate__one_week_per_month_by_hour():
+        dfd = pd.DataFrame(
+            {
+                "id": np.repeat(1, 12 * 24 * 7),
+                "month": np.repeat(range(1, 13), 24 * 7),
+                "day_of_week": np.tile(
+                    np.repeat(range(7), 24), 12
+                ),  # 0: Monday, 6: Sunday, ~ pyspark.weekday(), duckdb.isodow()-1, pd.day_of_week
+                "hour": np.tile(range(24), 12 * 7),
+            }
+        )
+        dfd["value"] = dfd["month"] * 1000 + dfd["day_of_week"] * 100 + dfd["hour"]
+        dfd = pd.concat([dfd, dfd.assign(id=2).assign(value=dfd.value * 2)], axis=0)
+        return dfd
+
+    def generate__one_weekday_day_and_one_weekend_day_per_month_by_hour():
+        dfd = pd.DataFrame(
+            {
+                "id": np.repeat(1, 12 * 24 * 2),
+                "month": np.repeat(range(1, 13), 24 * 2),
+                "is_weekday": np.tile(np.repeat([True, False], 24), 12),
+                "hour": np.tile(range(24), 12 * 2),
+            }
+        )
+        dfd["value"] = dfd["month"] * 1000 + dfd["is_weekday"] * 100 + dfd["hour"]
+        dfd = pd.concat([dfd, dfd.assign(id=2).assign(value=dfd.value * 2)], axis=0)
+        return dfd
+
+    if one_week:
+        dfd = generate__one_week_per_month_by_hour()
+    else:
+        dfd = generate__one_weekday_day_and_one_weekend_day_per_month_by_hour()
+
+    # Add more data
+    df = pd.DataFrame(
+        {
+            "id": [1, 2],
+            "geography": ["IL", "CO"],
+            "time_zone": ["US/Central", "US/Mountain"],
+        }
+    )
+    dfd = dfd.merge(df, on="id", how="left")
+
+    return dfd
+
+
+def get_destination_schema(tzinfo: ZoneInfo | None) -> TableSchema:
+    schema = TableSchema(
+        name="mapped_data",
+        time_config=DatetimeRange(
+            start=datetime(year=2018, month=1, day=1, tzinfo=tzinfo),
+            resolution=timedelta(hours=1),
+            length=8760,
+            interval_type=TimeIntervalType.PERIOD_BEGINNING,
+            time_column="timestamp",
+        ),
+        time_array_id_columns=["id", "geography", "time_zone"],
+        value_column="value",
+    )
+    return schema
+
+
+def get_data_schema(one_week: bool = False) -> TableSchema:
+    if one_week:
+        time_format = RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR
+    else:
+        time_format = (
+            RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR
+        )
+    schema = TableSchema(
+        name="load_data",
+        time_config=RepresentativePeriodTimeRange(
+            interval_type=TimeIntervalType.PERIOD_BEGINNING,
+            time_format=time_format,
+        ),
+        time_array_id_columns=["id", "geography", "time_zone"],
+        value_column="value",
+    )
+    return schema
+
+
+def get_timeseries(time_config: DatetimeRange) -> pd.Series:
+    ts = pd.date_range(
+        start=time_config.start, freq=time_config.resolution, periods=time_config.length
+    ).rename(time_config.time_column)
+    return ts
+
+
+def run_test(engine: Engine, one_week: bool = False, tzinfo: ZoneInfo | None = None) -> None:
+    engine.clear_compiled_cache()
+
+    # Generate
+    df = generate_data(one_week=one_week)
+
+    # Ingest
+    metadata = MetaData()
+    schema = get_data_schema(one_week=one_week)
+    with engine.connect() as conn:
+        write_database(df, conn, schema)
+        conn.commit()
+    metadata.reflect(engine, views=True)
+
+    ## Map
+    dest_schema = get_destination_schema(tzinfo=tzinfo)
+    map_time(engine, metadata, schema, dest_schema)  # this creates a view named dest_schema.name
+
+    # Check mapped table
+    with engine.connect() as conn:
+        query = f"select * from {dest_schema.name}"
+        queried = read_database(query, conn, dest_schema)
+        queried = queried.sort_values(by=["id", "timestamp"]).reset_index(drop=True)
+
+    truth = get_timeseries(dest_schema.time_config)
+    check_mapped_table(engine, queried, truth)
+
+
+def check_mapped_table(engine: Engine, dfs: pd.DataFrame, ts: pd.Series) -> None:
+    res = sorted(dfs["timestamp"].drop_duplicates().tolist())
+    tru = sorted(ts)
+    assert res == tru, "wrong unique timestamps"
+
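+    # Each (geography, time_zone) group should contain one row per expected timestamp.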
+    res = dfs.groupby(["geography", "time_zone"])["timestamp"].count().unique().tolist()
+    tru = [len(ts)]
+    assert res == tru, "wrong number of timestamps"
+
+
+@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None])
+def test__one_week_per_month_by_hour(iter_engines: Engine, tzinfo):
+    run_test(iter_engines, one_week=True, tzinfo=tzinfo)
+
+
+@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None])
+def test__one_weekday_day_and_one_weekend_day_per_month_by_hour(iter_engines: Engine, tzinfo):
+    run_test(iter_engines, one_week=False, tzinfo=tzinfo)