make list relations configurable
mikealfare committed Nov 7, 2024
1 parent 01dbd70 commit 753c638
Showing 2 changed files with 71 additions and 177 deletions.
5 changes: 4 additions & 1 deletion dbt/include/snowflake/macros/adapters.sql
@@ -134,7 +134,10 @@

 {% endmacro %}

-{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}
+{% macro snowflake__list_relations_without_caching(schema_relation) %}
+
+{%- set max_results_per_iter = adapter.config.flags.get('list_relations_per_iteration', 10000) -%}
+{%- set max_iter = adapter.config.flags.get('list_relations_iteration_limit', 10) -%}

 {%- set max_total_results = max_results_per_iter * max_iter -%}
 {%- set sql -%}
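Note: the two flags multiply into a hard cap on how many objects the macro will page through before raising a compilation error. A minimal Python sketch of that arithmetic (the function name and constants are illustrative; the defaults simply mirror the macro's fallback values above):

# Defaults mirror the macro's fallback values above.
DEFAULT_PER_ITER = 10_000   # list_relations_per_iteration
DEFAULT_ITER_LIMIT = 10     # list_relations_iteration_limit


def max_total_results(per_iter=DEFAULT_PER_ITER, iter_limit=DEFAULT_ITER_LIMIT):
    """Upper bound on relations dbt will page through before erroring."""
    return per_iter * iter_limit


assert max_total_results() == 100_000  # default cap
assert max_total_results(10, 5) == 50  # the "TooLarge" test below sets these and exceeds the cap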
243 changes: 67 additions & 176 deletions tests/functional/adapter/list_relations_tests/test_pagination.py
@@ -1,34 +1,31 @@
 import os

 import pytest
-import json
-from dbt.tests.util import run_dbt, run_dbt_and_capture
-from dbt.adapters.snowflake import SnowflakeRelation  # Ensure this is the correct import path
-
-# Testing rationale:
-# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
-# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
-#   unless we paginate the result
-# - however, testing this process is difficult at a full scale of 10K actual objects populated
-#   into a fresh testing schema
-# - accordingly, we create a smaller set of views and test the looping iteration logic in
-#   smaller chunks
-
-NUM_VIEWS = 90
-NUM_DYNAMIC_TABLES = 10
-# the total number should be between the numbers referenced in the "passing" and "failing" macros below
-# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
-# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
-NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES
-
-TABLE_BASE_SQL = """
-{{ config(materialized='table') }}
+
+from dbt_common.exceptions import CompilationError
+from dbt.tests.util import run_dbt
+
+"""
+Testing rationale:
+- snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
+- when dbt attempts to write into a schema with more than 10K objects, compilation will fail
+  unless we paginate the result
+- we default pagination to 10 pages, but users want to configure this
+- we instead use that here to force failures by making it smaller
+"""
+
+
+TABLE = """
+{{ config(materialized='table') }}
 select 1 as id
-""".lstrip()
+"""


-VIEW_X_SQL = """
+VIEW = """
 {{ config(materialized='view') }}
 select id from {{ ref('my_model_base') }}
-""".lstrip()
+"""


 DYNAMIC_TABLE = (
     """

@@ -44,173 +41,67 @@
"""
)

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
{% set n_relations = relation_list_result | length %}
{{ log("n_relations: " ~ n_relations) }}
{% endmacro %}
"""

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR = """
{% macro validate_list_relations_without_caching_raise_error(schema_relation) %}
{{ snowflake__list_relations_without_caching(schema_relation, max_iter=33, max_results_per_iter=3) }}
{% endmacro %}
"""


def parse_json_logs(json_log_output):
parsed_logs = []
for line in json_log_output.split("\n"):
try:
log = json.loads(line)
except ValueError:
continue

parsed_logs.append(log)

return parsed_logs

class BaseConfig:
VIEWS = 90
DYNAMIC_TABLES = 10

def find_result_in_parsed_logs(parsed_logs, result_name):
return next(
(
item["data"]["msg"]
for item in parsed_logs
if result_name in item["data"].get("msg", "msg")
),
False,
)


def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
return next(
(
item["data"]["exc_info"]
for item in parsed_logs
if exc_info_name in item["data"].get("exc_info", "exc_info")
),
False,
)


class TestListRelationsWithoutCachingSingle:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
my_models = {"my_model_base.sql": TABLE}
for view in range(0, self.VIEWS):
my_models[f"my_model_{view}.sql"] = VIEW
for dynamic_table in range(0, self.DYNAMIC_TABLES):
my_models[f"my_dynamic_table_{dynamic_table}.sql"] = DYNAMIC_TABLE
return my_models

@pytest.fixture(scope="class")
def macros(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
}
@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["run"])

def test__snowflake__list_relations_without_caching_termination(self, project):
"""
validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching
macro when there are fewer than max_results_per_iter relations in the target schema
"""
run_dbt(["run", "-s", "my_model_base"])

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)
kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching",
"--args",
str(kwargs),
]
def test_list_relations(self, project):
kwargs = {"schema_relation": project.test_schema}
with project.adapter.connection_named("__test"):
relations = project.adapter.execute_macro(
"snowflake__list_relations_without_caching", kwargs=kwargs
)
assert len(relations) == self.VIEWS + self.DYNAMIC_TABLES + 1

parsed_logs = parse_json_logs(log_output)
n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")
assert n_relations == "n_relations: 1"

class TestListRelationsWithoutCachingSmall(BaseConfig):
pass

class TestListRelationsWithoutCachingFull:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

class TestListRelationsWithoutCachingLarge(BaseConfig):
@pytest.fixture(scope="class")
def macros(self):
def profiles_config_update(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
"validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
"flags": {
"list_relations_per_iteration": 10,
"list_relations_iteration_limit": 20,
}
}

def test__snowflake__list_relations_without_caching(self, project):
"""
validates pagination logic in snowflake__list_relations_without_caching macro counts
the correct number of objects in the target schema when having to make multiple looped
calls of SHOW TERSE OBJECTS.
"""
# purpose of the first run is to create the replicated views in the target schema
run_dbt(["run"])

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)
kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching",
"--args",
str(kwargs),
]
)
parsed_logs = parse_json_logs(log_output)
n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")
class TestListRelationsWithoutCachingTooLarge(BaseConfig):

assert n_relations == f"n_relations: {NUM_EXPECTED_RELATIONS}"

def test__snowflake__list_relations_without_caching_raise_error(self, project):
"""
validates pagination logic terminates and raises a compilation error
when exceeding the limit of how many results to return.
"""
run_dbt(["run"])
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {
"list_relations_per_iteration": 10,
"list_relations_iteration_limit": 5,
}
}

database = project.database
schemas = project.created_schemas

for schema in schemas:
schema_relation = SnowflakeRelation.create(database=database, schema=schema)

kwargs = {"schema_relation": schema_relation.render()}
_, log_output = run_dbt_and_capture(
[
"--debug",
"--log-format=json",
"run-operation",
"validate_list_relations_without_caching_raise_error",
"--args",
str(kwargs),
],
expect_pass=False,
)
parsed_logs = parse_json_logs(log_output)
traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback")
assert "dbt will list a maximum of 99 objects in schema " in traceback
def test_list_relations(self, project):
kwargs = {"schema_relation": project.test_schema}
with project.adapter.connection_named("__test"):
with pytest.raises(CompilationError):
project.adapter.execute_macro(
"snowflake__list_relations_without_caching", kwargs=kwargs
)

def test_on_run(self, project):
with pytest.raises(CompilationError):
run_dbt(["run"])
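For context, here is a rough Python sketch of the pagination scheme the macro implements, assuming Snowflake's SHOW ... LIMIT <n> FROM '<name>' style of paging; run_query, the .name row attribute, and the watermark handling are illustrative stand-ins, not the adapter's actual API:

from dbt_common.exceptions import CompilationError


def list_relations(run_query, schema, per_iter=10_000, iter_limit=10):
    results, watermark = [], None
    for _ in range(iter_limit):
        sql = f"show terse objects in {schema} limit {per_iter}"
        if watermark is not None:
            sql += f" from '{watermark}'"  # resume after the last object seen
        page = run_query(sql)
        results.extend(page)
        if len(page) < per_iter:  # short page: nothing left to fetch
            return results
        watermark = page[-1].name  # assumed row attribute, for illustration
    # every page came back full, so there may be more objects than we will list
    raise CompilationError(
        f"dbt will list a maximum of {per_iter * iter_limit} objects in schema {schema}"
    )

With list_relations_per_iteration=10 and list_relations_iteration_limit=5, the TooLarge class above creates 101 relations (1 base table + 90 views + 10 dynamic tables) against a cap of 50, so all five pages come back full and the loop raises, which is exactly what both of its tests assert.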
