Skip to content

Commit

Permalink
Adjust materialized view macros to dbt-core implementation of materia…
Browse files Browse the repository at this point in the history
…lized_view materialization. Add new tests from dbt-tests-adapter
  • Loading branch information
damian3031 committed Jul 26, 2023
1 parent efddcb2 commit a07b6ed
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 68 deletions.
93 changes: 25 additions & 68 deletions dbt/include/trino/macros/materializations/materialized_view.sql
Original file line number Diff line number Diff line change
@@ -1,78 +1,35 @@
{% materialization materialized_view, adapter="trino" %}

{% set full_refresh_mode = (should_full_refresh()) %}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{% if existing_relation is none %}
{{ log("No existing materialized view found, creating materialized view...", info=true) }}
{%- set build_sql = create_materialized_view_as(target_relation) %}

{% elif full_refresh_mode or existing_relation.type != "materialized_view" %}
{{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %}

{% do adapter.rename_relation(existing_relation, backup_relation) %}
{%- set build_sql = create_materialized_view_as(target_relation) %}
{% do to_drop.append(backup_relation) %}

{% else %}
{{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }}
{%- set build_sql = refresh_materialized_view(target_relation) %}
{% endif %}

{% if build_sql %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% else %}

{{ store_result('main', 'SKIP') }}
{%- macro trino__get_create_materialized_view_as_sql(target_relation, sql) -%}
{%- set _properties = config.get('properties') -%}
create materialized view {{ target_relation }}
{{ properties(_properties) }}
as
{{ sql }}
;
{%- endmacro -%}

{% endif %}

{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}
{% macro trino__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %}
{{- trino__get_create_materialized_view_as_sql(intermediate_relation, sql) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% if existing_relation is not none %}
{{ log("Found a " ~ existing_relation.type ~ " with same name. Will drop it", info=true) }}
alter {{ existing_relation.type|replace("_", " ") }} {{ existing_relation }} rename to {{ backup_relation.include(database=False, schema=False) }};
{% endif %}

{{ return({'relations': [target_relation]}) }}
alter materialized view {{ intermediate_relation }} rename to {{ relation.include(database=False, schema=False) }};

{%- endmaterialization %}
{% endmacro %}


{%- macro create_materialized_view_as(target_relation) -%}
{%- set _properties = config.get('properties') -%}
create or replace materialized view {{ target_relation }}
{{ properties(_properties) }}
as
{{ sql }}
;
{%- endmacro -%}
{#-- Applying materialized view configuration changes via alter is not supported. --#}
{#-- Return None, so `refresh_materialized_view` macro is invoked even --#}
{#-- if materialized view configuration changes are made. --#}
{#-- After configuration change, full refresh needs to be performed on mv. --#}
{% macro trino__get_materialized_view_configuration_changes(existing_relation, new_config) %}
{% do return(None) %}
{% endmacro %}


{%- macro refresh_materialized_view(target_relation) -%}
{%- set sqlcode = "refresh materialized view " ~ target_relation %}
{{ sqlcode }}
{%- macro trino__refresh_materialized_view(relation) -%}
refresh materialized view {{ relation }}
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import Optional, Tuple

import pytest
from dbt.adapters.base.relation import BaseRelation
from dbt.tests.adapter.materialized_view.basic import MaterializedViewBasic
from dbt.tests.util import get_model_file, run_dbt, set_model_file

from tests.functional.adapter.materialized_view_tests.utils import query_relation_type


@pytest.mark.iceberg
class TestRedshiftMaterializedViewsBasic(MaterializedViewBasic):
@staticmethod
def insert_record(project, table: BaseRelation, record: Tuple[int, int]):
my_id, value = record
project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})")

@staticmethod
def refresh_materialized_view(project, materialized_view: BaseRelation):
sql = f"refresh materialized view {materialized_view}"
project.run_sql(sql)

@staticmethod
def query_row_count(project, relation: BaseRelation) -> int:
sql = f"select count(*) from {relation}"
return project.run_sql(sql, fetch="one")[0]

@staticmethod
def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
return query_relation_type(project, relation)

@pytest.fixture(scope="function", autouse=True)
def setup(self, project, my_materialized_view):
run_dbt(["seed"])
run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"])

# the tests touch these files, store their contents in memory
initial_model = get_model_file(project, my_materialized_view)

yield

# and then reset them after the test runs
set_model_file(project, my_materialized_view, initial_model)
# project.run_sql(f"drop schema if exists {project.test_schema} cascade")

relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
project.adapter.drop_schema(relation)

@pytest.mark.skip(
reason="""
on iceberg:
If the data is outdated, the materialized view behaves like a normal view,
and the data is queried directly from the base tables.
https://trino.io/docs/current/connector/iceberg.html#materialized-views
"""
)
def test_materialized_view_only_updates_after_refresh(self):
pass
31 changes: 31 additions & 0 deletions tests/functional/adapter/materialized_view_tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Optional

from dbt.adapters.base.relation import BaseRelation

from dbt.adapters.trino.relation import TrinoRelation


def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
assert isinstance(relation, TrinoRelation)
sql = f"""
select
case when mv.name is not null then 'materialized_view'
when t.table_type = 'BASE TABLE' then 'table'
when t.table_type = 'VIEW' then 'view'
else t.table_type
end as table_type
from {relation.information_schema()}.tables t
left join system.metadata.materialized_views mv
on mv.catalog_name = t.table_catalog and mv.schema_name = t.table_schema and mv.name = t.table_name
where t.table_schema = '{relation.schema.lower()}'
and (mv.catalog_name is null or mv.catalog_name = '{relation.database.lower()}')
and (mv.schema_name is null or mv.schema_name = '{relation.schema.lower()}')
and t.table_name = '{relation.identifier.lower()}'
"""
results = project.run_sql(sql, fetch="all")
if len(results) == 0:
return None
elif len(results) > 1:
raise ValueError(f"More than one instance of {relation.name} found!")
else:
return results[0][0]

0 comments on commit a07b6ed

Please sign in to comment.