Skip to content

Commit

Permalink
Fix copy_partitions when used with static partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Sep 21, 2024
1 parent 563633b commit 8ebbbde
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 26 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240921-213533.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix copy_partitions when used with static partitions
time: 2024-09-21T21:35:33.180816+02:00
custom:
Author: Kayrnt
Issue: "1349"
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,63 @@
{% endif %}
{% endmacro %}

{% macro bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}
{%- set source_sql -%}
(
{% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%}
select
{{ partition_by.insertable_time_partitioning_field() }},
* from {{ tmp_relation }}
{% elif tmp_relation_exists -%}
select
* from {{ tmp_relation }}
{%- elif partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
{%- else -%}
{{sql}}
{%- endif %}
)
{%- endset -%}
{{ return(source_sql) }}
{% endmacro %}

{% macro bq_static_copy_partitions_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists
) %}
{%- if tmp_relation_exists is false -%}
{%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}
{# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #}
{%- call statement('create_tmp_relation_for_copy', language='sql') -%}
{{ bq_create_table_as(partition_by, True, tmp_relation, source_sql, 'sql')
}}
{%- endcall %}
{%- endif -%}
{%- set partitions_sql -%}
select
{%- for partition in partitions %}
CAST({{ partition }} AS TIMESTAMP){%- if not loop.last -%},{%- endif -%}
{%- endfor %}
from {{ tmp_relation }}
{%- endset -%}
{%- set partitions = run_query(partitions_sql).columns[0].values() -%}
{# We copy the partitions #}
{%- do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) -%}
-- Clean up the temp table
drop table if exists {{ tmp_relation }}
{% endmacro %}

{% macro bq_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}

{%- if copy_partitions %}
{{ bq_static_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, tmp_relation_exists) }}
{% else -%}
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

{%- set source_sql -%}
(
{% if partition_by.time_ingestion_partitioning and tmp_relation_exists -%}
select
{{ partition_by.insertable_time_partitioning_field() }},
* from {{ tmp_relation }}
{% elif tmp_relation_exists -%}
select
* from {{ tmp_relation }}
{%- elif partition_by.time_ingestion_partitioning -%}
{{ wrap_with_time_ingestion_partitioning_sql(partition_by, sql, True) }}
{%- else -%}
{{sql}}
{%- endif %}

)
{%- endset -%}

{% if copy_partitions %}
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
{% else %}
{%- set source_sql = bq_static_select_insert_overwrite_sql(tmp_relation, sql, partition_by, tmp_relation_exists) %}

{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
Expand All @@ -92,8 +118,7 @@
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}

{% endif %}
{%- endif -%}
{% endmacro %}

{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
Expand All @@ -118,7 +143,7 @@
{% endmacro %}

{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
{%- if copy_partitions is true %}
{%- if copy_partitions %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% else -%}
{% set predicate -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,52 @@
}}
with data as (
{% if not is_incremental() %}
select 1 as id, cast('2020-01-01' as date) as date_day union all
select 2 as id, cast('2020-01-01' as date) as date_day union all
select 3 as id, cast('2020-01-01' as date) as date_day union all
select 4 as id, cast('2020-01-01' as date) as date_day
{% else %}
-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day
{% endif %}
)
select * from data
{% if is_incremental() %}
where date_day in ({{ config.get("partitions") | join(",") }})
{% endif %}
-- Test comment to prevent recurrence of https://github.com/dbt-labs/dbt-bigquery/issues/896
""".lstrip()

overwrite_copy_partitions_with_partitions_sql = """
{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partitions=["CAST('2020-01-02' AS DATE)","'2020-01-02'"],
partition_by={
"field": "date_day",
"data_type": "date",
"copy_partitions": true
}
)
}}
with data as (
{% if not is_incremental() %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
overwrite_day_sql,
overwrite_day_with_copy_partitions_sql,
overwrite_partitions_sql,
overwrite_copy_partitions_with_partitions_sql,
overwrite_range_sql,
overwrite_time_sql,
overwrite_day_with_time_ingestion_sql,
Expand All @@ -45,6 +46,7 @@ def models(self):
"incremental_overwrite_day.sql": overwrite_day_sql,
"incremental_overwrite_day_with_copy_partitions.sql": overwrite_day_with_copy_partitions_sql,
"incremental_overwrite_partitions.sql": overwrite_partitions_sql,
"incremental_overwrite_copy_partitions_with_partitions.sql": overwrite_copy_partitions_with_partitions_sql,
"incremental_overwrite_range.sql": overwrite_range_sql,
"incremental_overwrite_time.sql": overwrite_time_sql,
"incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql,
Expand Down Expand Up @@ -78,6 +80,10 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se
("incremental_overwrite_time", "incremental_overwrite_time_expected"),
("incremental_overwrite_date", "incremental_overwrite_date_expected"),
("incremental_overwrite_partitions", "incremental_overwrite_date_expected"),
(
"incremental_overwrite_copy_partitions_with_partitions",
"incremental_overwrite_date_expected",
),
("incremental_overwrite_day", "incremental_overwrite_day_expected"),
("incremental_overwrite_range", "incremental_overwrite_range_expected"),
(
Expand Down

0 comments on commit 8ebbbde

Please sign in to comment.