Skip to content

Commit

Permalink
Add support for Iceberg Table Materialization (#1170)
Browse files Browse the repository at this point in the history
* Add materializations of table and dynamic table.

* Add the method to tell something is iceberg format and pipe that through to relation object

* Finish create macro and fix alters.

* Finish todo items and begin cleaning code.

* revert dynamic table changes.

* Fix the drop by fixing snowflake__show_iceberg_relations

* Transient needs sophisticated handling based on what user specifies for
transient manually.

* Try to figure out what the right None semantics are.

* Revert to original statement.

* Fix the transient behavior by passing table_type again.

* Rename object_format config param to table_format

* Migrate Jinja macros to Python.

* All classes are frozen

* Clean up the metadata queries that power is_iceberg column generation

* Fix Python models generation argument

* Add changelog.

* Try to fix duplication of join record issues.

* Use the RelationConfig protocol for type checking.

* Fix transient semantics.

* Add functional tests.

* Fix test.

* Fix test.

* Fix test and remove strip calls

* Add view test case.

* Code review comments.

* I'm using too new a version of mypy for Self.

* Add a behavior flag for iceberg table materialization.

* Flip order of flag.

* Adjust test.

---------

Co-authored-by: Mila Page <[email protected]>
  • Loading branch information
VersusFacit and VersusFacit authored Sep 12, 2024
1 parent 3fbc074 commit 49623d7
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 36 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-001806.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table materializations.
time: 2024-09-11T00:18:06.780586-07:00
custom:
Author: versusfacit
Issue: "321"
41 changes: 36 additions & 5 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
LIST_SCHEMAS_MACRO_NAME,
LIST_RELATIONS_MACRO_NAME,
)
from dbt_common.behavior_flags import BehaviorFlag
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.contracts.metadata import (
TableMetadata,
Expand All @@ -20,7 +21,10 @@
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
from dbt_common.utils import filter_null_values

from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
from dbt.adapters.snowflake.relation_configs import (
SnowflakeRelationType,
TableFormat,
)
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
Expand All @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig):
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None

# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
Expand All @@ -69,6 +78,10 @@ class SnowflakeAdapter(SQLAdapter):
}
)

@property
def _behavior_flags(self) -> List[BehaviorFlag]:
return [{"name": "enable_iceberg_materializations", "default": False}]

@classmethod
def date_function(cls):
return "CURRENT_TIMESTAMP()"
Expand Down Expand Up @@ -223,8 +236,9 @@ def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {"schema_relation": schema_relation}

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except DbtDatabaseError as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
Expand All @@ -235,18 +249,26 @@ def list_relations_without_caching(

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in results.column_names:
if "is_dynamic" in schema_objects.column_names:
columns.append("is_dynamic")
if "is_iceberg" in schema_objects.column_names:
columns.append("is_iceberg")

return [self._parse_list_relations_result(result) for result in results.select(columns)]
return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
# this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects
try:
database, schema, identifier, relation_type, is_dynamic = result
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
if self.behavior.enable_iceberg_materializations.no_warn:
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
Expand All @@ -256,12 +278,21 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

# This line is the main gate on supporting Iceberg materializations. Pass forward a default
# table format, and no downstream table macros can build iceberg relations.
table_format: str = (
TableFormat.ICEBERG
if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES")
else TableFormat.DEFAULT
)
quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
table_format=table_format,
quote_policy=quote_policy,
)

Expand Down
132 changes: 131 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
TableFormat,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
Expand All @@ -25,6 +31,7 @@
@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
table_format: str = TableFormat.DEFAULT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
Expand Down Expand Up @@ -53,6 +60,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_iceberg_format(self) -> bool:
return self.table_format == TableFormat.ICEBERG

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
Expand Down Expand Up @@ -120,3 +131,122 @@ def as_case_sensitive(self) -> "SnowflakeRelation":
path_part_map[path] = part.upper()

return self.replace_path(**path_part_map)

def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This macro renders the appropriate DDL prefix during the create_table_as
macro. It decides based on mutually exclusive table configuration options:
- TEMPORARY: Indicates a table that exists only for the duration of the session.
- ICEBERG: A specific storage format that requires a distinct DDL layout.
- TRANSIENT: A table similar to a permanent table but without fail-safe.
Additional Caveats for Iceberg models:
- transient=true throws a warning because Iceberg does not support transient tables
- A temporary relation is never an Iceberg relation because Iceberg does not
support temporary relations.
"""

transient_explicitly_set_true: bool = config.get("transient", False)

# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
return "temporary"
elif self.is_iceberg_format:
# Log a warning that transient=true on an Iceberg relation is ignored.
if transient_explicitly_set_true:
warn_or_error(
AdapterEventWarning(
base_msg=(
"Iceberg format relations cannot be transient. Please "
"remove either the transient or iceberg config options "
f"from {self.path.database}.{self.path.schema}."
f"{self.path.identifier}. If left unmodified, dbt will "
"ignore 'transient'."
)
)
)

return "iceberg"

# Always supply transient on table create DDL unless user specifically sets
# transient to false or unset. Might as well update the object attribute too!
elif transient_explicitly_set_true or config.get("transient", True):
return "transient"
else:
return ""

def get_ddl_prefix_for_alter(self) -> str:
"""All ALTER statements on Iceberg tables require an ICEBERG prefix"""
if self.is_iceberg_format:
return "iceberg"
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
f"Dropping relation {old_relation} because it is a view and target relation {self} "
f"is of type {self.type}."
)

drop_table_for_iceberg_message: str = (
f"Dropping relation {old_relation} because it is a default format table "
f"and target relation {self} is an Iceberg format table."
)

drop_iceberg_for_table_message: str = (
f"Dropping relation {old_relation} because it is an Iceberg format table "
f"and target relation {self} is a default format table."
)

# An existing view must be dropped for model to build into a table".
yield (not old_relation.is_table, drop_view_message)
# An existing table must be dropped for model to build into an Iceberg table.
yield (
old_relation.is_table
and not old_relation.is_iceberg_format
and self.is_iceberg_format,
drop_table_for_iceberg_message,
)
# existing Iceberg table must be dropped for model to build into a table.
yield (
old_relation.is_table
and old_relation.is_iceberg_format
and not self.is_iceberg_format,
drop_iceberg_for_table_message,
)

def needs_to_drop(self, old_relation: Optional["SnowflakeRelation"]) -> bool:
"""
To convert between Iceberg and non-Iceberg relations, a preemptive drop is
required.
drops cause latency, but it should be a relatively infrequent occurrence.
Some Boolean expression below are logically redundant, but this is done for easier
readability.
"""

if old_relation is None:
return False

for condition, message in self.__drop_conditions(old_relation):
if condition:
fire_event(AdapterEventDebug(base_msg=message))
return True

return False
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
14 changes: 14 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11


class TableFormat(StrEnum):
"""
Snowflake docs refers to this an 'Object Format.'
Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here.
"""

DEFAULT = "default"
ICEBERG = "iceberg"

def __str__(self):
return self.value
27 changes: 18 additions & 9 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,24 @@
{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}

{%- set max_total_results = max_results_per_iter * max_iter -%}
{% if schema_relation is string %}
{%- set sql -%}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }}
{%- endset -%}
{% else %}
{%- set sql -%}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }}
{%- endset -%}
{% endif -%}
{%- set sql -%}
{% if schema_relation is string %}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }};
{% else %}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }};
{% endif -%}

{# -- Gated for performance reason. If you don't want Iceberg, you shouldn't pay the
-- latency penalty. #}
{% if adapter.behavior.enable_iceberg_materializations.no_warn %}
select all_objects.*, is_iceberg as "is_iceberg"
from table(result_scan(last_query_id(-1))) all_objects
left join INFORMATION_SCHEMA.tables as all_tables
on all_tables.table_name = all_objects."name"
and all_tables.table_schema = all_objects."schema_name"
and all_tables.table_catalog = all_objects."database_name"
{% endif -%}
{%- endset -%}

{%- set result = run_query(sql) -%}

Expand Down
15 changes: 8 additions & 7 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table',
table_format=config.get('table_format', 'default')
) -%}

{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{% if target_relation.needs_to_drop(old_relation) %}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

Expand Down
Loading

0 comments on commit 49623d7

Please sign in to comment.