From 24709b38513267a1055baa9d4859d02021a21847 Mon Sep 17 00:00:00 2001
From: Jorrit Sandbrink <47451109+jorritsandbrink@users.noreply.github.com>
Date: Wed, 17 Apr 2024 21:55:34 +0400
Subject: [PATCH] Remove `staging-optimized` replace strategy for `synapse`
 (#1231)

* remove staging-optimized replace strategy for synapse

* fix athena iceberg locations

---------

Co-authored-by: Jorrit Sandbrink
Co-authored-by: Dave
---
 dlt/destinations/impl/synapse/synapse.py       | 45 +------------------
 .../dlt-ecosystem/destinations/synapse.md      |  3 +-
 .../synapse/test_synapse_table_indexing.py     |  9 +---
 3 files changed, 3 insertions(+), 54 deletions(-)

diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py
index 1c58bc4dbb..f52b64b9d9 100644
--- a/dlt/destinations/impl/synapse/synapse.py
+++ b/dlt/destinations/impl/synapse/synapse.py
@@ -127,21 +127,7 @@ def _get_columstore_valid_column(self, c: TColumnSchema) -> TColumnSchema:
     def _create_replace_followup_jobs(
         self, table_chain: Sequence[TTableSchema]
     ) -> List[NewLoadJob]:
-        if self.config.replace_strategy == "staging-optimized":
-            # we must recreate staging table after SCHEMA TRANSFER
-            job_params: SqlJobParams = {"table_chain_create_table_statements": {}}
-            create_statements = job_params["table_chain_create_table_statements"]
-            with self.with_staging_dataset():
-                for table in table_chain:
-                    columns = [c for c in self.schema.get_table_columns(table["name"]).values()]
-                    # generate CREATE TABLE statement
-                    create_statements[table["name"]] = self._get_table_update_sql(
-                        table["name"], columns, generate_alter=False
-                    )
-            return [
-                SynapseStagingCopyJob.from_table_chain(table_chain, self.sql_client, job_params)
-            ]
-        return super()._create_replace_followup_jobs(table_chain)
+        return SqlJobClientBase._create_replace_followup_jobs(self, table_chain)
 
     def prepare_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
         table = super().prepare_load_table(table_name, staging)
@@ -149,9 +135,6 @@ def prepare_load_table(self, table_name: str, staging: bool = False) -> TTableSc
             # Staging tables should always be heap tables, because "when you are
             # temporarily landing data in dedicated SQL pool, you may find that
             # using a heap table makes the overall process faster."
-            # "staging-optimized" is not included, because in that strategy the
-            # staging table becomes the final table, so we should already create
-            # it with the desired index type.
             table[TABLE_INDEX_TYPE_HINT] = "heap"  # type: ignore[typeddict-unknown-key]
         elif table_name in self.schema.dlt_table_names():
             # dlt tables should always be heap tables, because "for small lookup
@@ -186,32 +169,6 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
         return job
 
 
-class SynapseStagingCopyJob(SqlStagingCopyJob):
-    @classmethod
-    def generate_sql(
-        cls,
-        table_chain: Sequence[TTableSchema],
-        sql_client: SqlClientBase[Any],
-        params: Optional[SqlJobParams] = None,
-    ) -> List[str]:
-        sql: List[str] = []
-        for table in table_chain:
-            with sql_client.with_staging_dataset(staging=True):
-                staging_table_name = sql_client.make_qualified_table_name(table["name"])
-            table_name = sql_client.make_qualified_table_name(table["name"])
-            # drop destination table
-            sql.append(f"DROP TABLE {table_name};")
-            # moving staging table to destination schema
-            sql.append(
-                f"ALTER SCHEMA {sql_client.fully_qualified_dataset_name()} TRANSFER"
-                f" {staging_table_name};"
-            )
-            # recreate staging table
-            sql.extend(params["table_chain_create_table_statements"][table["name"]])
-
-        return sql
-
-
 class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob):
     def __init__(
         self,
diff --git a/docs/website/docs/dlt-ecosystem/destinations/synapse.md b/docs/website/docs/dlt-ecosystem/destinations/synapse.md
index 2369a5c566..fae4571e34 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/synapse.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/synapse.md
@@ -129,7 +129,7 @@ pipeline = dlt.pipeline(
 
 ## Write disposition
 All write dispositions are supported.
-If you set the [`replace` strategy](../../general-usage/full-loading.md) to `staging-optimized`, the destination tables will be dropped and replaced by the staging tables with an `ALTER SCHEMA ... TRANSFER` command. Please note that this operation is **not** atomic—it involves multiple DDL commands and Synapse does not support DDL transactions.
+> ❗ The `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md) is **not** implemented for Synapse.
 
 ## Data loading
 Data is loaded via `INSERT` statements by default.
@@ -166,7 +166,6 @@ Possible values:
 >* **CLUSTERED COLUMNSTORE INDEX tables do not support the `varchar(max)`, `nvarchar(max)`, and `varbinary(max)` data types.** If you don't specify the `precision` for columns that map to any of these types, `dlt` will use the maximum lengths `varchar(4000)`, `nvarchar(4000)`, and `varbinary(8000)`.
 >* **While Synapse creates CLUSTERED COLUMNSTORE INDEXES by default, `dlt` creates HEAP tables by default.** HEAP is a more robust choice because it supports all data types and doesn't require conversions.
 >* **When using the `insert-from-staging` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense for staging tables for reasons explained [here](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables).
->* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are already created with the configured table index type**, because the staging table becomes the final table.
 >* **`dlt` system tables are always created as HEAP tables, regardless of any configuration.** This is in line with Microsoft's recommendation that "for small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance."
 >* Child tables, if any, inherit the table index type of their parent table.
 
diff --git a/tests/load/synapse/test_synapse_table_indexing.py b/tests/load/synapse/test_synapse_table_indexing.py
index 71f419cbca..c9ecba17a1 100644
--- a/tests/load/synapse/test_synapse_table_indexing.py
+++ b/tests/load/synapse/test_synapse_table_indexing.py
@@ -100,18 +100,11 @@ def items_with_table_index_type_specified() -> Iterator[Any]:
 @pytest.mark.parametrize(
     "table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID
 )
-@pytest.mark.parametrize(
-    # Also test staging replace strategies, to make sure the final table index
-    # type is not affected by staging table index type adjustments.
-    "replace_strategy",
-    ["insert-from-staging", "staging-optimized"],
-)
 def test_resource_table_index_type_configuration(
     table_index_type: TTableIndexType,
     column_schema: Union[List[TColumnSchema], None],
-    replace_strategy: str,
 ) -> None:
-    os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
+    os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"
 
     @dlt.resource(
         name="items_with_table_index_type_specified",
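
For reference, below is a minimal sketch of how a full load against Synapse could be configured after this change. It only uses pieces that appear in the patch itself: the `DESTINATION__REPLACE_STRATEGY` environment variable (set to `insert-from-staging` in the updated test, since `staging-optimized` is removed) and the standard `dlt.resource`/`dlt.pipeline` API shown in the docs. The resource, pipeline, and dataset names are illustrative placeholders, not taken from this patch.

```python
import os

import dlt

# "staging-optimized" is no longer available for Synapse; a staging-based
# replace now uses "insert-from-staging" (the default "truncate-and-insert"
# also remains available).
os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"


@dlt.resource(name="items", write_disposition="replace")
def items():
    # Hypothetical sample data; any iterable of dicts works here.
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]


# Pipeline and dataset names are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="synapse_full_load",
    destination="synapse",
    dataset_name="example_data",
)

load_info = pipeline.run(items())
print(load_info)
```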