From a07b6ed2c1fff23fc65b36b6257977556a473e5d Mon Sep 17 00:00:00 2001 From: Damian Owsianny Date: Tue, 25 Jul 2023 11:41:29 +0200 Subject: [PATCH] Adjust materialized view macros to dbt-core implementation of materialized_view materialization. Add new tests from dbt-tests-adapter --- .../materializations/materialized_view.sql | 93 +++++-------------- .../test_materialized_view_dbt_core.py | 60 ++++++++++++ .../adapter/materialized_view_tests/utils.py | 31 +++++++ 3 files changed, 116 insertions(+), 68 deletions(-) create mode 100644 tests/functional/adapter/materialized_view_tests/test_materialized_view_dbt_core.py create mode 100644 tests/functional/adapter/materialized_view_tests/utils.py diff --git a/dbt/include/trino/macros/materializations/materialized_view.sql b/dbt/include/trino/macros/materializations/materialized_view.sql index 69527166..897b810a 100644 --- a/dbt/include/trino/macros/materializations/materialized_view.sql +++ b/dbt/include/trino/macros/materializations/materialized_view.sql @@ -1,78 +1,35 @@ -{% materialization materialized_view, adapter="trino" %} - - {% set full_refresh_mode = (should_full_refresh()) %} - - {%- set target_relation = this %} - {%- set existing_relation = load_relation(this) -%} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set to_drop = [] %} - - {% if existing_relation is none %} - {{ log("No existing materialized view found, creating materialized view...", info=true) }} - {%- set build_sql = create_materialized_view_as(target_relation) %} - - {% elif full_refresh_mode or existing_relation.type != "materialized_view" %} - {{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - - {% do adapter.rename_relation(existing_relation, backup_relation) %} - {%- set build_sql = create_materialized_view_as(target_relation) %} - {% do to_drop.append(backup_relation) %} - - {% else %} - {{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }} - {%- set build_sql = refresh_materialized_view(target_relation) %} - {% endif %} - - {% if build_sql %} - - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - {% do persist_docs(target_relation, model) %} - - -- `COMMIT` happens here - {% do adapter.commit() %} - - {% else %} - - {{ store_result('main', 'SKIP') }} +{%- macro trino__get_create_materialized_view_as_sql(target_relation, sql) -%} + {%- set _properties = config.get('properties') -%} + create materialized view {{ target_relation }} + {{ properties(_properties) }} + as + {{ sql }} + ; +{%- endmacro -%} - {% endif %} - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} +{% macro trino__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %} + {{- trino__get_create_materialized_view_as_sql(intermediate_relation, sql) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} + {% if existing_relation is not none %} + {{ log("Found a " ~ existing_relation.type ~ " with same name. Will drop it", info=true) }} + alter {{ existing_relation.type|replace("_", " ") }} {{ existing_relation }} rename to {{ backup_relation.include(database=False, schema=False) }}; + {% endif %} - {{ return({'relations': [target_relation]}) }} + alter materialized view {{ intermediate_relation }} rename to {{ relation.include(database=False, schema=False) }}; -{%- endmaterialization %} +{% endmacro %} -{%- macro create_materialized_view_as(target_relation) -%} - {%- set _properties = config.get('properties') -%} - create or replace materialized view {{ target_relation }} - {{ properties(_properties) }} - as - {{ sql }} - ; -{%- endmacro -%} +{#-- Applying materialized view configuration changes via alter is not supported. --#} +{#-- Return None, so `refresh_materialized_view` macro is invoked even --#} +{#-- if materialized view configuration changes are made. --#} +{#-- After configuration change, full refresh needs to be performed on mv. --#} +{% macro trino__get_materialized_view_configuration_changes(existing_relation, new_config) %} + {% do return(None) %} +{% endmacro %} -{%- macro refresh_materialized_view(target_relation) -%} - {%- set sqlcode = "refresh materialized view " ~ target_relation %} - {{ sqlcode }} +{%- macro trino__refresh_materialized_view(relation) -%} + refresh materialized view {{ relation }} {%- endmacro -%} diff --git a/tests/functional/adapter/materialized_view_tests/test_materialized_view_dbt_core.py b/tests/functional/adapter/materialized_view_tests/test_materialized_view_dbt_core.py new file mode 100644 index 00000000..b42e73a0 --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/test_materialized_view_dbt_core.py @@ -0,0 +1,60 @@ +from typing import Optional, Tuple + +import pytest +from dbt.adapters.base.relation import BaseRelation +from dbt.tests.adapter.materialized_view.basic import MaterializedViewBasic +from dbt.tests.util import get_model_file, run_dbt, set_model_file + +from tests.functional.adapter.materialized_view_tests.utils import query_relation_type + + +@pytest.mark.iceberg +class TestRedshiftMaterializedViewsBasic(MaterializedViewBasic): + @staticmethod + def insert_record(project, table: BaseRelation, record: Tuple[int, int]): + my_id, value = record + project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})") + + @staticmethod + def refresh_materialized_view(project, materialized_view: BaseRelation): + sql = f"refresh materialized view {materialized_view}" + project.run_sql(sql) + + @staticmethod + def query_row_count(project, relation: BaseRelation) -> int: + sql = f"select count(*) from {relation}" + return project.run_sql(sql, fetch="one")[0] + + @staticmethod + def query_relation_type(project, relation: BaseRelation) -> Optional[str]: + return query_relation_type(project, relation) + + @pytest.fixture(scope="function", autouse=True) + def setup(self, project, my_materialized_view): + run_dbt(["seed"]) + run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"]) + + # the tests touch these files, store their contents in memory + initial_model = get_model_file(project, my_materialized_view) + + yield + + # and then reset them after the test runs + set_model_file(project, my_materialized_view, initial_model) + # project.run_sql(f"drop schema if exists {project.test_schema} cascade") + + relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + project.adapter.drop_schema(relation) + + @pytest.mark.skip( + reason=""" + on iceberg: + If the data is outdated, the materialized view behaves like a normal view, + and the data is queried directly from the base tables. + https://trino.io/docs/current/connector/iceberg.html#materialized-views + """ + ) + def test_materialized_view_only_updates_after_refresh(self): + pass diff --git a/tests/functional/adapter/materialized_view_tests/utils.py b/tests/functional/adapter/materialized_view_tests/utils.py new file mode 100644 index 00000000..e15a3429 --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/utils.py @@ -0,0 +1,31 @@ +from typing import Optional + +from dbt.adapters.base.relation import BaseRelation + +from dbt.adapters.trino.relation import TrinoRelation + + +def query_relation_type(project, relation: BaseRelation) -> Optional[str]: + assert isinstance(relation, TrinoRelation) + sql = f""" + select + case when mv.name is not null then 'materialized_view' + when t.table_type = 'BASE TABLE' then 'table' + when t.table_type = 'VIEW' then 'view' + else t.table_type + end as table_type + from {relation.information_schema()}.tables t + left join system.metadata.materialized_views mv + on mv.catalog_name = t.table_catalog and mv.schema_name = t.table_schema and mv.name = t.table_name + where t.table_schema = '{relation.schema.lower()}' + and (mv.catalog_name is null or mv.catalog_name = '{relation.database.lower()}') + and (mv.schema_name is null or mv.schema_name = '{relation.schema.lower()}') + and t.table_name = '{relation.identifier.lower()}' + """ + results = project.run_sql(sql, fetch="all") + if len(results) == 0: + return None + elif len(results) > 1: + raise ValueError(f"More than one instance of {relation.name} found!") + else: + return results[0][0]