Commit: Remove staging-optimized replace strategy for synapse (#1231)
* remove staging-optimized replace strategy for synapse

* fix athena iceberg locations

---------

Co-authored-by: Jorrit Sandbrink <[email protected]>
Co-authored-by: Dave <[email protected]>
3 people authored and zem360 committed Apr 17, 2024
1 parent 981c26b commit 24709b3
Showing 3 changed files with 3 additions and 54 deletions.
45 changes: 1 addition & 44 deletions dlt/destinations/impl/synapse/synapse.py
@@ -127,31 +127,14 @@ def _get_columstore_valid_column(self, c: TColumnSchema) -> TColumnSchema:
     def _create_replace_followup_jobs(
         self, table_chain: Sequence[TTableSchema]
     ) -> List[NewLoadJob]:
-        if self.config.replace_strategy == "staging-optimized":
-            # we must recreate staging table after SCHEMA TRANSFER
-            job_params: SqlJobParams = {"table_chain_create_table_statements": {}}
-            create_statements = job_params["table_chain_create_table_statements"]
-            with self.with_staging_dataset():
-                for table in table_chain:
-                    columns = [c for c in self.schema.get_table_columns(table["name"]).values()]
-                    # generate CREATE TABLE statement
-                    create_statements[table["name"]] = self._get_table_update_sql(
-                        table["name"], columns, generate_alter=False
-                    )
-            return [
-                SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client, job_params)
-            ]
-        return super()._create_replace_followup_jobs(table_chain)
+        return SqlJobClientBase._create_replace_followup_jobs(self, table_chain)

     def prepare_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
         table = super().prepare_load_table(table_name, staging)
         if staging and self.config.replace_strategy == "insert-from-staging":
             # Staging tables should always be heap tables, because "when you are
             # temporarily landing data in dedicated SQL pool, you may find that
             # using a heap table makes the overall process faster."
-            # "staging-optimized" is not included, because in that strategy the
-            # staging table becomes the final table, so we should already create
-            # it with the desired index type.
             table[TABLE_INDEX_TYPE_HINT] = "heap"  # type: ignore[typeddict-unknown-key]
         elif table_name in self.schema.dlt_table_names():
             # dlt tables should always be heap tables, because "for small lookup
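The one-line replacement above is the crux of this hunk: instead of `super()._create_replace_followup_jobs(table_chain)`, the client now calls `SqlJobClientBase._create_replace_followup_jobs(self, table_chain)`, which pins dispatch to the named base class and skips any override an intermediate parent (presumably the MsSql client) would otherwise contribute. A minimal sketch of that Python pattern, with illustrative class names rather than dlt's real hierarchy:

class Base:
    def followup_jobs(self) -> list:
        return ["base job"]

class Mid(Base):
    def followup_jobs(self) -> list:
        # an intermediate override we want to bypass
        return ["mid job"]

class Leaf(Mid):
    def followup_jobs(self) -> list:
        # super() would dispatch to Mid; naming Base pins the call there
        return Base.followup_jobs(self)

print(Leaf().followup_jobs())  # prints ['base job']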
@@ -186,32 +169,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
         return job


-class SynapseStagingCopyJob(SqlStagingCopyJob):
-    @classmethod
-    def generate_sql(
-        cls,
-        table_chain: Sequence[TTableSchema],
-        sql_client: SqlClientBase[Any],
-        params: Optional[SqlJobParams] = None,
-    ) -> List[str]:
-        sql: List[str] = []
-        for table in table_chain:
-            with sql_client.with_staging_dataset(staging=True):
-                staging_table_name = sql_client.make_qualified_table_name(table["name"])
-            table_name = sql_client.make_qualified_table_name(table["name"])
-            # drop destination table
-            sql.append(f"DROP TABLE {table_name};")
-            # moving staging table to destination schema
-            sql.append(
-                f"ALTER SCHEMA {sql_client.fully_qualified_dataset_name()} TRANSFER"
-                f" {staging_table_name};"
-            )
-            # recreate staging table
-            sql.extend(params["table_chain_create_table_statements"][table["name"]])
-
-        return sql
-
-
 class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob):
     def __init__(
         self,
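For context, the removed `generate_sql` emitted three statements per table in the chain: drop the destination table, transfer the staging table into the destination schema, then recreate the staging table. A rough sketch of that output for a hypothetical single-table chain (dataset and table names, and identifier quoting, are illustrative only):

# Hypothetical output of the removed SynapseStagingCopyJob.generate_sql
# for one table "items"; names and quoting are illustrative.
sql = [
    "DROP TABLE my_dataset.items;",
    "ALTER SCHEMA my_dataset TRANSFER my_dataset_staging.items;",
    # followed by the CREATE TABLE statement(s) that rebuild the staging table
    "CREATE TABLE my_dataset_staging.items (...);",
]

Because Synapse does not support DDL transactions, this batch could never run atomically, which is the caveat the docs previously attached to the strategy and, plausibly, the motivation for dropping it.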
class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob):
def __init__(
self,
3 changes: 1 addition & 2 deletions docs/website/docs/dlt-ecosystem/destinations/synapse.md
@@ -129,7 +129,7 @@ pipeline = dlt.pipeline(
 ## Write disposition
 All write dispositions are supported.
 
-If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be dropped and replaced by the staging tables with an `ALTER SCHEMA ... TRANSFER` command. Please note that this operation is **not** atomic—it involves multiple DDL commands and Synapse does not support DDL transactions.
+> ❗ The `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md) is **not** implemented for Synapse.
 
 ## Data loading
 Data is loaded via `INSERT` statements by default.
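With `staging-optimized` gone, the remaining choice is made with a single configuration value. A minimal sketch using the environment-variable form that the updated test below also uses:

import os

# Select the replace strategy before running the pipeline;
# "staging-optimized" is no longer available for Synapse.
os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"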
@@ -166,7 +166,6 @@ Possible values:
 >* **CLUSTERED COLUMNSTORE INDEX tables do not support the `varchar(max)`, `nvarchar(max)`, and `varbinary(max)` data types.** If you don't specify the `precision` for columns that map to any of these types, `dlt` will use the maximum lengths `varchar(4000)`, `nvarchar(4000)`, and `varbinary(8000)`.
 >* **While Synapse creates CLUSTERED COLUMNSTORE INDEXES by default, `dlt` creates HEAP tables by default.** HEAP is a more robust choice because it supports all data types and doesn't require conversions.
 >* **When using the `insert-from-staging` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense for staging tables for reasons explained [here](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables).
->* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are already created with the configured table index type**, because the staging table becomes the final table.
 >* **`dlt` system tables are always created as HEAP tables, regardless of any configuration.** This is in line with Microsoft's recommendation that "for small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance."
 >* Child tables, if any, inherit the table index type of their parent table.
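The bullets above concern the table index type hint. A sketch of setting it per resource, assuming the `synapse_adapter` helper and its `table_index_type` argument described elsewhere in this docs page:

import dlt
from dlt.destinations.adapters import synapse_adapter

@dlt.resource
def items():
    yield {"id": 1, "name": "widget"}

pipeline = dlt.pipeline(pipeline_name="synapse_demo", destination="synapse")
# Ask for a clustered columnstore index instead of dlt's HEAP default.
pipeline.run(synapse_adapter(items, table_index_type="clustered_columnstore_index"))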
9 changes: 1 addition & 8 deletions tests/load/synapse/test_synapse_table_indexing.py
@@ -100,18 +100,11 @@ def items_with_table_index_type_specified() -> Iterator[Any]:
 @pytest.mark.parametrize(
     "table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID
 )
-@pytest.mark.parametrize(
-    # Also test staging replace strategies, to make sure the final table index
-    # type is not affected by staging table index type adjustments.
-    "replace_strategy",
-    ["insert-from-staging", "staging-optimized"],
-)
 def test_resource_table_index_type_configuration(
     table_index_type: TTableIndexType,
     column_schema: Union[List[TColumnSchema], None],
-    replace_strategy: str,
 ) -> None:
-    os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
+    os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"
 
     @dlt.resource(
         name="items_with_table_index_type_specified",