Make merge write-disposition fall back to append if no primary or merge keys are specified #1225

Merged · 11 commits · Apr 24, 2024
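As a rough sketch of the new behavior (illustrative resource and pipeline names, assuming a local `duckdb` destination; this is not code from the PR): a resource that requests `merge` but declares no primary or merge key now gets a warning and its rows are simply appended.

```py
import dlt

@dlt.resource(write_disposition="merge")  # note: no primary_key or merge_key hint
def events():
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

pipeline = dlt.pipeline(pipeline_name="merge_fallback_demo", destination="duckdb")

# dlt logs a warning ("... dlt will fall back to append for this table.") and appends
pipeline.run(events())
pipeline.run(events())  # the table now holds 4 rows; both loads were appended
```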
8 changes: 8 additions & 0 deletions dlt/common/destination/reference.py
@@ -353,6 +353,14 @@ def _verify_schema(self) -> None:
f'"{table["x-merge-strategy"]}" is not a valid merge strategy. ' # type: ignore[typeddict-item]
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)
if not has_column_with_prop(table, "primary_key") and not has_column_with_prop(
table, "merge_key"
):
logger.warning(
f"Table {table_name} has write_disposition set to merge, but no primary or"
" merge keys defined. "
+ "dlt will fall back to append for this table."
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
109 changes: 59 additions & 50 deletions dlt/destinations/sql_jobs.py
@@ -145,7 +145,10 @@ def generate_sql(


class SqlMergeJob(SqlBaseJob):
"""Generates a list of sql statements that merge the data from staging dataset into destination dataset."""
"""
Generates a list of sql statements that merge the data from staging dataset into destination dataset.
If no merge keys are discovered, falls back to append.
"""

failed_text: str = "Tried to generate a merge sql job for the following tables:"

@@ -382,68 +385,74 @@ def gen_merge_sql(
get_columns_names_with_prop(root_table, "merge_key"),
)
)
key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)

unique_column: str = None
root_key_column: str = None
        # if we do not have any merge keys to select from, we will fall back to a staged append, i.e.
        # just skip the delete part
append_fallback = (len(primary_keys) + len(merge_keys)) == 0

if len(table_chain) == 1:
key_table_clauses = cls.gen_key_table_clauses(
root_table_name, staging_root_table_name, key_clauses, for_delete=True
)
# if no child tables, just delete data from top table
for clause in key_table_clauses:
sql.append(f"DELETE {clause};")
else:
key_table_clauses = cls.gen_key_table_clauses(
root_table_name, staging_root_table_name, key_clauses, for_delete=False
)
# use unique hint to create temp table with all identifiers to delete
unique_columns = get_columns_names_with_prop(root_table, "unique")
if not unique_columns:
raise MergeDispositionException(
sql_client.fully_qualified_dataset_name(),
staging_root_table_name,
[t["name"] for t in table_chain],
f"There is no unique column (ie _dlt_id) in top table {root_table['name']} so"
" it is not possible to link child tables to it.",
)
# get first unique column
unique_column = escape_id(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(
unique_column, key_table_clauses, sql_client
)
sql.extend(create_delete_temp_table_sql)
if not append_fallback:
key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)

# delete from child tables first. This is important for databricks which does not support temporary tables,
# but uses temporary views instead
for table in table_chain[1:]:
table_name = sql_client.make_qualified_table_name(table["name"])
root_key_columns = get_columns_names_with_prop(table, "root_key")
if not root_key_columns:
unique_column: str = None
root_key_column: str = None

if len(table_chain) == 1:
key_table_clauses = cls.gen_key_table_clauses(
root_table_name, staging_root_table_name, key_clauses, for_delete=True
)
# if no child tables, just delete data from top table
for clause in key_table_clauses:
sql.append(f"DELETE {clause};")
else:
key_table_clauses = cls.gen_key_table_clauses(
root_table_name, staging_root_table_name, key_clauses, for_delete=False
)
# use unique hint to create temp table with all identifiers to delete
unique_columns = get_columns_names_with_prop(root_table, "unique")
if not unique_columns:
raise MergeDispositionException(
sql_client.fully_qualified_dataset_name(),
staging_root_table_name,
[t["name"] for t in table_chain],
"There is no root foreign key (ie _dlt_root_id) in child table"
f" {table['name']} so it is not possible to refer to top level table"
f" {root_table['name']} unique column {unique_column}",
"There is no unique column (ie _dlt_id) in top table"
f" {root_table['name']} so it is not possible to link child tables to it.",
)
root_key_column = escape_id(root_key_columns[0])
# get first unique column
unique_column = escape_id(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = (
cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, sql_client)
)
sql.extend(create_delete_temp_table_sql)

# delete from child tables first. This is important for databricks which does not support temporary tables,
# but uses temporary views instead
for table in table_chain[1:]:
table_name = sql_client.make_qualified_table_name(table["name"])
root_key_columns = get_columns_names_with_prop(table, "root_key")
if not root_key_columns:
raise MergeDispositionException(
sql_client.fully_qualified_dataset_name(),
staging_root_table_name,
[t["name"] for t in table_chain],
"There is no root foreign key (ie _dlt_root_id) in child table"
f" {table['name']} so it is not possible to refer to top level table"
f" {root_table['name']} unique column {unique_column}",
)
root_key_column = escape_id(root_key_columns[0])
sql.append(
cls.gen_delete_from_sql(
table_name, root_key_column, delete_temp_table_name, unique_column
)
)

            # delete from top table now that child tables have been processed
sql.append(
cls.gen_delete_from_sql(
table_name, root_key_column, delete_temp_table_name, unique_column
root_table_name, unique_column, delete_temp_table_name, unique_column
)
)

        # delete from top table now that child tables have been processed
sql.append(
cls.gen_delete_from_sql(
root_table_name, unique_column, delete_temp_table_name, unique_column
)
)

# get name of column with hard_delete hint, if specified
not_deleted_cond: str = None
hard_delete_col = get_first_column_name_with_prop(root_table, "hard_delete")
6 changes: 6 additions & 0 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -132,6 +132,12 @@ def github_repo_events(last_created_at = dlt.sources.incremental("created_at", "
yield from _get_rest_pages("events")
```

:::note
If you use the `merge` write disposition but do not specify primary or merge keys, the merge falls back to `append`.
In this case, for most destinations the data is appended from a staging table in a single transaction.
:::
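
For example (an illustrative sketch; the resources and field names below are placeholders): declaring a primary key keeps the deduplicating merge behavior, while omitting all keys means each load is simply appended.

```py
import dlt

@dlt.resource(primary_key="user_id", write_disposition="merge")
def users():
    # rows sharing a user_id are deduplicated on each load (true merge)
    yield [{"user_id": 1, "name": "alice"}]

@dlt.resource(write_disposition="merge")  # no primary or merge key
def raw_events():
    # dlt warns and falls back to append: re-running the pipeline duplicates these rows
    yield [{"event": "click"}]
```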


#### Delete records
The `hard_delete` column hint can be used to delete records from the destination dataset. The behavior of the delete mechanism depends on the data type of the column marked with the hint:
1) `bool` type: only `True` leads to a delete—`None` and `False` values are disregarded
83 changes: 19 additions & 64 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -29,30 +29,14 @@
skip_if_not_active("filesystem")


def assert_file_matches(
layout: str, job: LoadJobInfo, load_id: str, client: FilesystemClient
) -> None:
"""Verify file contents of load job are identical to the corresponding file in destination"""
local_path = Path(job.file_path)
filename = local_path.name
destination_fn = create_path(
layout,
filename,
client.schema.name,
load_id,
extra_placeholders=client.config.extra_placeholders,
)
destination_path = posixpath.join(client.dataset_path, destination_fn)

assert local_path.read_bytes() == client.fs_client.read_bytes(destination_path)


def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None:
Collaborator Author commented:

This test shows that the filesystem destination always falls back to append when "merge" is set, which is also what the docs say. I'm not sure why this test claimed it would replace when there is a primary key; that did not work before and it does not work that way now, and the test was somewhat strange (not sure if I wrote it), but now it runs correctly. If we want to replace in certain cases, I have to add that behavior and specify it in the docs.

"""Run pipeline twice with merge write disposition
Resource with primary key falls back to append. Resource without keys falls back to replace.
    Regardless of whether a primary key is set or not, filesystem appends
"""
import pyarrow.parquet as pq # Module is evaluated by other tests

os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"

pipeline = dlt.pipeline(
pipeline_name="test_" + uniq_id(),
destination="filesystem",
@@ -71,54 +55,25 @@ def other_data():
def some_source():
return [some_data(), other_data()]

info1 = pipeline.run(some_source(), write_disposition="merge")
info2 = pipeline.run(some_source(), write_disposition="merge")

client: FilesystemClient = pipeline.destination_client() # type: ignore[assignment]
layout = client.config.layout

append_glob = list(client._get_table_dirs(["some_data"]))[0]
replace_glob = list(client._get_table_dirs(["other_data"]))[0]

append_files = client.fs_client.ls(append_glob, detail=False, refresh=True)
replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True)

load_id1 = info1.loads_ids[0]
load_id2 = info2.loads_ids[0]

# resource with pk is loaded with append and has 1 copy for each load
assert len(append_files) == 2
assert any(load_id1 in fn for fn in append_files)
assert any(load_id2 in fn for fn in append_files)

# resource without pk is treated as append disposition
assert len(replace_files) == 2
assert any(load_id1 in fn for fn in replace_files)
assert any(load_id2 in fn for fn in replace_files)

# Verify file contents
assert info2.load_packages
for pkg in info2.load_packages:
assert pkg.jobs["completed_jobs"]
for job in pkg.jobs["completed_jobs"]:
assert_file_matches(layout, job, pkg.load_id, client)

complete_fn = f"{client.schema.name}__%s.jsonl"
pipeline.run(some_source(), write_disposition="merge")
assert load_table_counts(pipeline, "some_data", "other_data") == {
"some_data": 3,
"other_data": 5,
}

# Test complete_load markers are saved
assert client.fs_client.isfile(
posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id1)
)
assert client.fs_client.isfile(
posixpath.join(client.dataset_path, client.schema.loads_table_name, complete_fn % load_id2)
)
# second load shows that merge always appends on filesystem
pipeline.run(some_source(), write_disposition="merge")
assert load_table_counts(pipeline, "some_data", "other_data") == {
"some_data": 6,
"other_data": 10,
}

# Force replace
# Force replace, back to initial values
pipeline.run(some_source(), write_disposition="replace")
append_files = client.fs_client.ls(append_glob, detail=False, refresh=True)
replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True)
assert len(append_files) == 1
assert len(replace_files) == 1
assert load_table_counts(pipeline, "some_data", "other_data") == {
"some_data": 3,
"other_data": 5,
}


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
49 changes: 34 additions & 15 deletions tests/load/pipeline/test_merge_disposition.py
@@ -240,10 +240,17 @@ def test_merge_no_child_tables(destination_config: DestinationTestConfiguration)
assert github_2_counts["issues"] == 100 if destination_config.supports_merge else 115


# mark as essential for now
@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
ids=lambda x: x.name,
)
def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -> None:
    # NOTE: we can test the filesystem destination's merge behavior here too; it will also fall back!
if destination_config.file_format == "insert_values":
pytest.skip("Insert values row count checking is buggy, skipping")
p = destination_config.setup_pipeline("github_3", full_refresh=True)
github_data = github()
# remove all keys
@@ -264,8 +271,8 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -
info = p.run(github_data, loader_file_format=destination_config.file_format)
assert_load_info(info)
github_1_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()])
# only ten rows remains. merge falls back to replace when no keys are specified
assert github_1_counts["issues"] == 10 if destination_config.supports_merge else 100 - 45
    # we have 10 more rows; merge falls back to append if no keys are present
assert github_1_counts["issues"] == 100 - 45 + 10


@pytest.mark.parametrize(
Expand All @@ -291,14 +298,14 @@ def test_merge_keys_non_existing_columns(destination_config: DestinationTestConf
if not destination_config.supports_merge:
return

# all the keys are invalid so the merge falls back to replace
# all the keys are invalid so the merge falls back to append
github_data = github()
github_data.load_issues.apply_hints(merge_key=("mA1", "Ma2"), primary_key=("123-x",))
github_data.load_issues.add_filter(take_first(1))
info = p.run(github_data, loader_file_format=destination_config.file_format)
assert_load_info(info)
github_2_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()])
assert github_2_counts["issues"] == 1
assert github_2_counts["issues"] == 100 - 45 + 1
with p._sql_job_client(p.default_schema) as job_c:
_, table_schema = job_c.get_storage_table("issues")
assert "url" in table_schema
@@ -589,8 +596,10 @@ def r(data):
destinations_configs(default_sql_configs=True, supports_merge=True),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("key_type", ["primary_key", "merge_key"])
@pytest.mark.parametrize("key_type", ["primary_key", "merge_key", "no_key"])
def test_hard_delete_hint(destination_config: DestinationTestConfiguration, key_type: str) -> None:
    # the no_key setting means hard deletes have no effect, since hard-delete records
    # cannot be matched
table_name = "test_hard_delete_hint"

@dlt.resource(
@@ -605,6 +614,9 @@ def data_resource(data):
data_resource.apply_hints(primary_key="id", merge_key="")
elif key_type == "merge_key":
data_resource.apply_hints(primary_key="", merge_key="id")
elif key_type == "no_key":
# we test what happens if there are no merge keys
pass

p = destination_config.setup_pipeline(f"abstract_{key_type}", full_refresh=True)

@@ -623,24 +635,25 @@ def data_resource(data):
]
info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
assert_load_info(info)
assert load_table_counts(p, table_name)[table_name] == 1
assert load_table_counts(p, table_name)[table_name] == (1 if key_type != "no_key" else 2)

# update one record (None for hard_delete column is treated as "not True")
data = [
{"id": 2, "val": "baz", "deleted": None},
]
info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
assert_load_info(info)
assert load_table_counts(p, table_name)[table_name] == 1
assert load_table_counts(p, table_name)[table_name] == (1 if key_type != "no_key" else 3)

# compare observed records with expected records
qual_name = p.sql_client().make_qualified_table_name(table_name)
observed = [
{"id": row[0], "val": row[1], "deleted": row[2]}
for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}")
]
expected = [{"id": 2, "val": "baz", "deleted": None}]
assert sorted(observed, key=lambda d: d["id"]) == expected
if key_type != "no_key":
qual_name = p.sql_client().make_qualified_table_name(table_name)
observed = [
{"id": row[0], "val": row[1], "deleted": row[2]}
for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}")
]
expected = [{"id": 2, "val": "baz", "deleted": None}]
assert sorted(observed, key=lambda d: d["id"]) == expected

# insert two records with same key
data = [
@@ -654,6 +667,12 @@ def data_resource(data):
assert counts == 2
elif key_type == "merge_key":
assert counts == 3
elif key_type == "no_key":
assert counts == 5

# we do not need to test "no_key" further
if key_type == "no_key":
return

# delete one key, resulting in one (primary key) or two (merge key) deleted records
data = [