add correct high_ts for clickhouse
sh-rp committed Apr 19, 2024
1 parent 1525e73 commit 15855d7
Showing 4 changed files with 15 additions and 12 deletions.
4 changes: 4 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -21,6 +21,7 @@
# sql - any sql statement
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
HIGH_TS = pendulum.datetime(9999, 12, 31)


@configspec
@@ -53,6 +54,9 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
supports_clone_table: bool = False
scd2_high_timestamp: pendulum.DateTime = HIGH_TS
"""High timestamp used to indicate active records in `scd2` merge strategy."""

"""Destination supports CREATE TABLE ... CLONE ... statements"""
max_table_nesting: Optional[int] = None # destination can overwrite max table nesting

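The hunk above turns the scd2 high timestamp into a destination capability with HIGH_TS as its default. A minimal sketch of that pattern, assuming only pendulum and dataclasses; the Caps class below is a hypothetical stand-in for DestinationCapabilitiesContext, not dlt's real definition:

from dataclasses import dataclass

import pendulum

HIGH_TS = pendulum.datetime(9999, 12, 31)  # default "record is active" marker

@dataclass
class Caps:  # hypothetical stand-in for DestinationCapabilitiesContext
    scd2_high_timestamp: pendulum.DateTime = HIGH_TS

generic_caps = Caps()
clickhouse_caps = Caps(scd2_high_timestamp=pendulum.datetime(2299, 12, 31))
print(generic_caps.scd2_high_timestamp)     # 9999-12-31T00:00:00+00:00
print(clickhouse_caps.scd2_high_timestamp)  # 2299-12-31T00:00:00+00:00
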
2 changes: 2 additions & 0 deletions dlt/destinations/impl/clickhouse/__init__.py
@@ -1,5 +1,6 @@
import sys

from dlt.common.pendulum import pendulum
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.escape import (
escape_clickhouse_identifier,
@@ -24,6 +25,7 @@ def capabilities() -> DestinationCapabilitiesContext:
# https://stackoverflow.com/questions/68358686/what-is-the-maximum-length-of-a-column-in-clickhouse-can-it-be-modified
caps.max_identifier_length = 255
caps.max_column_identifier_length = 255
caps.scd2_high_timestamp = pendulum.datetime(2299, 12, 31) # this is the max datetime...

# ClickHouse has no max `String` type length.
caps.max_text_data_type_length = sys.maxsize
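
The 2299-12-31 cap matches the upper bound of ClickHouse's DateTime64 range, which cannot hold the generic 9999-12-31 marker. A small sketch of the idea, assuming only pendulum; the helper name below is made up for illustration:

import pendulum

DEFAULT_HIGH_TS = pendulum.datetime(9999, 12, 31)
CLICKHOUSE_MAX_TS = pendulum.datetime(2299, 12, 31)  # DateTime64 upper bound

def effective_high_ts(destination_max_ts: pendulum.DateTime) -> pendulum.DateTime:
    # keep the generic marker unless the destination cannot represent it
    return min(DEFAULT_HIGH_TS, destination_max_ts)

print(effective_high_ts(CLICKHOUSE_MAX_TS))  # 2299-12-31T00:00:00+00:00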
9 changes: 3 additions & 6 deletions dlt/destinations/sql_jobs.py
@@ -3,7 +3,6 @@
import yaml
from dlt.common.logger import pretty_format_exception

from dlt.common.pendulum import pendulum
from dlt.common.schema.typing import (
TTableSchema,
TSortOrder,
@@ -24,10 +23,6 @@
from dlt.pipeline.current import load_package as current_load_package


HIGH_TS = pendulum.datetime(9999, 12, 31)
"""High timestamp used to indicate active records in `scd2` merge strategy."""


class SqlJobParams(TypedDict, total=False):
replace: Optional[bool]
table_chain_create_table_statements: Dict[str, Sequence[str]]
@@ -537,7 +532,9 @@ def gen_scd2_sql(
current_load_package()["state"]["created_at"],
caps.timestamp_precision,
)
active_record_ts = format_datetime_literal(HIGH_TS, caps.timestamp_precision)
active_record_ts = format_datetime_literal(
caps.scd2_high_timestamp, caps.timestamp_precision
)

# retire updated and deleted records
sql.append(f"""
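
gen_scd2_sql now formats the active-record literal from caps.scd2_high_timestamp instead of the removed module constant. A rough sketch of how such a literal could end up in a retire statement, assuming only pendulum; the formatter is a simplified stand-in for dlt's format_datetime_literal and the table and column names are invented, so this is not the SQL dlt actually emits:

import pendulum

def format_datetime_literal_sketch(ts: pendulum.DateTime) -> str:
    # simplified stand-in for dlt's format_datetime_literal (ignores precision)
    return "'" + ts.strftime("%Y-%m-%d %H:%M:%S.%f") + "'"

high_ts = pendulum.datetime(2299, 12, 31)         # e.g. caps.scd2_high_timestamp on ClickHouse
boundary_ts = pendulum.datetime(2024, 4, 19, 12)  # e.g. the load package created_at
active_record_ts = format_datetime_literal_sketch(high_ts)

# invented table and column names; dlt derives the real ones from the schema
retire_sql = f"""
UPDATE my_dataset.dim_customer
SET valid_to = {format_datetime_literal_sketch(boundary_ts)}
WHERE valid_to = {active_record_ts}
AND customer_key NOT IN (SELECT customer_key FROM my_dataset.customer_staging);
"""
print(retire_sql)
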
12 changes: 6 additions & 6 deletions tests/load/pipeline/test_scd2.py
@@ -11,8 +11,6 @@
from dlt.common.normalizers.json.relational import DataItemNormalizer
from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention
from dlt.common.time import ensure_pendulum_datetime, reduce_pendulum_datetime_precision
from dlt.common.typing import TDataItem
from dlt.destinations.sql_jobs import HIGH_TS
from dlt.extract.resource import DltResource
from dlt.pipeline.exceptions import PipelineStepFailed

@@ -29,8 +27,9 @@


def get_active_ts(pipeline: dlt.Pipeline) -> datetime:
high_ts = pipeline.destination.capabilities().scd2_high_timestamp
caps = pipeline._get_destination_capabilities()
active_ts = HIGH_TS.in_timezone(tz="UTC").replace(tzinfo=None)
active_ts = high_ts.in_timezone(tz="UTC").replace(tzinfo=None)
return reduce_pendulum_datetime_precision(active_ts, caps.timestamp_precision)


@@ -46,10 +45,10 @@ def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) ->
return reduce_pendulum_datetime_precision(created_at, caps.timestamp_precision)


def strip_timezone(ts: datetime) -> datetime:
def strip_timezone(ts: datetime, high_ts: datetime) -> datetime:
"""Converts timezone of datetime object to UTC and removes timezone awareness."""
ts = ensure_pendulum_datetime(ts)
if ts.replace(tzinfo=None) == HIGH_TS:
if ts.replace(tzinfo=None) == high_ts:
return ts.replace(tzinfo=None)
else:
return ts.astimezone(tz=timezone.utc).replace(tzinfo=None)
@@ -59,10 +58,11 @@ def get_table(
pipeline: dlt.Pipeline, table_name: str, sort_column: str, include_root_id: bool = True
) -> List[Dict[str, Any]]:
"""Returns destination table contents as list of dictionaries."""
high_ts = pipeline.destination.capabilities().scd2_high_timestamp
return sorted(
[
{
k: strip_timezone(v) if isinstance(v, datetime) else v
k: strip_timezone(v, high_ts) if isinstance(v, datetime) else v
for k, v in r.items()
if not k.startswith("_dlt")
or k in DEFAULT_VALIDITY_COLUMN_NAMES
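
The tests now read the high timestamp from the destination capabilities rather than importing HIGH_TS from sql_jobs. A hedged sketch of how the expected active timestamp could be derived from that value, assuming only pendulum; the precision handling is a crude approximation of dlt's reduce_pendulum_datetime_precision:

import pendulum

def expected_active_ts(high_ts: pendulum.DateTime, precision: int = 6) -> pendulum.DateTime:
    # mirror get_active_ts: normalize to UTC, drop the timezone, reduce precision
    naive = high_ts.in_timezone("UTC").replace(tzinfo=None)
    factor = 10 ** (6 - precision)  # crude stand-in for reduce_pendulum_datetime_precision
    return naive.replace(microsecond=(naive.microsecond // factor) * factor)

print(expected_active_ts(pendulum.datetime(9999, 12, 31)))  # generic destinations
print(expected_active_ts(pendulum.datetime(2299, 12, 31)))  # ClickHouse-style cap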
