-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add representative time mapping #19
base: fix/ingest-csv
Are you sure you want to change the base?
Conversation
initial commit
remove stale code
df[config.time_column] = df[config.time_column].dt.tz_convert("UTC") | ||
else: | ||
df[config.time_column] = df[config.time_column].dt.tz_localize("UTC") | ||
pl.DataFrame(df).write_database(schema.name, connection=conn, if_table_exists="append") | ||
pl.DataFrame(df).write_database(schema.name, connection=conn, if_table_exists="replace") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why replace? It will delete whatever data currently exists? Is there a use case for this? If so, the parameter needs to be passed in. We definitely need append.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be an arg input
@@ -18,6 +18,9 @@ def read_database(query: Selectable | str, conn: Connection, schema: TableSchema | |||
df[config.time_column] = pd.to_datetime(df[config.time_column], utc=True) | |||
else: | |||
df[config.time_column] = df[config.time_column].dt.tz_localize("UTC") | |||
elif conn.engine.name == "sqlite" and isinstance(config, DatetimeRange): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor so sqlite related processing is in one place. Suggest removing needs_utc_conversion
@@ -129,7 +131,7 @@ class TimeBasedDataAdjustment(ChronifyBaseModel): | |||
class TimeBaseModel(ChronifyBaseModel, abc.ABC): | |||
"""Defines a base model common to all time dimensions.""" | |||
|
|||
length: int | |||
measurement_type: MeasurementType = MeasurementType.TOTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?
@@ -233,7 +235,8 @@ class AnnualTimeRange(TimeBaseModel): | |||
time_column: str = Field(description="Column in the table that represents time.") | |||
time_type: Literal[TimeType.ANNUAL] = TimeType.ANNUAL | |||
start: int | |||
# TODO: measurement_type must be TOTAL | |||
length: int | |||
# TODO: measurement_type must be TOTAL, not necessarily right? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add interval_type (base class)
|
||
def list_time_columns(self) -> list[str]: | ||
return self.time_columns | ||
match self.time_format: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assign self._handler to make polymorphic
return OneWeekdayDayAndWeekendDayPerMonthByHourHandler().iter_timestamps() | ||
|
||
def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]: | ||
return df[self.list_time_columns()].drop_duplicates().apply(tuple, axis=1).to_list() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
class OneWeekPerMonthByHourHandler(RepresentativeTimeFormatHandlerBase): | ||
"""Handler for format with hourly data that includes one week per month.""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIME COLUMNS = define names here instead
representative_period_columns[RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR]
@staticmethod | ||
def iter_timestamps() -> Generator[Any, None, None]: | ||
for month in range(1, 13): | ||
for is_weekday in sorted([False, True]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove sorted
] | ||
|
||
@staticmethod | ||
def iter_timestamps() -> Generator[Any, None, None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generator[tuple[int, bool, int], None, None]
dtto
if isinstance(from_schema.time_config, RepresentativePeriodTimeRange) and isinstance( | ||
to_schema.time_config, DatetimeRange | ||
): | ||
return MapperRepresentativeTimeToDatetime( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No return
class TimeSeriesMapper: | ||
"""Maps time series data from one configuration to another.""" | ||
def map_time(engine, metadata, from_schema, to_schema): | ||
"""Factory function to map time using the appropriate TimeSeriesMapper model.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove factory
class TimeSeriesMapperBase(abc.ABC): | ||
"""Maps time series data from one configuration to another.""" | ||
|
||
@abc.abstractmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not required, remove.
self._metadata = metadata | ||
self.from_schema = from_schema | ||
self.to_schema = to_schema | ||
self.weekday_column = representative_weekday_column[from_schema.time_config.time_format] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be derived classes that have no concept of weekday.
|
||
def _check_source_table_has_time_zone(self) -> None: | ||
if "time_zone" not in self.from_schema.time_array_id_columns: | ||
msg = f"time_zone is required for representative time mapping and it is missing from source table: {self.from_schema.time_config.name}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.from_schema.name
?
|
||
def _check_source_table_has_time_zone(self) -> None: | ||
if "time_zone" not in self.from_schema.time_array_id_columns: | ||
msg = f"time_zone is required for representative time mapping and it is missing from source table: {self.from_schema.time_config.name}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the message include something about time zone aware?
pd.Series(self.to_schema.time_config.list_timestamps()).rename(to_time_col).to_frame() | ||
) | ||
|
||
# Apply checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments like these are not necessary.
|
||
# Ingest mapping into db | ||
time_array_id_columns = [ | ||
x for x in self.from_schema.time_config.list_time_columns() if x != "hour" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
x != "hour"
feels out of place in this class, which should be generic to any representative time.
return schema | ||
|
||
|
||
def get_timeseries(time_config: DatetimeRange) -> pd.Series: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this function be a test of DatetimeRange.list_timestamps
? And then from that time on we call our method?
|
||
|
||
def check_mapped_table(engine: Engine, dfs: pd.DataFrame, ts: pd.Series) -> None: | ||
res = sorted(dfs["timestamp"].drop_duplicates().tolist()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to_list
?
check_mapped_table(engine, queried, truth) | ||
|
||
|
||
def check_mapped_table(engine: Engine, dfs: pd.DataFrame, ts: pd.Series) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry if this is nit-picky, I'm just trying to understand conventions. Does dfs
mean something?
|
||
|
||
@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None]) | ||
def test__one_week_per_month_by_hour(iter_engines: Engine, tzinfo): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What convention are you following with the double underscore?
return ts | ||
|
||
|
||
def run_test(engine: Engine, one_week: bool = False, tzinfo: ZoneInfo | None = None) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like these parameters should be required and not have defaults, right?
No description provided.