Skip to content

Commit

Permalink
add support for scd2
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Apr 19, 2024
1 parent 56c2b9f commit 1525e73
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 3 deletions.
8 changes: 8 additions & 0 deletions dlt/common/data_writers/escape.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,11 @@ def format_bigquery_datetime_literal(
"""Returns BigQuery-adjusted datetime literal by prefixing required `TIMESTAMP` indicator."""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#timestamp_literals
return "TIMESTAMP " + format_datetime_literal(v, precision, no_tz)


def format_clickhouse_datetime_literal(
    v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
) -> str:
    """Returns a ClickHouse-compatible `toDateTime64(...)` expression for `v`.

    The datetime literal is produced by `format_datetime_literal` with its third
    argument fixed to `True`, and the timezone of `v` is passed separately as the
    third argument of `toDateTime64`.

    Args:
        v: The datetime to render.
        precision: Sub-second precision; forwarded to `format_datetime_literal` and
            also used as the scale argument of `toDateTime64`.
        no_tz: Accepted so the signature matches the other datetime literal
            formatters, but not consulted by this function.

    Returns:
        A SQL expression string of the form `toDateTime64(<literal>, <precision>, '<tz>')`,
        or `toDateTime64(<literal>, <precision>)` when `v` carries no timezone.
    """
    literal = format_datetime_literal(v, precision, True)
    tz = v.tzinfo
    if tz is None:
        # A naive datetime has no timezone to name. Interpolating it would emit the
        # string 'None' as a timezone, so leave out the optional third argument.
        # NOTE(review): ClickHouse then falls back to its own default timezone - confirm
        # this is acceptable for callers that pass naive datetimes.
        return f"toDateTime64({literal}, {precision})"
    return f"toDateTime64({literal}, {precision}, '{tz}')"
7 changes: 6 additions & 1 deletion dlt/destinations/impl/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import sys

from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.escape import escape_clickhouse_identifier, escape_clickhouse_literal
from dlt.common.data_writers.escape import (
escape_clickhouse_identifier,
escape_clickhouse_literal,
format_clickhouse_datetime_literal,
)
from dlt.common.destination import DestinationCapabilitiesContext


Expand All @@ -13,6 +17,7 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["parquet", "jsonl"]

caps.format_datetime_literal = format_clickhouse_datetime_literal
caps.escape_identifier = escape_clickhouse_identifier
caps.escape_literal = escape_clickhouse_literal

Expand Down
4 changes: 4 additions & 0 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ def gen_key_table_clauses(
f"FROM {root_table_name} AS d JOIN {staging_root_table_name} AS s ON {join_conditions}"
]

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
    """Returns the opening of an update statement in the `ALTER TABLE ... UPDATE` form.

    Args:
        table_name: Name of the table to update, inserted into the statement as given.

    Returns:
        The statement prefix, without a trailing space; the caller appends the
        column assignments and any conditions.
    """
    prefix = f"ALTER TABLE {table_name} UPDATE"
    return prefix


class ClickHouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
Expand Down
6 changes: 5 additions & 1 deletion dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def gen_scd2_sql(

# retire updated and deleted records
sql.append(f"""
UPDATE {root_table_name} SET {to} = {boundary_ts}
{cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_ts}
WHERE {to} = {active_record_ts}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")
Expand Down Expand Up @@ -587,3 +587,7 @@ def gen_scd2_sql(
WHERE NOT EXISTS (SELECT 1 FROM {table_name} AS f WHERE f.{unique_column} = s.{unique_column});
""")
return sql

@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
    """Returns the opening of an update statement in the `UPDATE ... SET` form.

    Kept as a separate classmethod so subclasses can substitute a different
    update syntax.

    Args:
        table_name: Name of the table to update, inserted into the statement as given.

    Returns:
        The statement prefix, without a trailing space; the caller appends the
        column assignments and any conditions.
    """
    prefix = f"UPDATE {table_name} SET"
    return prefix
3 changes: 2 additions & 1 deletion tests/load/test_job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def test_get_update_basic_schema(client: SqlJobClientBase) -> None:

# update in storage
client._update_schema_in_storage(schema)
sleep(1)
this_schema = client.get_stored_schema_by_hash(schema.version_hash)
newest_schema = client.get_stored_schema()
assert this_schema == newest_schema
Expand All @@ -129,7 +130,7 @@ def test_get_update_basic_schema(client: SqlJobClientBase) -> None:
first_schema._bump_version()
assert first_schema.version == this_schema.version == 2
# wait to make load_newest_schema deterministic
sleep(2)
sleep(1)
client._update_schema_in_storage(first_schema)
this_schema = client.get_stored_schema_by_hash(first_schema.version_hash)
newest_schema = client.get_stored_schema()
Expand Down

0 comments on commit 1525e73

Please sign in to comment.