-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adjust materialized view macros to dbt-core implementation of materia…
…lized_view materialization. Add new tests from dbt-tests-adapter
- Loading branch information
1 parent
efddcb2
commit 5b70b94
Showing
3 changed files
with
121 additions
and
68 deletions.
There are no files selected for viewing
93 changes: 25 additions & 68 deletions
93
dbt/include/trino/macros/materializations/materialized_view.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,35 @@ | ||
{% materialization materialized_view, adapter="trino" %} | ||
|
||
{% set full_refresh_mode = (should_full_refresh()) %} | ||
|
||
{%- set target_relation = this %} | ||
{%- set existing_relation = load_relation(this) -%} | ||
|
||
{{ run_hooks(pre_hooks, inside_transaction=False) }} | ||
|
||
-- `BEGIN` happens here: | ||
{{ run_hooks(pre_hooks, inside_transaction=True) }} | ||
|
||
{% set to_drop = [] %} | ||
|
||
{% if existing_relation is none %} | ||
{{ log("No existing materialized view found, creating materialized view...", info=true) }} | ||
{%- set build_sql = create_materialized_view_as(target_relation) %} | ||
|
||
{% elif full_refresh_mode or existing_relation.type != "materialized_view" %} | ||
{{ log("Found a " ~ existing_relation.type ~ " with same name. Dropping it...", info=true) }} | ||
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} | ||
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} | ||
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} | ||
{% do adapter.drop_relation(backup_relation) %} | ||
|
||
{% do adapter.rename_relation(existing_relation, backup_relation) %} | ||
{%- set build_sql = create_materialized_view_as(target_relation) %} | ||
{% do to_drop.append(backup_relation) %} | ||
|
||
{% else %} | ||
{{ log("Refreshing materialized view '" ~ existing_relation.identifier ~ "'...", info=true) }} | ||
{%- set build_sql = refresh_materialized_view(target_relation) %} | ||
{% endif %} | ||
|
||
{% if build_sql %} | ||
|
||
{% call statement("main") %} | ||
{{ build_sql }} | ||
{% endcall %} | ||
|
||
{{ run_hooks(post_hooks, inside_transaction=True) }} | ||
|
||
{% do persist_docs(target_relation, model) %} | ||
|
||
-- `COMMIT` happens here | ||
{% do adapter.commit() %} | ||
|
||
{% else %} | ||
|
||
{{ store_result('main', 'SKIP') }} | ||
{%- macro trino__get_create_materialized_view_as_sql(target_relation, sql) -%} | ||
{%- set _properties = config.get('properties') -%} | ||
create materialized view {{ target_relation }} | ||
{{ properties(_properties) }} | ||
as | ||
{{ sql }} | ||
; | ||
{%- endmacro -%} | ||
|
||
{% endif %} | ||
|
||
{% for rel in to_drop %} | ||
{% do adapter.drop_relation(rel) %} | ||
{% endfor %} | ||
{% macro trino__get_replace_materialized_view_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) %} | ||
{{- trino__get_create_materialized_view_as_sql(intermediate_relation, sql) }} | ||
|
||
{{ run_hooks(post_hooks, inside_transaction=False) }} | ||
{% if existing_relation is not none %} | ||
{{ log("Found a " ~ existing_relation.type ~ " with same name. Will drop it", info=true) }} | ||
alter {{ existing_relation.type|replace("_", " ") }} {{ existing_relation }} rename to {{ backup_relation.include(database=False, schema=False) }}; | ||
{% endif %} | ||
|
||
{{ return({'relations': [target_relation]}) }} | ||
alter materialized view {{ intermediate_relation }} rename to {{ relation.include(database=False, schema=False) }}; | ||
|
||
{%- endmaterialization %} | ||
{% endmacro %} | ||
|
||
|
||
{%- macro create_materialized_view_as(target_relation) -%} | ||
{%- set _properties = config.get('properties') -%} | ||
create or replace materialized view {{ target_relation }} | ||
{{ properties(_properties) }} | ||
as | ||
{{ sql }} | ||
; | ||
{%- endmacro -%} | ||
{#-- Applying materialized view configuration changes via alter is not supported. --#} | ||
{#-- Return None, so `refresh_materialized_view` macro is invoked even --#} | ||
{#-- if materialized view configuration changes are made. --#} | ||
{#-- After configuration change, full refresh needs to be performed on mv. --#} | ||
{% macro trino__get_materialized_view_configuration_changes(existing_relation, new_config) %} | ||
{% do return(None) %} | ||
{% endmacro %} | ||
|
||
|
||
{%- macro refresh_materialized_view(target_relation) -%} | ||
{%- set sqlcode = "refresh materialized view " ~ target_relation %} | ||
{{ sqlcode }} | ||
{%- macro trino__refresh_materialized_view(relation) -%} | ||
refresh materialized view {{ relation }} | ||
{%- endmacro -%} |
65 changes: 65 additions & 0 deletions
65
tests/functional/adapter/materialized_view_tests/test_materialized_view_dbt_core.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
from typing import Optional, Tuple | ||
|
||
import pytest | ||
from dbt.adapters.base.relation import BaseRelation | ||
from dbt.tests.adapter.materialized_view.basic import MaterializedViewBasic | ||
from dbt.tests.util import get_model_file, run_dbt, run_sql_with_adapter, set_model_file | ||
|
||
from tests.functional.adapter.materialized_view_tests.utils import query_relation_type | ||
|
||
|
||
@pytest.mark.iceberg | ||
class TestTrinoMaterializedViewsBasic(MaterializedViewBasic): | ||
@staticmethod | ||
def insert_record(project, table: BaseRelation, record: Tuple[int, int]): | ||
my_id, value = record | ||
project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})") | ||
|
||
@staticmethod | ||
def refresh_materialized_view(project, materialized_view: BaseRelation): | ||
sql = f"refresh materialized view {materialized_view}" | ||
project.run_sql(sql) | ||
|
||
@staticmethod | ||
def query_row_count(project, relation: BaseRelation) -> int: | ||
sql = f"select count(*) from {relation}" | ||
return project.run_sql(sql, fetch="one")[0] | ||
|
||
@staticmethod | ||
def query_relation_type(project, relation: BaseRelation) -> Optional[str]: | ||
return query_relation_type(project, relation) | ||
|
||
@pytest.fixture(scope="function", autouse=True) | ||
def setup(self, project, my_materialized_view): | ||
run_dbt(["seed"]) | ||
run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"]) | ||
|
||
# the tests touch these files, store their contents in memory | ||
initial_model = get_model_file(project, my_materialized_view) | ||
|
||
yield | ||
|
||
# and then reset them after the test runs | ||
set_model_file(project, my_materialized_view, initial_model) | ||
|
||
# Drop materialized views first, then drop schema | ||
sql = "select * from system.metadata.materialized_views" | ||
results = run_sql_with_adapter(project.adapter, sql, fetch="all") | ||
for mv in results: | ||
project.run_sql(f"drop materialized view {mv[0]}.{mv[1]}.{mv[2]}") | ||
|
||
relation = project.adapter.Relation.create( | ||
database=project.database, schema=project.test_schema | ||
) | ||
project.adapter.drop_schema(relation) | ||
|
||
@pytest.mark.skip( | ||
reason=""" | ||
on iceberg: | ||
If the data is outdated, the materialized view behaves like a normal view, | ||
and the data is queried directly from the base tables. | ||
https://trino.io/docs/current/connector/iceberg.html#materialized-views | ||
""" | ||
) | ||
def test_materialized_view_only_updates_after_refresh(self): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import Optional | ||
|
||
from dbt.adapters.base.relation import BaseRelation | ||
|
||
from dbt.adapters.trino.relation import TrinoRelation | ||
|
||
|
||
def query_relation_type(project, relation: BaseRelation) -> Optional[str]: | ||
assert isinstance(relation, TrinoRelation) | ||
sql = f""" | ||
select | ||
case when mv.name is not null then 'materialized_view' | ||
when t.table_type = 'BASE TABLE' then 'table' | ||
when t.table_type = 'VIEW' then 'view' | ||
else t.table_type | ||
end as table_type | ||
from {relation.information_schema()}.tables t | ||
left join system.metadata.materialized_views mv | ||
on mv.catalog_name = t.table_catalog and mv.schema_name = t.table_schema and mv.name = t.table_name | ||
where t.table_schema = '{relation.schema.lower()}' | ||
and (mv.catalog_name is null or mv.catalog_name = '{relation.database.lower()}') | ||
and (mv.schema_name is null or mv.schema_name = '{relation.schema.lower()}') | ||
and t.table_name = '{relation.identifier.lower()}' | ||
""" | ||
results = project.run_sql(sql, fetch="all") | ||
if len(results) == 0: | ||
return None | ||
elif len(results) > 1: | ||
raise ValueError(f"More than one instance of {relation.name} found!") | ||
else: | ||
return results[0][0] |