From 8970aaafd840197416e8b19d87eb017226abf96c Mon Sep 17 00:00:00 2001 From: Peter Webb Date: Wed, 11 Oct 2023 22:09:25 -0400 Subject: [PATCH] Implementation of metadata-based freshness (#796) * Draft implementation of metadata-based freshness * Add test of metadata-based freshness mechanism * Strengthen test case. * Fix some overzealous escaping. * Simplifications per review * Update for core code review changes * Repoint to main core branch before merge * Temporarily skip test * test using custom schema for test * use specific schema env var * use specific schema env var * cleanup env var * skip test_get_last_relation_modified * remove extra whitespace * remove Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full) for CI troubleshooting * remove metadata.sql for CI troubleshooting * remove test_get_last_relation_modified.py for CI troubleshooting * add non-test files back for CI troubleshooting * add test files back for CI troubleshooting * remove run order dependency from test cases within their test class --------- Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Co-authored-by: Colin Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Co-authored-by: Mike Alfare --- .../unreleased/Features-20231008-195410.yaml | 6 ++ dbt/adapters/snowflake/impl.py | 7 ++- dbt/include/snowflake/macros/metadata.sql | 19 ++++++ .../test_get_last_relation_modified.py | 61 +++++++++++++++++++ .../test_list_relations_without_caching.py | 27 ++++++-- 5 files changed, 113 insertions(+), 7 deletions(-) create mode 100644 .changes/unreleased/Features-20231008-195410.yaml create mode 100644 dbt/include/snowflake/macros/metadata.sql create mode 100644 tests/functional/adapter/test_get_last_relation_modified.py diff --git a/.changes/unreleased/Features-20231008-195410.yaml b/.changes/unreleased/Features-20231008-195410.yaml new file mode 100644 index 000000000..fac7167e0 --- /dev/null +++ b/.changes/unreleased/Features-20231008-195410.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for checking table-last-modified by metadata +time: 2023-10-08T19:54:10.503476-04:00 +custom: + Author: peterallenwebb + Issue: "785" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 3f8124a9a..6f71fec1a 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -50,8 +50,11 @@ class SnowflakeAdapter(SQLAdapter): ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED, } - _capabilities = CapabilityDict( - {Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)} + _capabilities: CapabilityDict = CapabilityDict( + { + Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full), + Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + } ) @classmethod diff --git a/dbt/include/snowflake/macros/metadata.sql b/dbt/include/snowflake/macros/metadata.sql new file mode 100644 index 000000000..667082fe2 --- /dev/null +++ b/dbt/include/snowflake/macros/metadata.sql @@ -0,0 +1,19 @@ +{% macro snowflake__get_relation_last_modified(information_schema, relations) -%} + + {%- call statement('last_modified', fetch_result=True) -%} + select table_schema as schema, + table_name as identifier, + last_altered as last_modified, + {{ current_timestamp() }} as snapshotted_at + from {{ information_schema }}.tables + where ( + {%- for relation in relations -%} + (upper(table_schema) = upper('{{ relation.schema }}') and + upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + {%- endcall -%} + + {{ return(load_result('last_modified')) }} + +{% endmacro %} diff --git a/tests/functional/adapter/test_get_last_relation_modified.py b/tests/functional/adapter/test_get_last_relation_modified.py new file mode 100644 index 000000000..60547cd49 --- /dev/null +++ b/tests/functional/adapter/test_get_last_relation_modified.py @@ -0,0 +1,61 @@ +import os +import pytest + +from dbt.cli.main import dbtRunner + + +freshness_via_metadata_schema_yml = """version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_table +""" + + +class TestGetLastRelationModified: + @pytest.fixture(scope="class", autouse=True) + def set_env_vars(self, project): + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": freshness_via_metadata_schema_yml} + + @pytest.fixture(scope="class") + def custom_schema(self, project, set_env_vars): + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + ) + project.adapter.drop_schema(relation) + project.adapter.create_schema(relation) + + yield relation.schema + + with project.adapter.connection_named("__test"): + project.adapter.drop_schema(relation) + + @pytest.mark.skip() + def test_get_last_relation_modified(self, project, set_env_vars, custom_schema): + project.run_sql( + f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);" + ) + + warning_or_error = False + + def probe(e): + nonlocal warning_or_error + if e.info.level in ["warning", "error"]: + warning_or_error = True + + runner = dbtRunner(callbacks=[probe]) + runner.invoke(["source", "freshness"]) + + # The 'source freshness' command should succeed without warnings or errors. + assert not warning_or_error diff --git a/tests/functional/adapter/test_list_relations_without_caching.py b/tests/functional/adapter/test_list_relations_without_caching.py index dd1d03b78..f6dfc2144 100644 --- a/tests/functional/adapter/test_list_relations_without_caching.py +++ b/tests/functional/adapter/test_list_relations_without_caching.py @@ -75,7 +75,7 @@ def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name): ) -class TestListRelationsWithoutCaching: +class TestListRelationsWithoutCachingSingle: @pytest.fixture(scope="class") def models(self): my_models = {"my_model_base.sql": TABLE_BASE_SQL} @@ -88,7 +88,6 @@ def models(self): def macros(self): return { "validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING, - "validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR, } def test__snowflake__list_relations_without_caching_termination(self, project): @@ -96,8 +95,7 @@ def test__snowflake__list_relations_without_caching_termination(self, project): validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching macro when there are fewer than max_results_per_iter relations in the target schema """ - - _ = run_dbt(["run", "-s", "my_model_base"]) + run_dbt(["run", "-s", "my_model_base"]) database = project.database schemas = project.created_schemas @@ -121,6 +119,23 @@ def test__snowflake__list_relations_without_caching_termination(self, project): assert n_relations == "n_relations: 1" + +class TestListRelationsWithoutCachingFull: + @pytest.fixture(scope="class") + def models(self): + my_models = {"my_model_base.sql": TABLE_BASE_SQL} + for view in range(0, NUM_VIEWS): + my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) + + return my_models + + @pytest.fixture(scope="class") + def macros(self): + return { + "validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING, + "validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR, + } + def test__snowflake__list_relations_without_caching(self, project): """ validates pagination logic in snowflake__list_relations_without_caching macro counts @@ -128,7 +143,7 @@ def test__snowflake__list_relations_without_caching(self, project): calls of SHOW TERSE OBJECTS. """ # purpose of the first run is to create the replicated views in the target schema - _ = run_dbt(["run", "--exclude", "my_model_base"]) + run_dbt(["run"]) database = project.database schemas = project.created_schemas @@ -157,6 +172,8 @@ def test__snowflake__list_relations_without_caching_raise_error(self, project): validates pagination logic terminates and raises a compilation error when exceeding the limit of how many results to return. """ + run_dbt(["run"]) + database = project.database schemas = project.created_schemas