From 677dae608edd19a0ec7f9b0b0ed3d106af097298 Mon Sep 17 00:00:00 2001 From: Emiel Date: Fri, 29 Sep 2023 11:53:30 +0200 Subject: [PATCH] Implement the base macro (where possible) --- .github/workflows/integration_tests.yml | 2 +- dbt_project.yml | 7 +--- ...tegration_test.sh => integration_tests.sh} | 0 ...nowplow_normalize_incremental_manifest.sql | 11 +----- ...nowplow_normalize_base_events_this_run.sql | 36 ------------------- ...nowplow_normalize_base_events_this_run.sql | 25 ------------- ...nowplow_normalize_base_events_this_run.sql | 27 -------------- ...nowplow_normalize_base_events_this_run.sql | 30 ++++++++++++++++ ...owplow_normalize_base_new_event_limits.sql | 11 +++--- packages.yml | 2 +- 10 files changed, 40 insertions(+), 111 deletions(-) rename integration_tests/.scripts/{integration_test.sh => integration_tests.sh} (100%) delete mode 100644 models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql delete mode 100644 models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql delete mode 100644 models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql create mode 100644 models/base/scratch/snowplow_normalize_base_events_this_run.sql diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index d601d2a..f13a222 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -115,7 +115,7 @@ jobs: dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} - name: Run tests - run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_tests.sh -d ${{ matrix.warehouse }} - name: "Post-test: Drop ci schemas" run: | diff --git a/dbt_project.yml b/dbt_project.yml index 63e91e7..35bd12c 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -67,9 +67,4 @@ models: scratch: +schema: "scratch" +tags: "scratch" - bigquery: - enabled: "{{ target.type == 'bigquery' | as_bool() }}" - databricks: - enabled: "{{ target.type in ['databricks', 'spark'] | as_bool() }}" - snowflake: - enabled: "{{ target.type == 'snowflake' | as_bool() }}" + +enabled: "{{ target.type in ['bigquery', 'databricks', 'spark', 'snowflake'] | as_bool() }}" diff --git a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_tests.sh similarity index 100% rename from integration_tests/.scripts/integration_test.sh rename to integration_tests/.scripts/integration_tests.sh diff --git a/models/base/manifest/snowplow_normalize_incremental_manifest.sql b/models/base/manifest/snowplow_normalize_incremental_manifest.sql index 19c3302..07703a9 100644 --- a/models/base/manifest/snowplow_normalize_incremental_manifest.sql +++ b/models/base/manifest/snowplow_normalize_incremental_manifest.sql @@ -8,13 +8,4 @@ -- Boilerplate to generate table. -- Table updated as part of end-run hook -with prep as ( - select - cast(null as {{ snowplow_utils.type_max_string() }}) model, - cast('1970-01-01' as {{ type_timestamp() }}) as last_success -) - -select * - -from prep -where false +{{ snowplow_utils.base_create_snowplow_incremental_manifest() }} diff --git a/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 5ad20a6..0000000 --- a/models/base/scratch/bigquery/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,36 +0,0 @@ -{{ - config( - tags=["this_run"] - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - --- without downstream joins, it's safe to dedupe by picking the first event_id found. -select - array_agg(e order by e.collector_tstamp limit 1)[offset(0)].* - -from ( - - select - a.* - - from {{ var('snowplow__events') }} as a - - where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} - and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} - and a.derived_tstamp <= {{ upper_limit }} - {% endif %} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -) e -group by e.event_id diff --git a/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 08f1b1d..0000000 --- a/models/base/scratch/databricks/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,25 +0,0 @@ -{{ - config( - tags=["this_run"] - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - -select - a.* - -from {{ var('snowplow__events') }} as a - -where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -qualify row_number() over (partition by a.event_id order by a.collector_tstamp, a.etl_tstamp) = 1 diff --git a/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql deleted file mode 100644 index 9e3c728..0000000 --- a/models/base/scratch/snowflake/snowplow_normalize_base_events_this_run.sql +++ /dev/null @@ -1,27 +0,0 @@ -{{ - config( - tags=["this_run"], - sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')) - ) -}} - -{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} - -select - a.* - -from {{ var('snowplow__events') }} as a - - -where - {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} - {% if var("snowplow__days_late_allowed") == -1 %} - 1 = 1 - {% else %} - a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} - {% endif %} - and a.collector_tstamp >= {{ lower_limit }} - and a.collector_tstamp <= {{ upper_limit }} - and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} - -qualify row_number() over (partition by a.event_id order by a.collector_tstamp) = 1 diff --git a/models/base/scratch/snowplow_normalize_base_events_this_run.sql b/models/base/scratch/snowplow_normalize_base_events_this_run.sql new file mode 100644 index 0000000..cbcaab9 --- /dev/null +++ b/models/base/scratch/snowplow_normalize_base_events_this_run.sql @@ -0,0 +1,30 @@ +{{ + config( + tags=["this_run"], + sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')) + ) +}} + +{%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(ref('snowplow_normalize_base_new_event_limits')) %} + +select + a.* + +from {{ var('snowplow__events') }} as a + +where + {# dvce_sent_tstamp is an optional field and not all trackers/webhooks populate it, this means this filter needs to be optional #} + {% if var("snowplow__days_late_allowed") == -1 %} + 1 = 1 + {% else %} + a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', var("snowplow__days_late_allowed", 3), 'a.dvce_created_tstamp') }} + {% endif %} + and a.collector_tstamp >= {{ lower_limit }} + and a.collector_tstamp <= {{ upper_limit }} + {% if var('snowplow__derived_tstamp_partitioned', true) and target.type == 'bigquery' | as_bool() %} + and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }} + and a.derived_tstamp <= {{ upper_limit }} + {% endif %} + and {{ snowplow_utils.app_id_filter(var("snowplow__app_id",[])) }} + +qualify row_number() over (partition by a.event_id order by a.collector_tstamp{% if target.type in ['databricks', 'spark'] -%}, a.etl_tstamp {%- endif %}) = 1 diff --git a/models/base/scratch/snowplow_normalize_base_new_event_limits.sql b/models/base/scratch/snowplow_normalize_base_new_event_limits.sql index 72078ed..4238cf3 100644 --- a/models/base/scratch/snowplow_normalize_base_new_event_limits.sql +++ b/models/base/scratch/snowplow_normalize_base_new_event_limits.sql @@ -10,14 +10,15 @@ {% set min_last_success, max_last_success, models_matched_from_manifest, - has_matched_all_models = snowplow_utils.get_incremental_manifest_status(ref('snowplow_normalize_incremental_manifest'), models_in_run) -%} + has_matched_all_models = snowplow_utils.get_incremental_manifest_status(ref('snowplow_normalize_incremental_manifest'), + models_in_run) -%} {% set run_limits_query = snowplow_utils.get_run_limits(min_last_success, - max_last_success, - models_matched_from_manifest, - has_matched_all_models, - var("snowplow__start_date","2020-01-01")) -%} + max_last_success, + models_matched_from_manifest, + has_matched_all_models, + var("snowplow__start_date","2020-01-01")) -%} {{ run_limits_query }} diff --git a/packages.yml b/packages.yml index 74baac8..61af189 100644 --- a/packages.yml +++ b/packages.yml @@ -1,3 +1,3 @@ packages: - package: snowplow/snowplow_utils - version: [">=0.14.0", "<0.16.0"] + version: [">=0.15.0", "<0.16.0"]