From 9b860b4b0e8ffa50ab9ec794da5e37c2aaa82d11 Mon Sep 17 00:00:00 2001
From: Florian Schroevers
Date: Thu, 6 Jul 2023 14:19:02 +0200
Subject: [PATCH] added databricks adapter

---
Notes (commentary only; text in this area is not part of the commit):

This patch adds databricks__ implementations of create_external_table,
get_external_build_plan, dropif, recover_partitions and
refresh_external_table under macros/plugins/databricks/. It also moves the
recover_partitions dispatcher and default__recover_partitions out of
macros/plugins/spark/helpers/recover_partitions.sql into
macros/common/helpers/recover_partitions.sql.

NOTE(review): databricks__get_external_build_plan returns either
[create_external_schema, dropif, create_external_table] or the result of
refresh_external_table; it never adds recover_partitions(source_node) to
the plan, so nothing in this patch calls databricks__recover_partitions.
Confirm whether that omission is intended.

NOTE(review): the comment inside default__recover_partitions says the macro
"will work for spark/databricks", but its body only raises a
not-implemented error. Consider rewording the comment in a follow-up.

NOTE(review): databricks__create_external_table only emits the column list
when "columns | length > 0", so a source that declares partitions but no
columns gets no column definitions. Confirm this is the desired behaviour.

 macros/common/helpers/recover_partitions.sql  | 12 ++++++
 .../databricks/create_external_table.sql      | 41 +++++++++++++++++++
 .../databricks/get_external_build_plan.sql    | 25 +++++++++++
 macros/plugins/databricks/helpers/dropif.sql  |  9 ++++
 .../databricks/helpers/recover_partitions.sql | 14 +++++++
 .../databricks/refresh_external_table.sql     |  9 ++++
 .../spark/helpers/recover_partitions.sql      | 13 ------
 7 files changed, 110 insertions(+), 13 deletions(-)
 create mode 100644 macros/common/helpers/recover_partitions.sql
 create mode 100644 macros/plugins/databricks/create_external_table.sql
 create mode 100644 macros/plugins/databricks/get_external_build_plan.sql
 create mode 100644 macros/plugins/databricks/helpers/dropif.sql
 create mode 100644 macros/plugins/databricks/helpers/recover_partitions.sql
 create mode 100644 macros/plugins/databricks/refresh_external_table.sql

diff --git a/macros/common/helpers/recover_partitions.sql b/macros/common/helpers/recover_partitions.sql
new file mode 100644
index 00000000..80779047
--- /dev/null
+++ b/macros/common/helpers/recover_partitions.sql
@@ -0,0 +1,12 @@
+{% macro recover_partitions(source_node) %}
+    {{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
+{% endmacro %}
+
+{% macro default__recover_partitions(source_node) %}
+    /*{#
+        We're dispatching this macro so that users can override it if required on other adapters
+        but this will work for spark/databricks.
+    #}*/
+
+    {{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' + adapter.type()) }}
+{% endmacro %}
diff --git a/macros/plugins/databricks/create_external_table.sql b/macros/plugins/databricks/create_external_table.sql
new file mode 100644
index 00000000..f32e2a52
--- /dev/null
+++ b/macros/plugins/databricks/create_external_table.sql
@@ -0,0 +1,41 @@
+{% macro databricks__create_external_table(source_node) %}
+
+    {%- set columns = source_node.columns.values() -%}
+    {%- set external = source_node.external -%}
+    {%- set partitions = external.partitions -%}
+    {%- set options = external.options -%}
+
+    {%- set columns_and_partitions = columns | list -%}
+    {%- if partitions -%}
+        {%- for i in partitions -%}
+            {%- if i.name not in columns_and_partitions | list | map(attribute='name') -%}
+                {%- do columns_and_partitions.append(i) -%}
+            {%- endif -%}
+        {%- endfor -%}
+    {%- endif -%}
+
+{# https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html #}
+    create table {{source(source_node.source_name, source_node.name)}}
+    {%- if columns | length > 0 %} (
+        {% for column in columns_and_partitions %}
+            {{column.name}} {{column.data_type}}
+            {{- ',' if not loop.last -}}
+        {% endfor %}
+    ) {% endif -%}
+    {% if external.using %} using {{external.using}} {%- endif %}
+    {% if options -%} options (
+        {%- for key, value in options.items() -%}
+            '{{ key }}' = '{{value}}' {{- ', \n' if not loop.last -}}
+        {%- endfor -%}
+    ) {%- endif %}
+    {% if partitions -%} partitioned by (
+        {%- for partition in partitions -%}
+            {{partition.name}}{{', ' if not loop.last}}
+        {%- endfor -%}
+    ) {%- endif %}
+    {% if external.row_format -%} row format {{external.row_format}} {%- endif %}
+    {% if external.file_format -%} stored as {{external.file_format}} {%- endif %}
+    {% if external.location -%} location '{{external.location}}' {%- endif %}
+    {% if external.table_properties -%} tblproperties {{ external.table_properties }} {%- endif -%}
+
+{% endmacro %}
diff --git a/macros/plugins/databricks/get_external_build_plan.sql b/macros/plugins/databricks/get_external_build_plan.sql
new file mode 100644
index 00000000..32fe7cb3
--- /dev/null
+++ b/macros/plugins/databricks/get_external_build_plan.sql
@@ -0,0 +1,25 @@
+{% macro databricks__get_external_build_plan(source_node) %}
+
+    {% set build_plan = [] %}
+
+    {% set old_relation = adapter.get_relation(
+        database = source_node.database,
+        schema = source_node.schema,
+        identifier = source_node.identifier
+    ) %}
+
+    {% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}
+
+    {% if create_or_replace %}
+        {% set build_plan = build_plan + [
+            dbt_external_tables.create_external_schema(source_node),
+            dbt_external_tables.dropif(source_node),
+            dbt_external_tables.create_external_table(source_node)
+        ] %}
+    {% else %}
+        {% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %}
+    {% endif %}
+
+    {% do return(build_plan) %}
+
+{% endmacro %}
diff --git a/macros/plugins/databricks/helpers/dropif.sql b/macros/plugins/databricks/helpers/dropif.sql
new file mode 100644
index 00000000..818d331a
--- /dev/null
+++ b/macros/plugins/databricks/helpers/dropif.sql
@@ -0,0 +1,9 @@
+{% macro databricks__dropif(node) %}
+
+    {% set ddl %}
+        drop table if exists {{source(node.source_name, node.name)}}
+    {% endset %}
+
+    {{return(ddl)}}
+
+{% endmacro %}
diff --git a/macros/plugins/databricks/helpers/recover_partitions.sql b/macros/plugins/databricks/helpers/recover_partitions.sql
new file mode 100644
index 00000000..e1646417
--- /dev/null
+++ b/macros/plugins/databricks/helpers/recover_partitions.sql
@@ -0,0 +1,14 @@
+{% macro databricks__recover_partitions(source_node) %}
+    {# https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-alter-table.html #}
+
+    {%- if source_node.external.partitions and source_node.external.using and source_node.external.using|lower != 'delta' -%}
+        {% set ddl %}
+            ALTER TABLE {{ source(source_node.source_name, source_node.name) }} RECOVER PARTITIONS
+        {% endset %}
+    {%- else -%}
+        {% set ddl = none %}
+    {%- endif -%}
+
+    {{return(ddl)}}
+
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/plugins/databricks/refresh_external_table.sql b/macros/plugins/databricks/refresh_external_table.sql
new file mode 100644
index 00000000..ce01a0f3
--- /dev/null
+++ b/macros/plugins/databricks/refresh_external_table.sql
@@ -0,0 +1,9 @@
+{% macro databricks__refresh_external_table(source_node) %}
+
+    {% set refresh %}
+        refresh table {{source(source_node.source_name, source_node.name)}}
+    {% endset %}
+
+    {% do return([refresh]) %}
+
+{% endmacro %}
diff --git a/macros/plugins/spark/helpers/recover_partitions.sql b/macros/plugins/spark/helpers/recover_partitions.sql
index bdc4b227..305777fb 100644
--- a/macros/plugins/spark/helpers/recover_partitions.sql
+++ b/macros/plugins/spark/helpers/recover_partitions.sql
@@ -12,16 +12,3 @@
     {{return(ddl)}}
 
 {% endmacro %}
-
-{% macro recover_partitions(source_node) %}
-    {{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
-{% endmacro %}
-
-{% macro default__recover_partitions(source_node) %}
-    /*{#
-        We're dispatching this macro so that users can override it if required on other adapters
-        but this will work for spark/databricks.
-    #}*/
-
-    {{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' + adapter.type()) }}
-{% endmacro %}