Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incremental materialization to correctly set the temporary relati… #418

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20240703-140732.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Fixes
body: Fix incremental materialization to correctly set the temporary relation to a
view or table
time: 2024-07-03T14:07:32.613562+02:00
custom:
Author: damian3031
Issue: "416"
PR: "418"
30 changes: 15 additions & 15 deletions dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,6 @@

{% materialization incremental, adapter='trino', supported_languages=['sql'] -%}

{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- configs --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
Expand All @@ -41,6 +31,20 @@
{% do log(log_message) %}
{%- set on_table_exists = 'rename' -%}
{% endif %}
{#-- Get the incremental_strategy and the macro to use for the strategy --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}

{#-- relations --#}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{#-- The temp relation will be a view (faster) or temp table, depending on upsert/merge strategy --#}
{%- set tmp_relation_type = get_incremental_tmp_relation_type(incremental_strategy, unique_key, language) -%}
{%- set tmp_relation = make_temp_relation(this).incorporate(type=tmp_relation_type) -%}
{%- set intermediate_relation = make_intermediate_relation(target_relation) -%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

{#-- the temp_ and backup_ relation should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
Expand Down Expand Up @@ -96,12 +100,8 @@
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{#-- Build the sql --#}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}

{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture

from tests.functional.adapter.materialization.fixtures import model_sql, seed_csv


class BaseViewsEnabled:
# everything that goes in the "seeds" directory
@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

# everything that goes in the "models" directory
@pytest.fixture(scope="class")
def models(self):
return {
"materialization.sql": model_sql,
}


class TestViewsEnabledTrue(BaseViewsEnabled):
"""
Testing without views_enabled config specified, which defaults to views_enabled = True configuration
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "views_enabled_true",
"models": {"+materialized": "incremental"},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'''create or replace view
"{project.database}"."{project.test_schema}"."materialization__dbt_tmp"'''
in logs
)


class TestViewsEnabledFalse(BaseViewsEnabled):
"""
Testing views_enabled = False configuration for incremental materialization
"""

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"name": "views_enabled_false",
"models": {"+materialized": "incremental", "+views_enabled": False},
"seeds": {
"+column_types": {"some_date": "timestamp(6)"},
},
}

# The actual sequence of dbt commands and assertions
# pytest will take care of all "setup" + "teardown"
def test_run_seed_test(self, project):
# seed seeds
results = run_dbt(["seed"], expect_pass=True)
assert len(results) == 1

results = run_dbt(["run"], expect_pass=True)
assert len(results) == 1

results, logs = run_dbt_and_capture(["--debug", "run"], expect_pass=True)
assert len(results) == 1
assert (
f'create table "{project.database}"."{project.test_schema}"."materialization__dbt_tmp"'
in logs
)
Loading