-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implements pagination #811
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
kind: Under the Hood | ||
body: Implements pagination on `list_schemas` process | ||
time: 2023-10-18T14:57:51.629173-07:00 | ||
custom: | ||
Author: matt-winkler | ||
Issue: "810" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,23 +47,108 @@ | |
{% do return(columns) %} | ||
{% endmacro %} | ||
|
||
{% macro snowflake__list_schemas(database) -%} | ||
{% macro snowflake__get_paginated_schemas_array(max_iter, max_results_per_iter, max_total_results, database, watermark) %} | ||
|
||
{% set paginated_schemas = [] %} | ||
|
||
{% for _ in range(0, max_iter) %} | ||
{%- set paginated_sql -%} | ||
show terse schemas in database {{ database }} limit {{ max_results_per_iter }} from '{{ watermark.schema_name }}'; | ||
{%- endset -%} | ||
|
||
{%- set paginated_result = run_query(paginated_sql) %} | ||
{%- set paginated_n = (paginated_result | length) -%} | ||
|
||
{# | ||
terminating condition: if there are 0 records in the result we reached | ||
the end exactly on the previous iteration | ||
#} | ||
{%- if paginated_n == 0 -%} | ||
{%- break -%} | ||
{%- endif -%} | ||
|
||
{# | ||
terminating condition: At some point the user needs to be reasonable with how | ||
many schemas are contained in their databases. Since there was already | ||
one iteration before attempting pagination, loop.index == max_iter means | ||
the limit has been surpassed. | ||
#} | ||
|
||
{%- if loop.index == max_iter -%} | ||
{%- set msg -%} | ||
dbt will list a maximum of {{ max_total_results }} schemas in database {{ database }}. | ||
Your database exceeds this limit. Please contact [email protected] for troubleshooting tips, | ||
or review and reduce the number of objects contained. | ||
{%- endset -%} | ||
|
||
{% do exceptions.raise_compiler_error(msg) %} | ||
{%- endif -%} | ||
|
||
{%- do paginated_schemas.append(paginated_result) -%} | ||
{% set watermark.schema_name = paginated_result.columns[1].values()[-1] %} | ||
|
||
{# | ||
terminating condition: paginated_n < max_results_per_iter means we reached the end | ||
#} | ||
{%- if paginated_n < max_results_per_iter -%} | ||
{%- break -%} | ||
{%- endif -%} | ||
{%- endfor -%} | ||
|
||
{{ return(paginated_schemas) }} | ||
|
||
{% endmacro %} | ||
|
||
{% macro snowflake__list_schemas(database, max_iter=10, max_results_per_iter=1000) %} | ||
|
||
{%- set max_total_results = max_results_per_iter * max_iter -%} | ||
|
||
{%- set sql -%} | ||
show terse schemas in database {{ database }} limit {{ max_results_per_iter }}; | ||
{%- endset -%} | ||
|
||
{%- set result = run_query(sql) -%} | ||
|
||
{%- set n = (result | length) -%} | ||
{%- set watermark = namespace(schema_name=result.columns[1].values()[-1]) -%} | ||
{%- set paginated = namespace(result=[]) -%} | ||
|
||
{% if n >= max_results_per_iter %} | ||
|
||
{% set paginated.result = snowflake__get_paginated_schemas_array( | ||
max_iter, | ||
max_results_per_iter, | ||
max_total_results, | ||
database, | ||
watermark | ||
) | ||
%} | ||
|
||
{% endif %} | ||
|
||
{%- set all_results_array = [result] + paginated.result -%} | ||
{%- set result = result.merge(all_results_array) -%} | ||
{%- do return(result) -%} | ||
|
||
{% endmacro %} | ||
|
||
{# macro snowflake__list_schemas(database) -#} | ||
{# 10k limit from here: https://docs.snowflake.net/manuals/sql-reference/sql/show-schemas.html#usage-notes #} | ||
{% set maximum = 10000 %} | ||
{% set sql -%} | ||
{# set maximum = 10000 #} | ||
{# set sql -#} | ||
show terse schemas in database {{ database }} | ||
limit {{ maximum }} | ||
{%- endset %} | ||
{% set result = run_query(sql) %} | ||
{% if (result | length) >= maximum %} | ||
{% set msg %} | ||
{#- endset %} | ||
{# set result = run_query(sql) #} | ||
{# if (result | length) >= maximum #} | ||
{# set msg %} | ||
Too many schemas in database {{ database }}! dbt can only get | ||
information about databases with fewer than {{ maximum }} schemas. | ||
{% endset %} | ||
{% do exceptions.raise_compiler_error(msg) %} | ||
{% endif %} | ||
{# endset %} | ||
{# do exceptions.raise_compiler_error(msg) #} | ||
{# endif #} | ||
{{ return(result) }} | ||
{% endmacro %} | ||
{# endmacro #} | ||
dbeatty10 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
{% macro snowflake__get_paginated_relations_array(max_iter, max_results_per_iter, max_total_results, schema_relation, watermark) %} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like that you considered positive and negative test cases that cover a few scenarios. You structured the test cases very similarly. I think you could take this one step further and make it a single parameterized test case, making it easy to see the three scenarios (e.g. if <scenario 1> then <expected outcome 1>, etc.). If parameterized tests are a new thing, let me know if you want to pair on it (or if you just want to pair on it). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
import pytest | ||
|
||
import json | ||
from dbt.tests.util import run_dbt, run_dbt_and_capture | ||
|
||
# Testing rationale: | ||
# - snowflake SHOW TERSE SCHEMAS command returns at max 10K objects in a single call | ||
# - when dbt attempts to write into a database with more than 10K schemas, compilation will fail | ||
# unless we paginate the result | ||
# - however, testing this process is difficult at a full scale of 10K actual objects populated | ||
# into a fresh testing schema | ||
# - accordingly, we create a smaller set of views and test the looping iteration logic in | ||
# smaller chunks | ||
|
||
NUM_SCHEMAS = 100 | ||
|
||
TABLE_BASE_SQL = """ | ||
{{ config(materialized='table') }} | ||
|
||
select 1 as id | ||
""".lstrip() | ||
|
||
MACROS__CREATE__TEST_SCHEMAS = """ | ||
{% macro create_test_schemas(database, schemas) %} | ||
|
||
{% for schema in schemas %} | ||
{% set sql %} | ||
use database {{database}}; | ||
create schema if not exists {{schema}}; | ||
{% endset %} | ||
|
||
{% do run_query(sql) %} | ||
{% endfor %} | ||
|
||
{% endmacro %} | ||
""" | ||
|
||
MACROS__DROP__TEST_SCHEMAS = """ | ||
{% macro drop_test_schemas(database, schemas) %} | ||
|
||
{% for schema in schemas %} | ||
{% set sql %} | ||
drop schema {{database}}.{{schema}}; | ||
{% endset %} | ||
|
||
{% do run_query(sql) %} | ||
{% endfor %} | ||
|
||
{% endmacro %} | ||
""" | ||
|
||
MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS = """ | ||
{% macro validate_list_schemas(database, max_iter=11, max_results_per_iter=10) %} | ||
{% set schema_list_result = snowflake__list_schemas(database, max_iter=max_iter, max_results_per_iter=max_results_per_iter) %} | ||
{% set n_schemas = schema_list_result | length %} | ||
{{ log("n_schemas: " ~ n_schemas) }} | ||
{% endmacro %} | ||
""" | ||
|
||
MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR = """ | ||
{% macro validate_list_schemas_raise_error(database) %} | ||
{{ snowflake__list_schemas(database, max_iter=33, max_results_per_iter=3) }} | ||
{% endmacro %} | ||
""" | ||
|
||
|
||
def parse_json_logs(json_log_output): | ||
parsed_logs = [] | ||
for line in json_log_output.split("\n"): | ||
try: | ||
log = json.loads(line) | ||
except ValueError: | ||
continue | ||
|
||
parsed_logs.append(log) | ||
|
||
return parsed_logs | ||
|
||
|
||
def find_result_in_parsed_logs(parsed_logs, result_name): | ||
return next( | ||
( | ||
item["data"]["msg"] | ||
for item in parsed_logs | ||
if result_name in item["data"].get("msg", "msg") | ||
), | ||
False, | ||
) | ||
|
||
|
||
def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name): | ||
return next( | ||
( | ||
item["data"]["exc_info"] | ||
for item in parsed_logs | ||
if exc_info_name in item["data"].get("exc_info", "exc_info") | ||
), | ||
False, | ||
) | ||
|
||
|
||
class TestListSchemasSingle: | ||
@pytest.fixture(scope="class") | ||
def macros(self): | ||
return { | ||
"validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS, | ||
"create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS, | ||
"drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS, | ||
} | ||
|
||
def test__snowflake__list_schemas_termination(self, project): | ||
""" | ||
validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not clear on how this validates that the pagination logic is not triggered. I agree that it shouldn't since we're allowing for 200 schemas per result (line 127) but only creating 100 schemas (line 125 and line 15). However, we're only checking for the correct number of schemas at the end, which I would think is the same regardless of whether pagination was used. |
||
macro when there are fewer than max_results_per_iter relations in the target schema | ||
""" | ||
|
||
database = project.database | ||
schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] | ||
|
||
create_kwargs = { | ||
"database": database, | ||
"schemas": schemas, | ||
} | ||
|
||
run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
validate_kwargs = {"database": database, "max_iter": 1, "max_results_per_iter": 200} | ||
_, log_output = run_dbt_and_capture( | ||
[ | ||
"--debug", | ||
"--log-format=json", | ||
"run-operation", | ||
"validate_list_schemas", | ||
"--args", | ||
str(validate_kwargs), | ||
] | ||
) | ||
|
||
parsed_logs = parse_json_logs(log_output) | ||
n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas") | ||
|
||
run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
assert ( | ||
n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}" | ||
) # include information schema and base test schema in the count | ||
|
||
|
||
class TestListRelationsWithoutCachingFull: | ||
@pytest.fixture(scope="class") | ||
def macros(self): | ||
return { | ||
"validate_list_schemas.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS, | ||
"create_test_schemas.sql": MACROS__CREATE__TEST_SCHEMAS, | ||
"validate_list_schemas_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_SCHEMAS_RAISE_ERROR, | ||
"drop_test_schemas.sql": MACROS__DROP__TEST_SCHEMAS, | ||
} | ||
|
||
def test__snowflake__list_schemas(self, project): | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think providing the arguments |
||
validates pagination logic in snowflake__list_schemas macro counts | ||
the correct number of schemas in the target database when having to make multiple looped | ||
calls of SHOW TERSE SCHEMAS. | ||
""" | ||
database = project.database | ||
schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] | ||
|
||
create_kwargs = {"database": database, "schemas": schemas} | ||
|
||
run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
validate_kwargs = {"database": database} | ||
_, log_output = run_dbt_and_capture( | ||
[ | ||
"--debug", | ||
"--log-format=json", | ||
"run-operation", | ||
"validate_list_schemas", | ||
"--args", | ||
str(validate_kwargs), | ||
] | ||
) | ||
|
||
parsed_logs = parse_json_logs(log_output) | ||
n_schemas = find_result_in_parsed_logs(parsed_logs, "n_schemas") | ||
|
||
run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
assert ( | ||
n_schemas == f"n_schemas: {(NUM_SCHEMAS + 2)}" | ||
) # include information schema and base test schema in the count | ||
|
||
def test__snowflake__list_schemas_raise_error(self, project): | ||
""" | ||
validates pagination logic terminates and raises a compilation error | ||
when exceeding the limit of how many results to return. | ||
""" | ||
run_dbt(["run"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since there are no models or tests, does this do anything here? It doesn't appear to be needed in the other tests. Also, if this does something, it potentially makes the test run order-dependent. If the first method in this class runs first, then it's run before running |
||
|
||
database = project.database | ||
schemas = [f"test_schema_{i}" for i in range(0, NUM_SCHEMAS)] | ||
|
||
create_kwargs = {"database": database, "schemas": schemas} | ||
|
||
run_dbt(["run-operation", "create_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
validate_kwargs = {"database": database} | ||
_, log_output = run_dbt_and_capture( | ||
[ | ||
"--debug", | ||
"--log-format=json", | ||
"run-operation", | ||
"validate_list_schemas_raise_error", | ||
"--args", | ||
str(validate_kwargs), | ||
], | ||
expect_pass=False, | ||
) | ||
|
||
run_dbt(["run-operation", "drop_test_schemas", "--args", str(create_kwargs)]) | ||
|
||
parsed_logs = parse_json_logs(log_output) | ||
traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback") | ||
assert "dbt will list a maximum of 99 schemas in database" in traceback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code looks familiar, did we use pagination for a different query recently? If so, is there any way to reuse that, or to augment that so that both use cases can use it? The only piece that appears to be use case specific is
paginated_sql
and the error message. The former could be an argument to this macro and the latter could probably be made more generic (e.g. swapschemas
forobjects
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, you are remembering correctly @mikealfare !