From 49623d7309bb64600d2ddb3f545ed6a9d8a0ddaf Mon Sep 17 00:00:00 2001 From: Mila Page <67295367+VersusFacit@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:26:43 -0700 Subject: [PATCH] Add support for Iceberg Table Materialization (#1170) * Add materializations of table and dynamic table. * Add the method to tell something is iceberg format and pipe that through to relation object * Finish create macro and fix alters. * Finish todo items and begin cleaning code. * revert dynamic table changes. * Fix the drop by fixing snowflake__show_iceberg_relations * Transient needs sophisticated handling based on what user specifies for transient manually. * Try to figure out what the right None semantics are. * Revert to original statement. * Fix the transient behavior by passing table_type again. * Rename object_format config param to table_format * Migrate Jinja macros to Python. * All classes are frozen * Clean up the metadata queries that power is_iceberg column generation * Fix Python models generation argument * Add changelog. * Try to fix duplication of join record issues. * Use the RelationConfig protocol for type checking. * Fix transient semantics. * Add functional tests. * Fix test. * Fix test. * Fix test and remove strip calls * Add view test case. * Code review comments. * I'm using too new a version of mypy for Self. * Add a behavior flag for iceberg table materialization. * Flip order of flag. * Adjust test. 
--------- Co-authored-by: Mila Page --- .../unreleased/Features-20240911-001806.yaml | 6 + dbt/adapters/snowflake/impl.py | 41 +++++- dbt/adapters/snowflake/relation.py | 132 +++++++++++++++++- .../snowflake/relation_configs/__init__.py | 1 + .../snowflake/relation_configs/formats.py | 14 ++ dbt/include/snowflake/macros/adapters.sql | 27 ++-- .../macros/materializations/table.sql | 15 +- .../macros/relations/table/create.sql | 33 +++-- tests/functional/iceberg/test_table_basic.py | 106 ++++++++++++++ 9 files changed, 339 insertions(+), 36 deletions(-) create mode 100644 .changes/unreleased/Features-20240911-001806.yaml create mode 100644 dbt/adapters/snowflake/relation_configs/formats.py create mode 100644 tests/functional/iceberg/test_table_basic.py diff --git a/.changes/unreleased/Features-20240911-001806.yaml b/.changes/unreleased/Features-20240911-001806.yaml new file mode 100644 index 000000000..024480b96 --- /dev/null +++ b/.changes/unreleased/Features-20240911-001806.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for Iceberg table materializations. 
+time: 2024-09-11T00:18:06.780586-07:00 +custom: + Author: versusfacit + Issue: "321" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 6854b199d..7e8ec9cf2 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -9,6 +9,7 @@ LIST_SCHEMAS_MACRO_NAME, LIST_RELATIONS_MACRO_NAME, ) +from dbt_common.behavior_flags import BehaviorFlag from dbt_common.contracts.constraints import ConstraintType from dbt_common.contracts.metadata import ( TableMetadata, @@ -20,7 +21,10 @@ from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError from dbt_common.utils import filter_null_values -from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType +from dbt.adapters.snowflake.relation_configs import ( + SnowflakeRelationType, + TableFormat, +) from dbt.adapters.snowflake import SnowflakeColumn from dbt.adapters.snowflake import SnowflakeConnectionManager from dbt.adapters.snowflake import SnowflakeRelation @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig): merge_update_columns: Optional[str] = None target_lag: Optional[str] = None + # extended formats + table_format: Optional[str] = None + external_volume: Optional[str] = None + base_location_subpath: Optional[str] = None + class SnowflakeAdapter(SQLAdapter): Relation = SnowflakeRelation @@ -69,6 +78,10 @@ class SnowflakeAdapter(SQLAdapter): } ) + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + return [{"name": "enable_iceberg_materializations", "default": False}] + @classmethod def date_function(cls): return "CURRENT_TIMESTAMP()" @@ -223,8 +236,9 @@ def list_relations_without_caching( self, schema_relation: SnowflakeRelation ) -> List[SnowflakeRelation]: kwargs = {"schema_relation": schema_relation} + try: - results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) + schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) except DbtDatabaseError as exc: # if the schema doesn't 
exist, we just want to return. # Alternatively, we could query the list of schemas before we start @@ -235,18 +249,26 @@ def list_relations_without_caching( # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory columns = ["database_name", "schema_name", "name", "kind"] - if "is_dynamic" in results.column_names: + if "is_dynamic" in schema_objects.column_names: columns.append("is_dynamic") + if "is_iceberg" in schema_objects.column_names: + columns.append("is_iceberg") - return [self._parse_list_relations_result(result) for result in results.select(columns)] + return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)] def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation: # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory + # this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects try: - database, schema, identifier, relation_type, is_dynamic = result + if self.behavior.enable_iceberg_materializations.no_warn: + database, schema, identifier, relation_type, is_dynamic, is_iceberg = result + else: + database, schema, identifier, relation_type, is_dynamic = result except ValueError: database, schema, identifier, relation_type = result is_dynamic = "N" + if self.behavior.enable_iceberg_materializations.no_warn: + is_iceberg = "N" try: relation_type = self.Relation.get_relation_type(relation_type.lower()) @@ -256,12 +278,21 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation if relation_type == self.Relation.Table and is_dynamic == "Y": relation_type = self.Relation.DynamicTable + # This line is the main gate on supporting Iceberg materializations. Pass forward a default + # table format, and no downstream table macros can build iceberg relations. 
+ table_format: str = ( + TableFormat.ICEBERG + if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES") + else TableFormat.DEFAULT + ) quote_policy = {"database": True, "schema": True, "identifier": True} + return self.Relation.create( database=database, schema=schema, identifier=identifier, type=relation_type, + table_format=table_format, quote_policy=quote_policy, ) diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index ace85695b..224b2b75e 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -1,8 +1,12 @@ +import textwrap + from dataclasses import dataclass, field -from typing import FrozenSet, Optional, Type +from typing import FrozenSet, Optional, Type, Iterator, Tuple + from dbt.adapters.base.relation import BaseRelation from dbt.adapters.contracts.relation import ComponentName, RelationConfig +from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug from dbt.adapters.relation_configs import ( RelationConfigBase, RelationConfigChangeAction, @@ -10,6 +14,7 @@ ) from dbt.adapters.utils import classproperty from dbt_common.exceptions import DbtRuntimeError +from dbt_common.events.functions import fire_event, warn_or_error from dbt.adapters.snowflake.relation_configs import ( SnowflakeDynamicTableConfig, @@ -17,6 +22,7 @@ SnowflakeDynamicTableRefreshModeConfigChange, SnowflakeDynamicTableTargetLagConfigChange, SnowflakeDynamicTableWarehouseConfigChange, + TableFormat, SnowflakeQuotePolicy, SnowflakeRelationType, ) @@ -25,6 +31,7 @@ @dataclass(frozen=True, eq=False, repr=False) class SnowflakeRelation(BaseRelation): type: Optional[SnowflakeRelationType] = None + table_format: str = TableFormat.DEFAULT quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy()) require_alias: bool = False relation_configs = { @@ -53,6 +60,10 @@ class SnowflakeRelation(BaseRelation): def is_dynamic_table(self) -> bool: return 
self.type == SnowflakeRelationType.DynamicTable + @property + def is_iceberg_format(self) -> bool: + return self.table_format == TableFormat.ICEBERG + @classproperty def DynamicTable(cls) -> str: return str(SnowflakeRelationType.DynamicTable) @@ -120,3 +131,122 @@ def as_case_sensitive(self) -> "SnowflakeRelation": path_part_map[path] = part.upper() return self.replace_path(**path_part_map) + + def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str: + """ + This macro renders the appropriate DDL prefix during the create_table_as + macro. It decides based on mutually exclusive table configuration options: + + - TEMPORARY: Indicates a table that exists only for the duration of the session. + - ICEBERG: A specific storage format that requires a distinct DDL layout. + - TRANSIENT: A table similar to a permanent table but without fail-safe. + + Additional Caveats for Iceberg models: + - transient=true throws a warning because Iceberg does not support transient tables + - A temporary relation is never an Iceberg relation because Iceberg does not + support temporary relations. + """ + + transient_explicitly_set_true: bool = config.get("transient", False) + + # Temporary tables are a Snowflake feature that do not exist in the + # Iceberg framework. We ignore the Iceberg status of the model. + if temporary: + return "temporary" + elif self.is_iceberg_format: + # Log a warning that transient=true on an Iceberg relation is ignored. + if transient_explicitly_set_true: + warn_or_error( + AdapterEventWarning( + base_msg=( + "Iceberg format relations cannot be transient. Please " + "remove either the transient or iceberg config options " + f"from {self.path.database}.{self.path.schema}." + f"{self.path.identifier}. If left unmodified, dbt will " + "ignore 'transient'." + ) + ) + ) + + return "iceberg" + + # Always supply transient on table create DDL unless user specifically sets + # transient to false or unset. 
Might as well update the object attribute too! + elif transient_explicitly_set_true or config.get("transient", True): + return "transient" + else: + return "" + + def get_ddl_prefix_for_alter(self) -> str: + """All ALTER statements on Iceberg tables require an ICEBERG prefix""" + if self.is_iceberg_format: + return "iceberg" + else: + return "" + + def get_iceberg_ddl_options(self, config: RelationConfig) -> str: + base_location: str = f"_dbt/{self.schema}/{self.name}" + + if subpath := config.get("base_location_subpath"): + base_location += f"/{subpath}" + + iceberg_ddl_predicates: str = f""" + external_volume = '{config.get('external_volume')}' + catalog = 'snowflake' + base_location = '{base_location}' + """ + return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10) + + def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]: + drop_view_message: str = ( + f"Dropping relation {old_relation} because it is a view and target relation {self} " + f"is of type {self.type}." + ) + + drop_table_for_iceberg_message: str = ( + f"Dropping relation {old_relation} because it is a default format table " + f"and target relation {self} is an Iceberg format table." + ) + + drop_iceberg_for_table_message: str = ( + f"Dropping relation {old_relation} because it is an Iceberg format table " + f"and target relation {self} is a default format table." + ) + + # An existing view must be dropped for the model to build into a table. + yield (not old_relation.is_table, drop_view_message) + # An existing table must be dropped for model to build into an Iceberg table. + yield ( + old_relation.is_table + and not old_relation.is_iceberg_format + and self.is_iceberg_format, + drop_table_for_iceberg_message, + ) + # An existing Iceberg table must be dropped for model to build into a table. 
+ yield ( + old_relation.is_table + and old_relation.is_iceberg_format + and not self.is_iceberg_format, + drop_iceberg_for_table_message, + ) + + def needs_to_drop(self, old_relation: Optional["SnowflakeRelation"]) -> bool: + """ + To convert between Iceberg and non-Iceberg relations, a preemptive drop is + required. + + Drops cause latency, but it should be a relatively infrequent occurrence. + + Some Boolean expressions below are logically redundant, but this is done for easier + readability. + """ + + if old_relation is None: + return False + + for condition, message in self.__drop_conditions(old_relation): + if condition: + fire_event(AdapterEventDebug(base_msg=message)) + return True + + return False diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index 62f95faff..61941ab50 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -10,3 +10,4 @@ SnowflakeQuotePolicy, SnowflakeRelationType, ) +from dbt.adapters.snowflake.relation_configs.formats import TableFormat diff --git a/dbt/adapters/snowflake/relation_configs/formats.py b/dbt/adapters/snowflake/relation_configs/formats.py new file mode 100644 index 000000000..460241d9d --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/formats.py @@ -0,0 +1,14 @@ +from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 + + +class TableFormat(StrEnum): + """ + Snowflake docs refer to this as an 'Object Format.' + Data practitioners and interfaces refer to this as 'Table Format', hence the term's use here. 
+ """ + + DEFAULT = "default" + ICEBERG = "iceberg" + + def __str__(self): + return self.value diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 4cb4bcffa..aa8895819 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -137,15 +137,24 @@ {% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %} {%- set max_total_results = max_results_per_iter * max_iter -%} - {% if schema_relation is string %} - {%- set sql -%} - show objects in {{ schema_relation }} limit {{ max_results_per_iter }} - {%- endset -%} - {% else %} - {%- set sql -%} - show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }} - {%- endset -%} - {% endif -%} + {%- set sql -%} + {% if schema_relation is string %} + show objects in {{ schema_relation }} limit {{ max_results_per_iter }}; + {% else %} + show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }}; + {% endif -%} + + {# -- Gated for performance reason. If you don't want Iceberg, you shouldn't pay the + -- latency penalty. 
#} + {% if adapter.behavior.enable_iceberg_materializations.no_warn %} + select all_objects.*, is_iceberg as "is_iceberg" + from table(result_scan(last_query_id(-1))) all_objects + left join INFORMATION_SCHEMA.tables as all_tables + on all_tables.table_name = all_objects."name" + and all_tables.table_schema = all_objects."schema_name" + and all_tables.table_catalog = all_objects."database_name" + {% endif -%} + {%- endset -%} {%- set result = run_query(sql) -%} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index ef201c705..cbc6d9ce6 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -8,16 +8,17 @@ {% set grant_config = config.get('grants') %} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(identifier=identifier, - schema=schema, - database=database, type='table') -%} + {%- set target_relation = api.Relation.create( + identifier=identifier, + schema=schema, + database=database, + type='table', + table_format=config.get('table_format', 'default') + ) -%} {{ run_hooks(pre_hooks) }} - {#-- Drop the relation if it was a view to "convert" it in a table. 
This may lead to - -- downtime, but it should be a relatively infrequent occurrence #} - {% if old_relation is not none and not old_relation.is_table %} - {{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }} + {% if target_relation.needs_to_drop(old_relation) %} {{ drop_relation_if_exists(old_relation) }} {% endif %} diff --git a/dbt/include/snowflake/macros/relations/table/create.sql b/dbt/include/snowflake/macros/relations/table/create.sql index c6bc8f775..355150e28 100644 --- a/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt/include/snowflake/macros/relations/table/create.sql @@ -1,14 +1,8 @@ {% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%} - {%- set transient = config.get('transient', default=true) -%} - - {% if temporary -%} - {%- set table_type = "temporary" -%} - {%- elif transient -%} - {%- set table_type = "transient" -%} - {%- else -%} - {%- set table_type = "" -%} - {%- endif %} + {%- set materialization_prefix = relation.get_ddl_prefix_for_create(config.model.config, temporary) -%} + {%- set alter_prefix = relation.get_ddl_prefix_for_alter() -%} + {# Generate DDL/DML #} {%- if language == 'sql' -%} {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%} @@ -26,7 +20,15 @@ {{ sql_header if sql_header is not none }} - create or replace {{ table_type }} table {{ relation }} + create or replace {{ materialization_prefix }} table {{ relation }} + {%- if relation.is_iceberg_format %} + {# + Valid DDL in CTAS statements. Plain create statements have a different order. 
+ https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table + #} + {{ relation.get_iceberg_ddl_options(config.model.config) }} + {%- endif -%} + {%- set contract_config = config.get('contract') -%} {%- if contract_config.enforced -%} {{ get_assert_columns_equivalent(sql) }} @@ -44,14 +46,17 @@ {%- endif %} ); {% if cluster_by_string is not none and not temporary -%} - alter table {{relation}} cluster by ({{cluster_by_string}}); + alter {{ alter_prefix }} table {{relation}} cluster by ({{cluster_by_string}}); {%- endif -%} - {% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%} - alter table {{relation}} resume recluster; + {% if enable_automatic_clustering and cluster_by_string is not none and not temporary %} + alter {{ alter_prefix }} table {{relation}} resume recluster; {%- endif -%} {%- elif language == 'python' -%} - {{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=table_type) }} + {%- if relation.is_iceberg_format %} + {% do exceptions.raise_compiler_error('Iceberg is incompatible with Python models. 
Please use a SQL model for the iceberg format.') %} + {%- endif %} + {{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=relation.get_ddl_prefix_for_create(config.model.config, temporary)) }} {%- else -%} {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %} {%- endif -%} diff --git a/tests/functional/iceberg/test_table_basic.py b/tests/functional/iceberg/test_table_basic.py new file mode 100644 index 000000000..0bfdf59f1 --- /dev/null +++ b/tests/functional/iceberg/test_table_basic.py @@ -0,0 +1,106 @@ +import pytest + +from pathlib import Path + +from dbt.tests.util import run_dbt, rm_file, write_file + +_MODEL_BASIC_TABLE_MODEL = """ +{{ + config( + materialized = "table", + cluster_by=['id'], + ) +}} +select 1 as id +""" + +_MODEL_BASIC_ICEBERG_MODEL = """ +{{ + config( + transient = "true", + materialized = "table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", + ) +}} + +select * from {{ ref('first_table') }} +""" + +_MODEL_BUILT_ON_ICEBERG_TABLE = """ +{{ + config( + materialized = "table", + ) +}} +select * from {{ ref('iceberg_table') }} +""" + +_MODEL_TABLE_BEFORE_SWAP = """ +{{ + config( + materialized = "table", + ) +}} +select 1 as id +""" + +_MODEL_VIEW_BEFORE_SWAP = """ +select 1 as id +""" + +_MODEL_TABLE_FOR_SWAP_ICEBERG = """ +{{ + config( + materialized = "table", + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", + ) +}} +select 1 as id +""" + + +class TestIcebergTableBuilds: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.fixture(scope="class") + def models(self): + return { + "first_table.sql": _MODEL_BASIC_TABLE_MODEL, + "iceberg_table.sql": _MODEL_BASIC_ICEBERG_MODEL, + "table_built_on_iceberg_table.sql": 
_MODEL_BUILT_ON_ICEBERG_TABLE, + } + + def test_iceberg_tables_build_and_can_be_referred(self, project): + run_results = run_dbt() + assert len(run_results) == 3 + + +class TestIcebergTableTypeBuildsOnExistingTable: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.mark.parametrize("start_model", [_MODEL_TABLE_BEFORE_SWAP, _MODEL_VIEW_BEFORE_SWAP]) + def test_changing_model_types(self, project, start_model): + model_file = project.project_root / Path("models") / Path("my_model.sql") + + write_file(start_model, model_file) + run_results = run_dbt() + assert len(run_results) == 1 + + rm_file(model_file) + write_file(_MODEL_TABLE_FOR_SWAP_ICEBERG, model_file) + run_results = run_dbt() + assert len(run_results) == 1 + + rm_file(model_file) + write_file(start_model, model_file) + run_results = run_dbt() + assert len(run_results) == 1