Skip to content

Commit

Permalink
fix merge sql for clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Apr 22, 2024
1 parent 544a0a4 commit cb9f35b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
11 changes: 4 additions & 7 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,6 @@ def gen_key_table_clauses(
key_clauses: Sequence[str],
for_delete: bool,
) -> List[str]:
if for_delete:
# clickhouse doesn't support alias in DELETE FROM
return [
f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM"
f" {staging_root_table_name} WHERE"
f" {' OR '.join([c.format(d=root_table_name,s=staging_root_table_name) for c in key_clauses])})"
]
join_conditions = " AND ".join([c.format(d="d", s="s") for c in key_clauses])
return [
f"FROM {root_table_name} AS d JOIN {staging_root_table_name} AS s ON {join_conditions}"
Expand All @@ -282,6 +275,10 @@ def gen_key_table_clauses(
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"ALTER TABLE {table_name} UPDATE"

@classmethod
def supports_simple_merge_delete(cls) -> bool:
return False


class ClickHouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
Expand Down
10 changes: 9 additions & 1 deletion dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def gen_merge_sql(
unique_column: str = None
root_key_column: str = None

if len(table_chain) == 1:
if len(table_chain) == 1 and cls.supports_simple_merge_delete():
key_table_clauses = cls.gen_key_table_clauses(
root_table_name, staging_root_table_name, key_clauses, for_delete=True
)
Expand Down Expand Up @@ -590,3 +590,11 @@ def gen_scd2_sql(
@classmethod
def gen_update_table_prefix(cls, table_name: str) -> str:
return f"UPDATE {table_name} SET"

@classmethod
def supports_simple_merge_delete(cls) -> bool:
"""this could also be a capabitiy, but probably it is better stored here
this identifies destinations that can have a simplified method for merging single
table table chains
"""
return True
11 changes: 11 additions & 0 deletions dlt/helpers/dbt/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,14 @@ databricks:
http_path: "{{ env_var('DLT__CREDENTIALS__HTTP_PATH') }}"
token: "{{ env_var('DLT__CREDENTIALS__ACCESS_TOKEN') }}"
threads: 4


clickhouse:
target: analytics
outputs:
analytics:
type: clickhouse
schema: "{{ var('destination_dataset_name', var('source_dataset_name')) }}"
host: "{{ env_var('DLT__CREDENTIALS__HOST') }}"
user: "{{ env_var('DLT__CREDENTIALS__USERNAME') }}"
password: "{{ env_var('DLT__CREDENTIALS__PASSWORD') }}"
13 changes: 9 additions & 4 deletions tests/load/pipeline/test_merge_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_merge_on_ad_hoc_primary_key(destination_config: DestinationTestConfigur


@dlt.source(root_key=True)
def github():
def github(remove_lists: bool = False):
@dlt.resource(
table_name="issues",
write_disposition="merge",
Expand All @@ -150,7 +150,12 @@ def load_issues():
with open(
"tests/normalize/cases/github.issues.load_page_5_duck.json", "r", encoding="utf-8"
) as f:
yield from json.load(f)
for item in json.load(f):
# for clickhouse we cannot have lists in json fields
if remove_lists:
item.pop("assignees")
item.pop("labels")
yield item

return load_issues

Expand Down Expand Up @@ -212,7 +217,7 @@ def test_merge_source_compound_keys_and_changes(
)
def test_merge_no_child_tables(destination_config: DestinationTestConfiguration) -> None:
p = destination_config.setup_pipeline("github_3", full_refresh=True)
github_data = github()
github_data = github(True)
assert github_data.max_table_nesting is None
assert github_data.root_key is True
# set max nesting to 0 so no child tables are generated
Expand All @@ -231,7 +236,7 @@ def test_merge_no_child_tables(destination_config: DestinationTestConfiguration)
assert github_1_counts["issues"] == 15

# load all
github_data = github()
github_data = github(True)
github_data.max_table_nesting = 0
info = p.run(github_data, loader_file_format=destination_config.file_format)
assert_load_info(info)
Expand Down

0 comments on commit cb9f35b

Please sign in to comment.