Convert information into dict #752

base: main
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Convert information into dict
+time: 2023-05-08T22:23:13.704302+02:00
+custom:
+  Author: Fokko
+  Issue: "751"
@@ -60,6 +60,14 @@ class SparkConfig(AdapterConfig):
     merge_update_columns: Optional[str] = None


+@dataclass(frozen=True)
+class RelationInfo:
+    table_schema: str
+    table_name: str
+    columns: List[Tuple[str, str]]
+    properties: Dict[str, str]
+
+
 class SparkAdapter(SQLAdapter):
     COLUMN_NAMES = (
         "table_database",
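Aside: this frozen dataclass replaces the bare `RelationInfo = Tuple[str, str, str]` alias removed further down. A minimal sketch of the ergonomic difference (the instance values below are made up for illustration):

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass(frozen=True)
class RelationInfo:
    table_schema: str
    table_name: str
    columns: List[Tuple[str, str]]
    properties: Dict[str, str]

# Made-up values, for illustration only
info = RelationInfo(
    table_schema="analytics",
    table_name="orders",
    columns=[("order_id", "bigint")],
    properties={"Provider": "delta", "Type": "MANAGED"},
)

# Named field access instead of positional tuple indexing;
# frozen=True keeps the metadata immutable and hashable
assert info.properties.get("Provider") == "delta"
```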
@@ -81,9 +89,7 @@ class SparkAdapter(SQLAdapter):
         "stats:rows:description",
         "stats:rows:include",
     )
-    INFORMATION_COLUMNS_REGEX = re.compile(r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
-    INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
-    INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)
+    INFORMATION_COLUMN_REGEX = re.compile(r"[ | ]* \|-- (.*)\: (.*) \(nullable = (.*)\)")

     HUDI_METADATA_COLUMNS = [
         "_hoodie_commit_time",
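As a quick sanity check on the consolidated regex, here is how it behaves on a sample schema line (the line itself is illustrative, mimicking Spark's printSchema-style output):

```python
import re

INFORMATION_COLUMN_REGEX = re.compile(r"[ | ]* \|-- (.*)\: (.*) \(nullable = (.*)\)")

line = " |-- order_id: bigint (nullable = true)"  # illustrative sample
match = INFORMATION_COLUMN_REGEX.match(line)
assert match is not None
assert match.groups() == ("order_id", "bigint", "true")
```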
@@ -102,7 +108,6 @@ class SparkAdapter(SQLAdapter):
     }

     Relation: TypeAlias = SparkRelation
-    RelationInfo = Tuple[str, str, str]
     Column: TypeAlias = SparkColumn
     ConnectionManager: TypeAlias = SparkConnectionManager
     AdapterSpecificConfigs: TypeAlias = SparkConfig
@@ -138,13 +143,54 @@ def quote(self, identifier: str) -> str:  # type: ignore
     def _get_relation_information(self, row: agate.Row) -> RelationInfo:
         """relation info was fetched with SHOW TABLES EXTENDED"""
         try:
-            _schema, name, _, information = row
+            table_properties = {}
+            columns = []
+            _schema, name, _, information_blob = row
+            for line in information_blob.split("\n"):
+                if line:
+                    if " |--" in line:
+                        # A column
+                        match = self.INFORMATION_COLUMN_REGEX.match(line)
+                        if match:
+                            columns.append((match[1], match[2]))
+                        else:
+                            logger.warning(f"Could not parse column: {line}")
+                    else:
+                        # A property
+                        parts = line.split(": ", maxsplit=2)
+                        if len(parts) == 2:
+                            table_properties[parts[0]] = parts[1]
+                        else:
+                            logger.warning(f"Found invalid property: {line}")
+
         except ValueError:
             raise dbt.exceptions.DbtRuntimeError(
                 f'Invalid value from "show tables extended ...", got {len(row)} values, expected 4'
             )

-        return _schema, name, information
+        return RelationInfo(_schema, name, columns, table_properties)

Review comment: I've changed this by directly separating the columns and properties when we parse the output. I think that makes more sense than splitting this later on because the …
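To make the new parsing concrete, here is a standalone sketch of the same idea applied to a hypothetical information blob (the blob and its values are invented for illustration):

```python
import re
from typing import Dict, List, Tuple

INFORMATION_COLUMN_REGEX = re.compile(r"[ | ]* \|-- (.*)\: (.*) \(nullable = (.*)\)")

# Invented SHOW TABLES EXTENDED output for a single table
information_blob = (
    "Database: analytics\n"
    "Table: orders\n"
    "Owner: fokko\n"
    "Type: MANAGED\n"
    "Provider: delta\n"
    " |-- order_id: bigint (nullable = true)\n"
    " |-- amount: decimal(18,2) (nullable = true)\n"
)

columns: List[Tuple[str, str]] = []
properties: Dict[str, str] = {}
for line in information_blob.split("\n"):
    if not line:
        continue
    if " |--" in line:  # schema line -> (name, dtype)
        match = INFORMATION_COLUMN_REGEX.match(line)
        if match:
            columns.append((match[1], match[2]))
    else:  # "key: value" line -> table property
        parts = line.split(": ", maxsplit=2)
        if len(parts) == 2:
            properties[parts[0]] = parts[1]

assert columns == [("order_id", "bigint"), ("amount", "decimal(18,2)")]
assert properties["Provider"] == "delta"
```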
+    def _parse_describe_table_extended(
+        self, table_results: agate.Table
+    ) -> Tuple[List[Tuple[str, str]], Dict[str, str]]:
+        # Wrap it in an iter, so we continue reading the properties from where we stopped reading columns
+        table_results_itr = iter(table_results)
+
+        # First the columns
+        columns = []
+        for info_row in table_results_itr:
+            if info_row[0] is None or info_row[0] == "" or info_row[0].startswith("#"):
+                break
+            columns.append((info_row[0], str(info_row[1])))
+
+        # Next all the properties
+        table_properties = {}
+        for info_row in table_results_itr:
+            info_type, info_value = info_row[:2]
+            if info_type is not None and not info_type.startswith("#") and info_type != "":
+                table_properties[info_type] = str(info_value)
+
+        return columns, table_properties
+
     def _get_relation_information_using_describe(self, row: agate.Row) -> RelationInfo:
         """Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement"""

Review comment (on the shared iterator): 🤩
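The shared-iterator trick the reviewer reacted to can be shown in isolation: one iterator is consumed by two consecutive loops, so the second loop resumes exactly where the first broke off (the rows below are invented to mimic DESCRIBE TABLE EXTENDED output):

```python
rows = [
    ("order_id", "bigint"),
    ("amount", "decimal(18,2)"),
    ("", ""),                             # blank separator row
    ("# Detailed Table Information", ""),
    ("Owner", "fokko"),
    ("Provider", "delta"),
]

rows_itr = iter(rows)

columns = []
for name, dtype in rows_itr:
    if not name or name.startswith("#"):
        break  # the iterator keeps its position for the next loop
    columns.append((name, dtype))

properties = {}
for key, value in rows_itr:
    if key and not key.startswith("#"):
        properties[key] = value

assert columns == [("order_id", "bigint"), ("amount", "decimal(18,2)")]
assert properties == {"Owner": "fokko", "Provider": "delta"}
```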
@@ -164,41 +210,37 @@ def _get_relation_information_using_describe(self, row: agate.Row) -> RelationIn
             logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
             table_results = AttrDict()

-        information = ""
-        for info_row in table_results:
-            info_type, info_value, _ = info_row
-            if not info_type.startswith("#"):
-                information += f"{info_type}: {info_value}\n"
-
-        return _schema, name, information
+        columns, table_properties = self._parse_describe_table_extended(table_results)
+        return RelationInfo(_schema, name, columns, table_properties)

     def _build_spark_relation_list(
         self,
         row_list: agate.Table,
         relation_info_func: Callable[[agate.Row], RelationInfo],
     ) -> List[BaseRelation]:
         """Aggregate relations with format metadata included."""
-        relations = []
+        relations: List[BaseRelation] = []
         for row in row_list:
-            _schema, name, information = relation_info_func(row)
+            relation = relation_info_func(row)

             rel_type: RelationType = (
-                RelationType.View if "Type: VIEW" in information else RelationType.Table
+                RelationType.View
+                if relation.properties.get("Type") == "VIEW"
+                else RelationType.Table
             )
-            is_delta: bool = "Provider: delta" in information
-            is_hudi: bool = "Provider: hudi" in information
-            is_iceberg: bool = "Provider: iceberg" in information

-            relation: BaseRelation = self.Relation.create(
-                schema=_schema,
-                identifier=name,
-                type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_iceberg=is_iceberg,
-                is_hudi=is_hudi,
+            relations.append(
+                self.Relation.create(
+                    schema=relation.table_schema,
+                    identifier=relation.table_name,
+                    type=rel_type,
+                    is_delta=relation.properties.get("Provider") == "delta",
+                    is_iceberg=relation.properties.get("Provider") == "iceberg",
+                    is_hudi=relation.properties.get("Provider") == "hudi",
+                    columns=relation.columns,
+                    properties=relation.properties,
+                )
             )
-            relations.append(relation)

         return relations
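The core of the refactor is visible in this hunk: relation type and provider are now decided by exact lookups on parsed properties instead of substring checks against one big string. A small before/after sketch (values invented):

```python
information = "Type: VIEW\nProvider: delta\n"          # old free-form blob
properties = {"Type": "VIEW", "Provider": "delta"}     # new parsed dict

# Before: substring matching, which can false-positive whenever the
# text happens to appear anywhere in the blob
is_view_old = "Type: VIEW" in information

# After: exact key/value comparison
is_view_new = properties.get("Type") == "VIEW"
is_delta = properties.get("Provider") == "delta"

assert is_view_old and is_view_new and is_delta
```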
@@ -248,80 +290,54 @@ def get_relation(self, database: str, schema: str, identifier: str) -> Optional[

         return super().get_relation(database, schema, identifier)

-    def parse_describe_extended(
-        self, relation: BaseRelation, raw_rows: AttrDict
-    ) -> List[SparkColumn]:
-        # Convert the Row to a dict
-        dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
-        # Find the separator between the rows and the metadata provided
-        # by the DESCRIBE TABLE EXTENDED statement
-        pos = self.find_table_information_separator(dict_rows)
-
-        # Remove rows that start with a hash, they are comments
-        rows = [row for row in raw_rows[0:pos] if not row["col_name"].startswith("#")]
-        metadata = {col["col_name"]: col["data_type"] for col in raw_rows[pos + 1 :]}
+    def get_columns_in_relation(self, relation: BaseRelation) -> List[SparkColumn]:
+        assert isinstance(relation, SparkRelation)
+        if relation.columns is not None and len(relation.columns) > 0:
+            columns = relation.columns
+            properties = relation.properties
+        else:
+            try:
+                describe_extended_result = self.execute_macro(
+                    GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
+                )
+                columns, properties = self._parse_describe_table_extended(describe_extended_result)
+            except dbt.exceptions.DbtRuntimeError as e:
+                # Spark would throw an error when the table doesn't exist, where other
+                # CDWs would just return an empty list; normalizing the behavior here
+                errmsg = getattr(e, "msg", "")
+                found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
+                if any(found_msgs):
+                    columns = []
+                    properties = {}
+                else:
+                    raise e

-        raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
+        raw_table_stats = properties.get(KEY_TABLE_STATISTICS)
         table_stats = SparkColumn.convert_table_stats(raw_table_stats)
         return [
             SparkColumn(
                 table_database=None,
                 table_schema=relation.schema,
                 table_name=relation.name,
                 table_type=relation.type,
-                table_owner=str(metadata.get(KEY_TABLE_OWNER)),
+                table_owner=properties.get(KEY_TABLE_OWNER, ""),
                 table_stats=table_stats,
-                column=column["col_name"],
+                column=column_name,
                 column_index=idx,
-                dtype=column["data_type"],
+                dtype=column_type,
             )
-            for idx, column in enumerate(rows)
+            for idx, (column_name, column_type) in enumerate(columns)
+            if column_name not in self.HUDI_METADATA_COLUMNS
         ]

-    @staticmethod
-    def find_table_information_separator(rows: List[dict]) -> int:
-        pos = 0
-        for row in rows:
-            if not row["col_name"] or row["col_name"].startswith("#"):
-                break
-            pos += 1
-        return pos
-
-    def get_columns_in_relation(self, relation: BaseRelation) -> List[SparkColumn]:
-        columns = []
-        try:
-            rows: AttrDict = self.execute_macro(
-                GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
-            )
-            columns = self.parse_describe_extended(relation, rows)
-        except dbt.exceptions.DbtRuntimeError as e:
-            # Spark would throw an error when the table doesn't exist, where other
-            # CDWs would just return an empty list; normalizing the behavior here
-            errmsg = getattr(e, "msg", "")
-            found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
-            if any(found_msgs):
-                pass
-            else:
-                raise e
-
-        # strip hudi metadata columns.
-        columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
-        return columns
-
-    def parse_columns_from_information(self, relation: BaseRelation) -> List[SparkColumn]:
-        if hasattr(relation, "information"):
-            information = relation.information or ""
-        else:
-            information = ""
-        owner_match = re.findall(self.INFORMATION_OWNER_REGEX, information)
-        owner = owner_match[0] if owner_match else None
-        matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, information)
+    def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
+        owner = relation.properties.get(KEY_TABLE_OWNER, "")
         columns = []
-        stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, information)
-        raw_table_stats = stats_match[0] if stats_match else None
-        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
-        for match_num, match in enumerate(matches):
-            column_name, column_type, nullable = match.groups()
+        table_stats = SparkColumn.convert_table_stats(
+            relation.properties.get(KEY_TABLE_STATISTICS)
+        )
+        for match_num, (column_name, column_type) in enumerate(relation.columns):
             column = SparkColumn(
                 table_database=None,
                 table_schema=relation.schema,
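A reduced sketch of the control flow in the new get_columns_in_relation: reuse the columns already cached on the relation when present, otherwise fall back to one DESCRIBE TABLE EXTENDED round-trip (the function and variable names here are simplified stand-ins, not the PR's exact API):

```python
from typing import Callable, Dict, List, Tuple

Columns = List[Tuple[str, str]]
Properties = Dict[str, str]

def resolve_columns(
    cached_columns: Columns,
    cached_properties: Properties,
    describe: Callable[[], Tuple[Columns, Properties]],
) -> Tuple[Columns, Properties]:
    """Prefer metadata already parsed from SHOW TABLES EXTENDED; query only when missing."""
    if cached_columns:
        return cached_columns, cached_properties
    return describe()  # one extra query per relation

# Cached path: the stubbed DESCRIBE is never consulted
cols, props = resolve_columns(
    [("order_id", "bigint")], {"Owner": "fokko"}, describe=lambda: ([], {})
)
assert cols == [("order_id", "bigint")]
```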
@@ -337,7 +353,7 @@ def parse_columns_from_information(self, relation: BaseRelation) -> List[SparkCo
         return columns

     def _get_columns_for_catalog(self, relation: BaseRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+        columns = self.parse_columns_from_information(relation)  # type: ignore

         for column in columns:
             # convert SparkColumns into catalog dicts
@@ -355,7 +371,7 @@ def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]
         )

         with executor(self.config) as tpe:
-            futures: List[Future[agate.Table]] = []
+            futures: List[Future[agate.Table]] = []  # type: ignore

             for info, schemas in schema_map.items():
                 for schema in schemas:
                     futures.append(
@@ -410,13 +426,15 @@ def get_rows_different_sql(
         """
         # This method only really exists for test reasons.
         names: List[str]
-        if column_names is None:
+        if not column_names:
             columns = self.get_columns_in_relation(relation_a)
             names = sorted((self.quote(c.name) for c in columns))
         else:
             names = sorted((self.quote(n) for n in column_names))
         columns_csv = ", ".join(names)

+        assert columns_csv, f"Could not determine columns for: {relation_a}"
+
         sql = COLUMNS_EQUAL_SQL.format(
             columns=columns_csv,
             relation_a=str(relation_a),
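One subtle behavior change in this hunk: `if not column_names` treats an empty list the same as None, so column discovery now also kicks in for empty input, and the new assertion guards against an empty SELECT list. A short illustration:

```python
column_names = []  # caller passed an empty list

old_triggers_discovery = column_names is None  # False: empty list was used as-is
new_triggers_discovery = not column_names      # True: falls back to get_columns_in_relation
assert not old_triggers_discovery and new_triggers_discovery
```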
@@ -1,4 +1,4 @@
-from typing import Optional, TypeVar
+from typing import Optional, TypeVar, List, Tuple, Dict
 from dataclasses import dataclass, field

 from dbt.adapters.base.relation import BaseRelation, Policy
@@ -33,8 +33,8 @@ class SparkRelation(BaseRelation):
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
     is_iceberg: Optional[bool] = None
-    # TODO: make this a dict everywhere
-    information: Optional[str] = None
+    columns: List[Tuple[str, str]] = field(default_factory=list)
+    properties: Dict[str, str] = field(default_factory=dict)

     def __post_init__(self) -> None:
         if self.database != self.schema and self.database:

Review comment: I like this! Much better than the …
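Why `field(default_factory=...)` rather than a plain default: dataclasses reject mutable defaults, and a factory gives each relation its own list/dict. A minimal illustration under standard dataclasses semantics:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass(frozen=True)
class Rel:  # stand-in for SparkRelation
    # A bare `columns: List[...] = []` would raise ValueError at class
    # creation time: mutable defaults must go through default_factory
    columns: List[Tuple[str, str]] = field(default_factory=list)
    properties: Dict[str, str] = field(default_factory=dict)

a, b = Rel(), Rel()
assert a.columns is not b.columns  # each instance gets its own list
```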
Review comment: @Fokko: Could you extend the docstring to explain that SHOW TABLES EXTENDED is preferred because fetching multiple tables at once is faster than fetching them one by one, and that we accept the downside of parsing the `|---` string given the performance gains?

Reply: Certainly. I've added a docstring. Let me know what you think.
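For reference, a docstring along the lines the reviewer requested might read as follows (illustrative wording only; the exact text added in the PR is not shown in this diff):

```python
def _get_relation_information(self, row: agate.Row) -> RelationInfo:
    """Parse one row of SHOW TABLES EXTENDED output.

    SHOW TABLES EXTENDED is preferred over issuing DESCRIBE per table
    because it fetches metadata for many tables in a single query. The
    trade-off, which we accept for the performance gain, is that the
    schema arrives as a ` |-- name: type (nullable = ...)` text blob
    that must be parsed with a regex.
    """
    ...
```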