Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Databricks plugin for Unity Catalog + SQL Warehouse compute #236

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions macros/common/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{% macro recover_partitions(source_node) %}
{{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__recover_partitions(source_node) %}
/*{#
We're dispatching this macro so that users can override it if required on other adapters
but this will work for spark/databricks.
#}*/

{{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' + adapter.type()) }}
{% endmacro %}
41 changes: 41 additions & 0 deletions macros/plugins/databricks/create_external_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{% macro databricks__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
{%- set partitions = external.partitions -%}
{%- set options = external.options -%}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love that you've created this PR. Been thinking about doing it myself for a while, but haven't gotten further than overriding the databricks__create_external_table and databricks__get_external_build_plan macros locally.

One idea you could take into account is adding the support for liquid clustering:
image

{%- set columns_and_partitions = columns | list -%}
{%- if partitions -%}
{%- for i in partitions -%}
{%- if i.name not in columns_and_partitions | list | map(attribute='name') -%}
{%- do columns_and_partitions.append(i) -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}

{# https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html #}
create table {{source(source_node.source_name, source_node.name)}}
{%- if columns | length > 0 %} (
{% for column in columns_and_partitions %}
{{column.name}} {{column.data_type}}
{{- ',' if not loop.last -}}
{% endfor %}
) {% endif -%}
{% if external.using %} using {{external.using}} {%- endif %}
{% if options -%} options (
{%- for key, value in options.items() -%}
'{{ key }}' = '{{value}}' {{- ', \n' if not loop.last -}}
{%- endfor -%}
) {%- endif %}
{% if partitions -%} partitioned by (
{%- for partition in partitions -%}
{{partition.name}}{{', ' if not loop.last}}
{%- endfor -%}
) {%- endif %}
{% if external.row_format -%} row format {{external.row_format}} {%- endif %}
{% if external.file_format -%} stored as {{external.file_format}} {%- endif %}
{% if external.location -%} location '{{external.location}}' {%- endif %}
{% if external.table_properties -%} tblproperties {{ external.table_properties }} {%- endif -%}

{% endmacro %}
25 changes: 25 additions & 0 deletions macros/plugins/databricks/get_external_build_plan.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% macro databricks__get_external_build_plan(source_node) %}

{% set build_plan = [] %}

{% set old_relation = adapter.get_relation(
database = source_node.database,
schema = source_node.schema,
identifier = source_node.identifier
) %}

{% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}

{% if create_or_replace %}
{% set build_plan = build_plan + [
dbt_external_tables.create_external_schema(source_node),
dbt_external_tables.dropif(source_node),
dbt_external_tables.create_external_table(source_node)
] %}
{% else %}
{% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %}
{% endif %}

{% do return(build_plan) %}

{% endmacro %}
9 changes: 9 additions & 0 deletions macros/plugins/databricks/helpers/dropif.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro databricks__dropif(node) %}

{% set ddl %}
drop table if exists {{source(node.source_name, node.name)}}
{% endset %}

{{return(ddl)}}

{% endmacro %}
14 changes: 14 additions & 0 deletions macros/plugins/databricks/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro databricks__recover_partitions(source_node) %}
{# https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-alter-table.html #}

{%- if source_node.external.partitions and source_node.external.using and source_node.external.using|lower != 'delta' -%}
{% set ddl %}
ALTER TABLE {{ source(source_node.source_name, source_node.name) }} RECOVER PARTITIONS
{% endset %}
{%- else -%}
{% set ddl = none %}
{%- endif -%}

{{return(ddl)}}

{% endmacro %}
9 changes: 9 additions & 0 deletions macros/plugins/databricks/refresh_external_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro databricks__refresh_external_table(source_node) %}

{% set refresh %}
refresh table {{source(source_node.source_name, source_node.name)}}
{% endset %}

{% do return([refresh]) %}

{% endmacro %}
13 changes: 0 additions & 13 deletions macros/plugins/spark/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,3 @@
{{return(ddl)}}

{% endmacro %}

{% macro recover_partitions(source_node) %}
{{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__recover_partitions(source_node) %}
/*{#
We're dispatching this macro so that users can override it if required on other adapters
but this will work for spark/databricks.
#}*/

{{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' + adapter.type()) }}
{% endmacro %}
Loading