diff --git a/.changes/unreleased/Fixes-20240703-140732.yaml b/.changes/unreleased/Fixes-20240703-140732.yaml new file mode 100644 index 00000000..fc78f5ae --- /dev/null +++ b/.changes/unreleased/Fixes-20240703-140732.yaml @@ -0,0 +1,8 @@ +kind: Fixes +body: Fix incremental materialization to correctly set the temporary relation to a + view or table +time: 2024-07-03T14:07:32.613562+02:00 +custom: + Author: damian3031 + Issue: "416" + PR: "418" diff --git a/dbt/include/trino/macros/materializations/incremental.sql b/dbt/include/trino/macros/materializations/incremental.sql index a8b2dec0..712d458d 100644 --- a/dbt/include/trino/macros/materializations/incremental.sql +++ b/dbt/include/trino/macros/materializations/incremental.sql @@ -20,16 +20,6 @@ {% materialization incremental, adapter='trino', supported_languages=['sql'] -%} - {#-- relations --#} - {%- set existing_relation = load_cached_relation(this) -%} - {%- set target_relation = this.incorporate(type='table') -%} - {#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#} - {%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%} - {%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%} - {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - {#-- configs --#} {%- set unique_key = config.get('unique_key') -%} {%- set full_refresh_mode = (should_full_refresh()) -%} @@ -41,6 +31,20 @@ {% do log(log_message) %} {%- set on_table_exists = 'rename' -%} {% endif %} + {#-- Get the incremental_strategy and the macro to use for the strategy --#} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} + {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + + {#-- relations --#} + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') -%} + {#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#} + {%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%} + {%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%} + {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} {#-- the temp_ and backup_ relation should not already exist in the database; get_relation -- will return None in that case. Otherwise, we get a relation that we can drop @@ -96,12 +100,8 @@ {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} - {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} - {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} - {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} - {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {#-- Build the sql --#} {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} - {%- call statement('main') -%} {{ strategy_sql_macro_func(strategy_arg_dict) }} {%- endcall -%} diff --git a/tests/functional/adapter/materialization/test_incremental_views_enabled.py b/tests/functional/adapter/materialization/test_incremental_views_enabled.py new file mode 100644 index 00000000..65cd4375 --- /dev/null +++ b/tests/functional/adapter/materialization/test_incremental_views_enabled.py @@ -0,0 +1,87 @@ +import pytest +from dbt.tests.util import run_dbt, run_dbt_and_capture + +from tests.functional.adapter.materialization.fixtures import model_sql, seed_csv + + +class BaseViewsEnabled: + # everything that goes in the "seeds" directory + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + # everything that goes in the "models" directory + @pytest.fixture(scope="class") + def models(self): + return { + "materialization.sql": model_sql, + } + + +class TestViewsEnabledTrue(BaseViewsEnabled): + """ + Testing without views_enabled config specified, which defaults to views_enabled = True configuration + """ + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "views_enabled_true", + "models": {"+materialized": "incremental"}, + "seeds": { + "+column_types": {"some_date": "timestamp(6)"}, + }, + } + + # The actual sequence of dbt commands and assertions + # pytest will take care of all "setup" + "teardown" + def test_run_seed_test(self, project): + # seed seeds + results = run_dbt(["seed"], expect_pass=True) + assert len(results) == 1 + + results = run_dbt(["run"], expect_pass=True) + assert len(results) == 1 + + results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True) + assert len(results) == 1 + assert ( + f'''create or replace view + "{project.database}"."{project.test_schema}"."materialization__dbt_tmp"''' + in logs + ) + + +class TestViewsEnabledFalse(BaseViewsEnabled): + """ + Testing views_enabled = False configuration for incremental materialization + """ + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "name": "views_enabled_false", + "models": {"+materialized": "incremental", "+views_enabled": False}, + "seeds": { + "+column_types": {"some_date": "timestamp(6)"}, + }, + } + + # The actual sequence of dbt commands and assertions + # pytest will take care of all "setup" + "teardown" + def test_run_seed_test(self, project): + # seed seeds + results = run_dbt(["seed"], expect_pass=True) + assert len(results) == 1 + + results = run_dbt(["run"], expect_pass=True) + assert len(results) == 1 + + results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True) + assert len(results) == 1 + assert ( + f'create table "{project.database}"."{project.test_schema}"."materialization__dbt_tmp"' + in logs + )