Apply "Initial refactoring of incremental materialization" #1123

Open · wants to merge 4 commits into base: main
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241017-101532.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Applies "Initial refactoring of incremental materialization" (dbt-labs/dbt-core#5359).
time: 2024-10-17T10:15:32.88591-07:00
custom:
Author: hiloboy0119
Issue: "1123"
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
@@ -49,7 +49,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.8'
python-version: '3.9'

- name: Install python dependencies
run: |
@@ -75,7 +75,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Check out the repository
@@ -126,7 +126,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.8'
python-version: '3.9'

- name: Install python dependencies
run: |
@@ -173,7 +173,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-12, windows-latest]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Set up Python ${{ matrix.python-version }}
2 changes: 1 addition & 1 deletion .github/workflows/release-internal.yml
@@ -37,7 +37,7 @@ defaults:
shell: "bash"

env:
PYTHON_TARGET_VERSION: 3.8
PYTHON_TARGET_VERSION: 3.9

jobs:
run-unit-tests:
4 changes: 2 additions & 2 deletions .github/workflows/release-prep.yml
@@ -84,7 +84,7 @@ defaults:
shell: bash

env:
PYTHON_TARGET_VERSION: 3.8
PYTHON_TARGET_VERSION: 3.9
NOTIFICATION_PREFIX: "[Release Preparation]"

jobs:
@@ -448,7 +448,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Check out the repository
4 changes: 2 additions & 2 deletions Makefile
@@ -25,8 +25,8 @@ unit: ## Runs unit tests with py38.
test: ## Runs unit tests with py38 and code checks against staged changes.
@\
python -m pytest tests/unit; \
python dagger/run_dbt_spark_tests.py --profile spark_session \
pre-commit run --all-files
pre-commit run --all-files; \
python dagger/run_dbt_spark_tests.py --profile spark_session

.PHONY: clean
@echo "cleaning repo"
2 changes: 1 addition & 1 deletion dagger/run_dbt_spark_tests.py
@@ -104,7 +104,7 @@ async def test_spark(test_args):
platform = dagger.Platform("linux/amd64")
tst_container = (
client.container(platform=platform)
.from_("python:3.8-slim")
.from_("python:3.9-slim")
.with_mounted_cache("/var/cache/apt/archives", os_reqs_cache)
.with_mounted_cache("/root/.cache/pip", pip_cache)
# install OS deps first so any local changes don't invalidate the cache
6 changes: 6 additions & 0 deletions dbt/adapters/spark/impl.py
@@ -516,6 +516,12 @@ def debug_query(self) -> None:
"""Override for DebugTask method"""
self.execute("select 1 as id")

def valid_incremental_strategies(self) -> list[str]:
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "merge", "insert_overwrite", "microbatch"]


# spark does something interesting with joins when both tables have the same
# static values for the join condition and complains that the join condition is
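Note on `valid_incremental_strategies`: it only advertises the built-in strategies; dbt-core resolves whatever `incremental_strategy` a model configures to a macro named `get_incremental_<strategy>_sql`, so user-defined strategies remain possible. A minimal sketch of such a custom strategy (the `insert_only` name and body are hypothetical, not part of this diff):

```sql
{# macros/incremental_strategies.sql -- hypothetical user-defined strategy #}
{% macro get_incremental_insert_only_sql(arg_dict) %}
    {#-- arg_dict carries the keys built by the materialization below:
         target_relation, temp_relation, unique_key, dest_columns, predicates --#}
    insert into {{ arg_dict["target_relation"] }}
    select * from {{ arg_dict["temp_relation"] }}
{% endmacro %}
```

With that macro in the project, `incremental_strategy='insert_only'` compiles, because unknown names are checked at macro-resolution time rather than against this list.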
@@ -1,11 +1,13 @@
{% materialization incremental, adapter='spark', supported_languages=['sql', 'python'] -%}
{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy') or 'append' -%}
{% set raw_strategy = config.get('incremental_strategy') or 'default' %}
{%- set grant_config = config.get('grants') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, strategy) %}

{#-- Set vars --#}

@@ -56,8 +58,11 @@
{{ create_table_as(True, tmp_relation, compiled_code, language) }}
{%- endcall -%}
{%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{#-- call the incremental strategy macro --#}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
{%- call statement('main') -%}
{{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, existing_relation, unique_key, incremental_predicates) }}
{{ build_sql }}
{%- endcall -%}
{%- if language == 'python' -%}
{#--
@@ -83,3 +88,19 @@
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

{% macro spark__get_incremental_default_sql(arg_dict) %}
{% do return(spark__get_incremental_append_sql(arg_dict)) %}
{% endmacro %}
{% macro spark__get_incremental_append_sql(arg_dict) %}
{% do return(dbt_spark_get_incremental_sql("append", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
{% endmacro %}
{% macro spark__get_incremental_merge_sql(arg_dict) %}
{% do return(dbt_spark_get_incremental_sql("merge", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
{% endmacro %}
{% macro spark__get_incremental_insert_overwrite_sql(arg_dict) %}
{% do return(dbt_spark_get_incremental_sql("insert_overwrite", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
{% endmacro %}
{% macro spark__get_incremental_microbatch_sql(arg_dict) %}
{% do return(dbt_spark_get_incremental_sql("microbatch", arg_dict["temp_relation"], arg_dict["target_relation"], load_relation(this), arg_dict["unique_key"], arg_dict["predicates"])) %}
{% endmacro %}
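For reference, a hypothetical model exercising the refactored path (model name and columns are illustrative): the materialization validates the strategy, resolves it via `adapter.get_incremental_strategy_macro`, and calls the returned macro, here ultimately `spark__get_incremental_merge_sql`, with the `arg_dict` built above.

```sql
{# models/orders_incremental.sql -- hypothetical usage sketch #}
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='order_id'
) }}

select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```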
@@ -19,11 +19,6 @@
{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
{%- endset %}

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
@@ -35,15 +30,11 @@
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}

{% do return(raw_strategy) %}
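With the unknown-strategy check removed, only the file-format and endpoint validations remain here. A hypothetical config that this macro still rejects (values are illustrative):

```sql
{# still raises: 'merge' requires file_format 'delta', 'iceberg', or 'hudi' #}
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='parquet'
) }}
select 1 as id
```

A strategy name outside the built-in list now passes this validation and instead fails later, if no matching `get_incremental_<name>_sql` macro can be found.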
2 changes: 1 addition & 1 deletion docker/spark-defaults.conf
@@ -4,6 +4,6 @@ spark.hadoop.datanucleus.autoCreateTables true
spark.hadoop.datanucleus.schema.autoCreateTables true
spark.hadoop.datanucleus.fixedDatastore false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.13.0
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.userClassPathFirst true
4 changes: 2 additions & 2 deletions docker/spark.Dockerfile
@@ -1,8 +1,8 @@
ARG OPENJDK_VERSION=8
ARG OPENJDK_VERSION=17
FROM eclipse-temurin:${OPENJDK_VERSION}-jre

ARG BUILD_DATE
ARG SPARK_VERSION=3.3.2
ARG SPARK_VERSION=3.3.4
ARG HADOOP_VERSION=3

LABEL org.label-schema.name="Apache Spark ${SPARK_VERSION}" \
9 changes: 4 additions & 5 deletions setup.py
@@ -3,10 +3,10 @@
import sys
import re

# require python 3.8 or newer
if sys.version_info < (3, 8):
# require python 3.9 or newer
if sys.version_info < (3, 9):
print("Error: dbt does not support this version of Python.")
print("Please upgrade to Python 3.8 or higher.")
print("Please upgrade to Python 3.9 or higher.")
sys.exit(1)

# require version of setuptools that supports find_namespace_packages
@@ -83,11 +83,10 @@ def _get_plugin_version_dict():
"Operating System :: Microsoft :: Windows",
"Operating System :: MacOS :: MacOS X",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
],
python_requires=">=3.8",
python_requires=">=3.9",
)
@@ -128,7 +128,6 @@ def models(self):
return {
"bad_file_format.sql": bad_file_format_sql,
"bad_merge_not_delta.sql": bad_merge_not_delta_sql,
"bad_strategy.sql": bad_strategy_sql,
}

@staticmethod
@@ -142,3 +141,23 @@
@pytest.mark.skip_profile("databricks_http_cluster", "spark_session")
def test_bad_strategies(self, project):
self.run_and_test()


class TestBadCustomStrategy(BaseIncrementalStrategies):
@pytest.fixture(scope="class")
def models(self):
return {
"bad_strategy.sql": bad_strategy_sql,
}

@staticmethod
def run_and_test():
run_results = run_dbt(["run"], expect_pass=False)
# assert all models fail with compilation errors
for result in run_results:
assert result.status == "error"
assert "dbt could not find an incremental strategy macro" in result.message

@pytest.mark.skip_profile("databricks_http_cluster", "spark_session")
def test_bad_custom_strategies(self, project):
self.run_and_test()
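The `bad_strategy_sql` fixture is defined elsewhere in the suite; a hypothetical model of the same shape (the strategy name is illustrative) shows what this new test class exercises, a strategy with no corresponding macro anywhere in the project or adapter:

```sql
{# hypothetical: no get_incremental_something_wrong_sql macro exists #}
{{ config(
    materialized='incremental',
    incremental_strategy='something_wrong',
    file_format='delta'
) }}
select 1 as id
```

Running such a model yields the error asserted above, "dbt could not find an incremental strategy macro", instead of being rejected up front by the strategy validation macro.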