Skip to content

Commit

Permalink
Fix incremental materialization to correctly set the temporary relati…
Browse files Browse the repository at this point in the history
…on to a view or table
  • Loading branch information
damian3031 committed Jul 3, 2024
1 parent 15a4398 commit ec6f35d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 15 deletions.
30 changes: 15 additions & 15 deletions dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,6 @@

{% materialization incremental, adapter='trino', supported_languages=['sql'] -%}

{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- configs --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
Expand All @@ -41,6 +31,20 @@
{% do log(log_message) %}
{%- set on_table_exists = 'rename' -%}
{% endif %}
{#-- Get the incremental_strategy and the macro to use for the strategy --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}

{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- the temp_ and backup_ relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
Expand Down Expand Up @@ -96,12 +100,8 @@
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{#-- Build the sql --#}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}

{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture

from tests.functional.adapter.materialization.fixtures import model_sql, seed_csv


class BaseViewsEnabled:
# everything that goes in the "seeds" directory
@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

# everything that goes in the "models" directory
@pytest.fixture(scope="class")
def models(self):
return {
"materialization.sql": model_sql,
}


class TestViewsEnabledTrue(BaseViewsEnabled):
"""
Testing without views_enabled config specified, which defaults to views_enabled = True configuration
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "views_enabled_true",
"models": {"+materialized": "incremental"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'''create or replace view
"{project.database}"."{project.test_schema}"."materialization__dbt_tmp"'''
in logs
)


class TestViewsEnabledFalse(BaseViewsEnabled):
"""
Testing views_enabled = False configuration for incremental materialization
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "views_enabled_false",
"models": {"+materialized": "incremental", "+views_enabled": False},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'create table "{project.database}"."{project.test_schema}"."materialization__dbt_tmp"'
in logs
)

0 comments on commit ec6f35d

Please sign in to comment.