From 6879a094b24c3569f783b57d00293f3635f14aa5 Mon Sep 17 00:00:00 2001 From: David Scharf Date: Wed, 24 Apr 2024 13:27:49 +0200 Subject: [PATCH] Make merge write-disposition fall back to append if no primary or merge keys are specified (#1225) * add sanity check to prevent missing config setup * fall back to append for merge without merge keys * add test for checking behavior of hard_delete without key * add schema warning * fix athena iceberg locations * add note in docs about merge fallback behavior * fix merge switching tests * fix one additional test with fallback --- dlt/common/destination/reference.py | 8 ++ dlt/destinations/sql_jobs.py | 109 ++++++++++-------- .../docs/general-usage/incremental-loading.md | 6 + .../load/pipeline/test_filesystem_pipeline.py | 83 +++---------- tests/load/pipeline/test_merge_disposition.py | 49 +++++--- .../test_write_disposition_changes.py | 6 +- tests/load/utils.py | 4 + 7 files changed, 132 insertions(+), 133 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 5422414cf3..2f9650a446 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -353,6 +353,14 @@ def _verify_schema(self) -> None: f'"{table["x-merge-strategy"]}" is not a valid merge strategy. ' # type: ignore[typeddict-item] f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}.""" ) + if not has_column_with_prop(table, "primary_key") and not has_column_with_prop( + table, "merge_key" + ): + logger.warning( + f"Table {table_name} has write_disposition set to merge, but no primary or" + " merge keys defined. " + + "dlt will fall back to append for this table." + ) if has_column_with_prop(table, "hard_delete"): if len(get_columns_names_with_prop(table, "hard_delete")) > 1: raise SchemaException( diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index 86eaa9236a..e7993106e1 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -145,7 +145,10 @@ def generate_sql( class SqlMergeJob(SqlBaseJob): - """Generates a list of sql statements that merge the data from staging dataset into destination dataset.""" + """ + Generates a list of sql statements that merge the data from staging dataset into destination dataset. + If no merge keys are discovered, falls back to append. + """ failed_text: str = "Tried to generate a merge sql job for the following tables:" @@ -382,68 +385,74 @@ def gen_merge_sql( get_columns_names_with_prop(root_table, "merge_key"), ) ) - key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys) - unique_column: str = None - root_key_column: str = None + # if we do not have any merge keys to select from, we will fall back to a staged append, i.E. 
+ # just skip the delete part + append_fallback = (len(primary_keys) + len(merge_keys)) == 0 - if len(table_chain) == 1: - key_table_clauses = cls.gen_key_table_clauses( - root_table_name, staging_root_table_name, key_clauses, for_delete=True - ) - # if no child tables, just delete data from top table - for clause in key_table_clauses: - sql.append(f"DELETE {clause};") - else: - key_table_clauses = cls.gen_key_table_clauses( - root_table_name, staging_root_table_name, key_clauses, for_delete=False - ) - # use unique hint to create temp table with all identifiers to delete - unique_columns = get_columns_names_with_prop(root_table, "unique") - if not unique_columns: - raise MergeDispositionException( - sql_client.fully_qualified_dataset_name(), - staging_root_table_name, - [t["name"] for t in table_chain], - f"There is no unique column (ie _dlt_id) in top table {root_table['name']} so" - " it is not possible to link child tables to it.", - ) - # get first unique column - unique_column = escape_id(unique_columns[0]) - # create temp table with unique identifier - create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql( - unique_column, key_table_clauses, sql_client - ) - sql.extend(create_delete_temp_table_sql) + if not append_fallback: + key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys) - # delete from child tables first. This is important for databricks which does not support temporary tables, - # but uses temporary views instead - for table in table_chain[1:]: - table_name = sql_client.make_qualified_table_name(table["name"]) - root_key_columns = get_columns_names_with_prop(table, "root_key") - if not root_key_columns: + unique_column: str = None + root_key_column: str = None + + if len(table_chain) == 1: + key_table_clauses = cls.gen_key_table_clauses( + root_table_name, staging_root_table_name, key_clauses, for_delete=True + ) + # if no child tables, just delete data from top table + for clause in key_table_clauses: + sql.append(f"DELETE {clause};") + else: + key_table_clauses = cls.gen_key_table_clauses( + root_table_name, staging_root_table_name, key_clauses, for_delete=False + ) + # use unique hint to create temp table with all identifiers to delete + unique_columns = get_columns_names_with_prop(root_table, "unique") + if not unique_columns: raise MergeDispositionException( sql_client.fully_qualified_dataset_name(), staging_root_table_name, [t["name"] for t in table_chain], - "There is no root foreign key (ie _dlt_root_id) in child table" - f" {table['name']} so it is not possible to refer to top level table" - f" {root_table['name']} unique column {unique_column}", + "There is no unique column (ie _dlt_id) in top table" + f" {root_table['name']} so it is not possible to link child tables to it.", ) - root_key_column = escape_id(root_key_columns[0]) + # get first unique column + unique_column = escape_id(unique_columns[0]) + # create temp table with unique identifier + create_delete_temp_table_sql, delete_temp_table_name = ( + cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, sql_client) + ) + sql.extend(create_delete_temp_table_sql) + + # delete from child tables first. 
This is important for databricks which does not support temporary tables, + # but uses temporary views instead + for table in table_chain[1:]: + table_name = sql_client.make_qualified_table_name(table["name"]) + root_key_columns = get_columns_names_with_prop(table, "root_key") + if not root_key_columns: + raise MergeDispositionException( + sql_client.fully_qualified_dataset_name(), + staging_root_table_name, + [t["name"] for t in table_chain], + "There is no root foreign key (ie _dlt_root_id) in child table" + f" {table['name']} so it is not possible to refer to top level table" + f" {root_table['name']} unique column {unique_column}", + ) + root_key_column = escape_id(root_key_columns[0]) + sql.append( + cls.gen_delete_from_sql( + table_name, root_key_column, delete_temp_table_name, unique_column + ) + ) + + # delete from top table now that child tables have been prcessed sql.append( cls.gen_delete_from_sql( - table_name, root_key_column, delete_temp_table_name, unique_column + root_table_name, unique_column, delete_temp_table_name, unique_column ) ) - # delete from top table now that child tables have been prcessed - sql.append( - cls.gen_delete_from_sql( - root_table_name, unique_column, delete_temp_table_name, unique_column - ) - ) - # get name of column with hard_delete hint, if specified not_deleted_cond: str = None hard_delete_col = get_first_column_name_with_prop(root_table, "hard_delete") diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 28d2f862b2..e7a7faddb0 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -132,6 +132,12 @@ def github_repo_events(last_created_at = dlt.sources.incremental("created_at", " yield from _get_rest_pages("events") ``` +:::note +If you use the `merge` write disposition, but do not specify merge or primary keys, merge will fallback to `append`. +The appended data will be inserted from a staging table in one transaction for most destinations in this case. +::: + + #### Delete records The `hard_delete` column hint can be used to delete records from the destination dataset. The behavior of the delete mechanism depends on the data type of the column marked with the hint: 1) `bool` type: only `True` leads to a delete—`None` and `False` values are disregarded diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py index 5e94d42748..c08bd488ef 100644 --- a/tests/load/pipeline/test_filesystem_pipeline.py +++ b/tests/load/pipeline/test_filesystem_pipeline.py @@ -29,30 +29,14 @@ skip_if_not_active("filesystem") -def assert_file_matches( - layout: str, job: LoadJobInfo, load_id: str, client: FilesystemClient -) -> None: - """Verify file contents of load job are identical to the corresponding file in destination""" - local_path = Path(job.file_path) - filename = local_path.name - destination_fn = create_path( - layout, - filename, - client.schema.name, - load_id, - extra_placeholders=client.config.extra_placeholders, - ) - destination_path = posixpath.join(client.dataset_path, destination_fn) - - assert local_path.read_bytes() == client.fs_client.read_bytes(destination_path) - - def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None: """Run pipeline twice with merge write disposition - Resource with primary key falls back to append. Resource without keys falls back to replace. 
+ Regardless wether primary key is set or not, filesystem appends """ import pyarrow.parquet as pq # Module is evaluated by other tests + os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True" + pipeline = dlt.pipeline( pipeline_name="test_" + uniq_id(), destination="filesystem", @@ -71,54 +55,25 @@ def other_data(): def some_source(): return [some_data(), other_data()] - info1 = pipeline.run(some_source(), write_disposition="merge") - info2 = pipeline.run(some_source(), write_disposition="merge") - - client: FilesystemClient = pipeline.destination_client() # type: ignore[assignment] - layout = client.config.layout - - append_glob = list(client._get_table_dirs(["some_data"]))[0] - replace_glob = list(client._get_table_dirs(["other_data"]))[0] - - append_files = client.fs_client.ls(append_glob, detail=False, refresh=True) - replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True) - - load_id1 = info1.loads_ids[0] - load_id2 = info2.loads_ids[0] - - # resource with pk is loaded with append and has 1 copy for each load - assert len(append_files) == 2 - assert any(load_id1 in fn for fn in append_files) - assert any(load_id2 in fn for fn in append_files) - - # resource without pk is treated as append disposition - assert len(replace_files) == 2 - assert any(load_id1 in fn for fn in replace_files) - assert any(load_id2 in fn for fn in replace_files) - - # Verify file contents - assert info2.load_packages - for pkg in info2.load_packages: - assert pkg.jobs["completed_jobs"] - for job in pkg.jobs["completed_jobs"]: - assert_file_matches(layout, job, pkg.load_id, client) - - complete_fn = f"{client.schema.name}__%s.jsonl" + pipeline.run(some_source(), write_disposition="merge") + assert load_table_counts(pipeline, "some_data", "other_data") == { + "some_data": 3, + "other_data": 5, + } - # Test complete_load markers are saved - assert client.fs_client.isfile( - posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id1) - ) - assert client.fs_client.isfile( - posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id2) - ) + # second load shows that merge always appends on filesystem + pipeline.run(some_source(), write_disposition="merge") + assert load_table_counts(pipeline, "some_data", "other_data") == { + "some_data": 6, + "other_data": 10, + } - # Force replace + # Force replace, back to initial values pipeline.run(some_source(), write_disposition="replace") - append_files = client.fs_client.ls(append_glob, detail=False, refresh=True) - replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True) - assert len(append_files) == 1 - assert len(replace_files) == 1 + assert load_table_counts(pipeline, "some_data", "other_data") == { + "some_data": 3, + "other_data": 5, + } @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index bfcdccfba4..2924aeb6df 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -240,10 +240,17 @@ def test_merge_no_child_tables(destination_config: DestinationTestConfiguration) assert github_2_counts["issues"] == 100 if destination_config.supports_merge else 115 +# mark as essential for now +@pytest.mark.essential @pytest.mark.parametrize( - "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name + "destination_config", + 
destinations_configs(default_sql_configs=True, local_filesystem_configs=True), + ids=lambda x: x.name, ) def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -> None: + # NOTE: we can test filesystem destination merge behavior here too, will also fallback! + if destination_config.file_format == "insert_values": + pytest.skip("Insert values row count checking is buggy, skipping") p = destination_config.setup_pipeline("github_3", full_refresh=True) github_data = github() # remove all keys @@ -264,8 +271,8 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) - info = p.run(github_data, loader_file_format=destination_config.file_format) assert_load_info(info) github_1_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()]) - # only ten rows remains. merge falls back to replace when no keys are specified - assert github_1_counts["issues"] == 10 if destination_config.supports_merge else 100 - 45 + # we have 10 rows more, merge falls back to append if no keys present + assert github_1_counts["issues"] == 100 - 45 + 10 @pytest.mark.parametrize( @@ -291,14 +298,14 @@ def test_merge_keys_non_existing_columns(destination_config: DestinationTestConf if not destination_config.supports_merge: return - # all the keys are invalid so the merge falls back to replace + # all the keys are invalid so the merge falls back to append github_data = github() github_data.load_issues.apply_hints(merge_key=("mA1", "Ma2"), primary_key=("123-x",)) github_data.load_issues.add_filter(take_first(1)) info = p.run(github_data, loader_file_format=destination_config.file_format) assert_load_info(info) github_2_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()]) - assert github_2_counts["issues"] == 1 + assert github_2_counts["issues"] == 100 - 45 + 1 with p._sql_job_client(p.default_schema) as job_c: _, table_schema = job_c.get_storage_table("issues") assert "url" in table_schema @@ -589,8 +596,10 @@ def r(data): destinations_configs(default_sql_configs=True, supports_merge=True), ids=lambda x: x.name, ) -@pytest.mark.parametrize("key_type", ["primary_key", "merge_key"]) +@pytest.mark.parametrize("key_type", ["primary_key", "merge_key", "no_key"]) def test_hard_delete_hint(destination_config: DestinationTestConfiguration, key_type: str) -> None: + # no_key setting will have the effect that hard deletes have no effect, since hard delete records + # can not be matched table_name = "test_hard_delete_hint" @dlt.resource( @@ -605,6 +614,9 @@ def data_resource(data): data_resource.apply_hints(primary_key="id", merge_key="") elif key_type == "merge_key": data_resource.apply_hints(primary_key="", merge_key="id") + elif key_type == "no_key": + # we test what happens if there are no merge keys + pass p = destination_config.setup_pipeline(f"abstract_{key_type}", full_refresh=True) @@ -623,7 +635,7 @@ def data_resource(data): ] info = p.run(data_resource(data), loader_file_format=destination_config.file_format) assert_load_info(info) - assert load_table_counts(p, table_name)[table_name] == 1 + assert load_table_counts(p, table_name)[table_name] == (1 if key_type != "no_key" else 2) # update one record (None for hard_delete column is treated as "not True") data = [ @@ -631,16 +643,17 @@ def data_resource(data): ] info = p.run(data_resource(data), loader_file_format=destination_config.file_format) assert_load_info(info) - assert load_table_counts(p, table_name)[table_name] == 1 + assert load_table_counts(p, 
table_name)[table_name] == (1 if key_type != "no_key" else 3) # compare observed records with expected records - qual_name = p.sql_client().make_qualified_table_name(table_name) - observed = [ - {"id": row[0], "val": row[1], "deleted": row[2]} - for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}") - ] - expected = [{"id": 2, "val": "baz", "deleted": None}] - assert sorted(observed, key=lambda d: d["id"]) == expected + if key_type != "no_key": + qual_name = p.sql_client().make_qualified_table_name(table_name) + observed = [ + {"id": row[0], "val": row[1], "deleted": row[2]} + for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}") + ] + expected = [{"id": 2, "val": "baz", "deleted": None}] + assert sorted(observed, key=lambda d: d["id"]) == expected # insert two records with same key data = [ @@ -654,6 +667,12 @@ def data_resource(data): assert counts == 2 elif key_type == "merge_key": assert counts == 3 + elif key_type == "no_key": + assert counts == 5 + + # we do not need to test "no_key" further + if key_type == "no_key": + return # delete one key, resulting in one (primary key) or two (merge key) deleted records data = [ diff --git a/tests/load/pipeline/test_write_disposition_changes.py b/tests/load/pipeline/test_write_disposition_changes.py index 50986727ed..2a7a94ef6b 100644 --- a/tests/load/pipeline/test_write_disposition_changes.py +++ b/tests/load/pipeline/test_write_disposition_changes.py @@ -10,6 +10,7 @@ from dlt.pipeline.exceptions import PipelineStepFailed +@dlt.resource(primary_key="id") def data_with_subtables(offset: int) -> Any: for _, index in enumerate(range(offset, offset + 100), 1): yield { @@ -96,13 +97,10 @@ def test_switch_to_merge(destination_config: DestinationTestConfiguration, with_ pipeline_name="test_switch_to_merge", full_refresh=True ) - @dlt.resource() - def resource(): - yield data_with_subtables(10) @dlt.source() def source(): - return resource() + return data_with_subtables(10) s = source() s.root_key = with_root_key diff --git a/tests/load/utils.py b/tests/load/utils.py index 110c2b433d..2f20e91e69 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -226,6 +226,10 @@ def destinations_configs( DestinationTestConfiguration(destination="synapse", supports_dbt=False), ] + # sanity check that when selecting default destinations, one of each sql destination is actually + # provided + assert set(SQL_DESTINATIONS) == {d.destination for d in destination_configs} + if default_vector_configs: # for now only weaviate destination_configs += [DestinationTestConfiguration(destination="weaviate")]
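
To illustrate the behaviour this patch introduces, here is a minimal sketch of a pipeline that uses the merge write disposition without declaring primary or merge keys. The resource, pipeline, dataset names and the duckdb destination below are illustrative only and are not part of the patch; per the docs note added above, on most SQL destinations the appended rows are inserted from a staging table in a single transaction.

import dlt


@dlt.resource(name="users", write_disposition="merge")  # note: no primary_key and no merge_key
def users():
    yield [{"id": 1, "name": "anna"}, {"id": 2, "name": "bob"}]


pipeline = dlt.pipeline(
    pipeline_name="merge_fallback_demo",
    destination="duckdb",
    dataset_name="demo",
)

# First run: dlt logs the new schema warning and, lacking any keys,
# loads the two rows as a (staged) append instead of a merge.
pipeline.run(users())

# Second run: the same two rows are appended again rather than deduplicated.
pipeline.run(users())

with pipeline.sql_client() as client:
    # Expected to print [(4,)], because both runs appended their rows.
    print(client.execute_sql("SELECT count(*) FROM users"))

The same two runs with primary_key="id" declared on the resource would leave the table at two rows, since the merge job would then deduplicate on that key instead of falling back to append.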