Skip to content

Commit

Permalink
Cleanup
Browse files — browse the repository at this point in the history
  • Loading branch information
Fokko committed May 9, 2023
1 parent a423803 commit 723729c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 64 deletions.
16 changes: 1 addition & 15 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,6 @@ def quote(self, identifier):
def _get_relation_information(self, row: agate.Row) -> RelationInfo:
"""relation info was fetched with SHOW TABLES EXTENDED"""
try:
# Example lines:
# Database: dbt_schema
# Table: names
# Owner: fokkodriesprong
# Created Time: Mon May 08 18:06:47 CEST 2023
# Last Access: UNKNOWN
# Created By: Spark 3.3.2
# Type: MANAGED
# Provider: hive
# Table Properties: [transient_lastDdlTime=1683562007]
# Statistics: 16 bytes
# Schema: root
# |-- idx: integer (nullable = false)
# |-- name: string (nullable = false)
table_properties = {}
columns = []
_schema, name, _, information_blob = row
Expand Down Expand Up @@ -187,7 +173,7 @@ def _parse_describe_table(
for info_row in table_results_itr:
info_type, info_value = info_row[:2]
if info_type is not None and not info_type.startswith("#") and info_type != "":
table_properties[info_type] = info_value
table_properties[info_type] = str(info_value)

return columns, table_properties

Expand Down
88 changes: 39 additions & 49 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,19 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
)
self.assertEqual(len(tables), 1)

columns = adapter.parse_describe_extended(tables[0], None)
table = tables[0]

assert isinstance(table, SparkRelation)

columns = adapter.get_columns_in_relation(
SparkRelation.create(
type=rel_type,
schema="default_schema",
identifier="mytable",
columns=table.columns,
properties=table.properties,
)
)

self.assertEqual(len(columns), 5)
self.assertEqual(
Expand Down Expand Up @@ -684,7 +696,19 @@ def test_parse_columns_from_information_with_view_type(self):
)
self.assertEqual(len(tables), 1)

columns = adapter.parse_describe_extended(tables[0], None)
table = tables[0]

assert isinstance(table, SparkRelation)

columns = adapter.get_columns_in_relation(
SparkRelation.create(
type=rel_type,
schema="default_schema",
identifier="myview",
columns=table.columns,
properties=table.properties,
)
)

self.assertEqual(len(columns), 5)
self.assertEqual(
Expand Down Expand Up @@ -757,7 +781,19 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
)
self.assertEqual(len(tables), 1)

columns = adapter.parse_describe_extended(tables[0], None)
table = tables[0]

assert isinstance(table, SparkRelation)

columns = adapter.get_columns_in_relation(
SparkRelation.create(
type=rel_type,
schema="default_schema",
identifier="mytable",
columns=table.columns,
properties=table.properties,
)
)

self.assertEqual(len(columns), 5)
self.assertEqual(
Expand Down Expand Up @@ -809,49 +845,3 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
"stats:rows:value": 12345678,
},
)

def test_parse_columns_from_describe_extended(self):
    """`_parse_describe_table` splits DESCRIBE EXTENDED output into columns and properties.

    Real columns come first; everything after the first blank/`#`-header row
    (partitioning info, Iceberg metadata columns, detailed table information)
    is collected into the properties mapping instead.
    """
    self.maxDiff = None

    # Row data exactly as `DESCRIBE EXTENDED` returns it for an Iceberg table.
    raw_rows = [
        ("idx", "int", ""),
        ("name", "string", ""),
        ("", "", ""),
        ("# Partitioning", "", ""),
        ("Not partitioned", "", ""),
        ("", "", ""),
        ("# Metadata Columns", "", ""),
        ("_spec_id", "int", ""),
        ("_partition", "struct<>", ""),
        ("_file", "string", ""),
        ("_pos", "bigint", ""),
        ("_deleted", "boolean", ""),
        ("", "", ""),
        ("# Detailed Table Information", "", ""),
        ("Name", "sandbox.dbt_tabular3.names", ""),
        (
            "Location",
            "s3://tabular-wh-us-east-1/6efbcaf4-21ae-499d-b340-3bc1a7003f52/d2082e32-d2bd-4484-bb93-7bc445c1c6bb",
            "",
        ),
        ("Provider", "iceberg", ""),
    ]
    rows = [agate.MappedSequence(list(values)) for values in raw_rows]

    adapter = SparkAdapter(self._get_target_http(self.project_cfg))

    columns, properties = adapter._parse_describe_table(rows)

    # Only the leading rows (before the first section break) count as columns.
    assert columns == [("idx", "int"), ("name", "string")]
    # Everything else lands in properties, keyed by the first cell.
    assert properties == {
        "Location": "s3://tabular-wh-us-east-1/6efbcaf4-21ae-499d-b340-3bc1a7003f52/d2082e32-d2bd-4484-bb93-7bc445c1c6bb",
        "Name": "sandbox.dbt_tabular3.names",
        "Not partitioned": "",
        "Provider": "iceberg",
        "_deleted": "boolean",
        "_file": "string",
        "_partition": "struct<>",
        "_pos": "bigint",
        "_spec_id": "int",
    }

0 comments on commit 723729c

Please sign in to comment.