
Commit

Fix SQL incompatibility issues for scd2 on bigquery and `databricks` (#1247)

* add bigquery datetime literal formatting

* refactor not exists to not in for bigquery and databricks compatibility

* mark main scd2 test as essential

---------

Co-authored-by: Jorrit Sandbrink <[email protected]>
jorritsandbrink and Jorrit Sandbrink authored Apr 19, 2024
1 parent 902963c commit c43ac6f
Showing 5 changed files with 34 additions and 15 deletions.
10 changes: 9 additions & 1 deletion dlt/common/data_writers/escape.py
@@ -167,4 +167,12 @@ def format_datetime_literal(v: pendulum.DateTime, precision: int = 6, no_tz: bool = False) -> str:
         timespec = "milliseconds"
     elif precision < 3:
         timespec = "seconds"
-    return v.isoformat(sep=" ", timespec=timespec)
+    return "'" + v.isoformat(sep=" ", timespec=timespec) + "'"
+
+
+def format_bigquery_datetime_literal(
+    v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
+) -> str:
+    """Returns BigQuery-adjusted datetime literal by prefixing required `TIMESTAMP` indicator."""
+    # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#timestamp_literals
+    return "TIMESTAMP " + format_datetime_literal(v, precision, no_tz)
5 changes: 5 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -9,6 +9,7 @@
     DestinationLoadingWithoutStagingNotSupported,
 )
 from dlt.common.utils import identity
+from dlt.common.pendulum import pendulum

 from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
 from dlt.common.wei import EVM_DECIMAL_PRECISION
@@ -32,6 +33,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
     supported_staging_file_formats: Sequence[TLoaderFileFormat] = None
     escape_identifier: Callable[[str], str] = None
     escape_literal: Callable[[Any], Any] = None
+    format_datetime_literal: Callable[..., str] = None
     decimal_precision: Tuple[int, int] = None
     wei_precision: Tuple[int, int] = None
     max_identifier_length: int = None
@@ -61,13 +63,16 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
     def generic_capabilities(
         preferred_loader_file_format: TLoaderFileFormat = None,
     ) -> "DestinationCapabilitiesContext":
+        from dlt.common.data_writers.escape import format_datetime_literal
+
         caps = DestinationCapabilitiesContext()
         caps.preferred_loader_file_format = preferred_loader_file_format
         caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"]
         caps.preferred_staging_file_format = None
         caps.supported_staging_file_formats = []
         caps.escape_identifier = identity
         caps.escape_literal = serialize_value
+        caps.format_datetime_literal = format_datetime_literal
         caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
         caps.wei_precision = (EVM_DECIMAL_PRECISION, 0)
         caps.max_identifier_length = 65536
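
The new `format_datetime_literal` capability slot defaults to `None`; `generic_capabilities()` fills it with the generic formatter, and consumers fall back to that generic formatter when a destination leaves the slot unset. A small usage sketch, assuming the default timestamp precision of 6:

```python
import pendulum

from dlt.common.destination.capabilities import DestinationCapabilitiesContext

caps = DestinationCapabilitiesContext.generic_capabilities()

# destinations that never set the capability leave it None; fall back to the
# generic formatter, mirroring the pattern gen_scd2_sql uses further down
fmt = caps.format_datetime_literal
if fmt is None:
    fmt = DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal

print(fmt(pendulum.datetime(2024, 4, 19), caps.timestamp_precision))
# '2024-04-19 00:00:00.000000+00:00'
```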
6 changes: 5 additions & 1 deletion dlt/destinations/impl/bigquery/__init__.py
@@ -1,4 +1,7 @@
-from dlt.common.data_writers.escape import escape_bigquery_identifier
+from dlt.common.data_writers.escape import (
+    escape_bigquery_identifier,
+    format_bigquery_datetime_literal,
+)
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE

@@ -11,6 +14,7 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps.supported_staging_file_formats = ["parquet", "jsonl"]
     caps.escape_identifier = escape_bigquery_identifier
     caps.escape_literal = None
+    caps.format_datetime_literal = format_bigquery_datetime_literal
     caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
     caps.wei_precision = (76, 38)
     caps.max_identifier_length = 1024
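
With the formatter wired into the BigQuery capabilities, any datetime literal rendered through `caps.format_datetime_literal` carries the `TIMESTAMP` prefix BigQuery requires. An illustrative check (the import path and microsecond precision are assumptions from this diff, not verified output):

```python
import pendulum

from dlt.destinations.impl.bigquery import capabilities

caps = capabilities()
# renders a typed literal BigQuery will accept in DML generated by sql_jobs.py
literal = caps.format_datetime_literal(pendulum.datetime(2024, 4, 19), caps.timestamp_precision)
print(literal)  # TIMESTAMP '2024-04-19 00:00:00.000000+00:00'
```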
27 changes: 14 additions & 13 deletions dlt/destinations/sql_jobs.py
@@ -1,7 +1,6 @@
 from typing import Any, Dict, List, Sequence, Tuple, cast, TypedDict, Optional

 import yaml
-from dlt.common.data_writers.escape import format_datetime_literal
 from dlt.common.logger import pretty_format_exception

 from dlt.common.pendulum import pendulum
@@ -521,38 +520,40 @@ def gen_scd2_sql(
     staging_root_table_name = sql_client.make_qualified_table_name(root_table["name"])

     # get column names
-    escape_id = sql_client.capabilities.escape_identifier
+    caps = sql_client.capabilities
+    escape_id = caps.escape_identifier
     from_, to = list(map(escape_id, get_validity_column_names(root_table)))  # validity columns
     hash_ = escape_id(
         get_first_column_name_with_prop(root_table, "x-row-version")
     )  # row hash column

     # define values for validity columns
+    format_datetime_literal = caps.format_datetime_literal
+    if format_datetime_literal is None:
+        format_datetime_literal = (
+            DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal
+        )
     boundary_ts = format_datetime_literal(
         current_load_package()["state"]["created_at"],
-        sql_client.capabilities.timestamp_precision,
-    )
-    active_record_ts = format_datetime_literal(
-        HIGH_TS, sql_client.capabilities.timestamp_precision
+        caps.timestamp_precision,
     )
+    active_record_ts = format_datetime_literal(HIGH_TS, caps.timestamp_precision)

     # retire updated and deleted records
     sql.append(f"""
-    UPDATE {root_table_name} SET {to} = '{boundary_ts}'
-    WHERE NOT EXISTS (
-        SELECT s.{hash_} FROM {staging_root_table_name} AS s
-        WHERE {root_table_name}.{hash_} = s.{hash_}
-    ) AND {to} = '{active_record_ts}';
+    UPDATE {root_table_name} SET {to} = {boundary_ts}
+    WHERE {to} = {active_record_ts}
+    AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
     """)

     # insert new active records in root table
     columns = map(escape_id, list(root_table["columns"].keys()))
     col_str = ", ".join([c for c in columns if c not in (from_, to)])
     sql.append(f"""
     INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
-    SELECT {col_str}, '{boundary_ts}' AS {from_}, '{active_record_ts}' AS {to}
+    SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_ts} AS {to}
     FROM {staging_root_table_name} AS s
-    WHERE NOT EXISTS (SELECT s.{hash_} FROM {root_table_name} AS f WHERE f.{hash_} = s.{hash_});
+    WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name});
     """)

     # insert list elements for new active records in child tables
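
The rewrite replaces correlated `NOT EXISTS` subqueries, which BigQuery and Databricks reject in this position, with uncorrelated `NOT IN`. The two forms are equivalent here only because the row-hash column is never NULL: a NULL returned by a `NOT IN` subquery makes the whole predicate evaluate to UNKNOWN, so no rows would match. A runnable illustration of that caveat, using sqlite3 purely as a convenient SQL engine:

```python
# Tiny demo of why the hash column must be non-null for the NOT IN rewrite:
# with a NULL in the subquery result, x NOT IN (...) is UNKNOWN, never TRUE.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("CREATE TABLE s(h TEXT); INSERT INTO s VALUES ('a'), (NULL);")
print(con.execute("SELECT 'b' NOT IN (SELECT h FROM s)").fetchone())  # (None,) i.e. UNKNOWN
```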
1 change: 1 addition & 0 deletions tests/load/pipeline/test_scd2.py
@@ -81,6 +81,7 @@ def assert_records_as_set(actual: List[Dict[str, Any]], expected: List[Dict[str, Any]]) -> None:
     assert actual_set == expected_set


+@pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config,simple,validity_column_names",
     [  # test basic case for all SQL destinations supporting merge
