Commit

Merge pull request #82 from nurikk/DA-917-dbt-osmosis-add-support-for-data-type-retrieval

Add support for data_type
z3z1ma authored Aug 27, 2023
2 parents 0ae6aa8 + 2ab3f0e commit 071aa77
Showing 1 changed file with 39 additions and 12 deletions.
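
In short: get_columns now delegates to a new get_columns_meta method, which returns an ordered mapping of cased column name to dbt's ColumnMetadata (name, type, index) instead of a bare list of names, and that mapping (columns_db_meta) is threaded through the schema-update path so that added and inherited columns pick up a data_type. A minimal sketch of the new call shape from inside the changed class (the node variable and the values shown are illustrative, not part of this diff):

# Hypothetical usage inside the class changed below:
parts = self.get_database_parts(node)            # (database, schema, identifier)
columns_db_meta = self.get_columns_meta(parts)   # Dict[str, ColumnMetadata]
column_names = self.get_columns(parts)           # the same keys, as a List[str]
id_type = columns_db_meta["ID"].type             # e.g. "NUMBER" on Snowflake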
src/dbt_osmosis/core/osmosis.py: 39 additions & 12 deletions
@@ -1,8 +1,8 @@
 import os
 import re
 import json
+from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor, wait
 from enum import Enum
 from functools import lru_cache
 from itertools import chain
 from pathlib import Path
@@ -20,6 +20,7 @@
 )

 import ruamel.yaml
+from dbt.contracts.results import ColumnMetadata
 from pydantic import BaseModel

 from dbt_osmosis.core.exceptions import (
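
ColumnMetadata is the same dataclass dbt serializes into catalog.json column entries; this diff only ever constructs it with name, type, and index. A one-line sketch with illustrative values:

# Mirrors the constructor calls in the warehouse path below; index stays None there.
col = ColumnMetadata(name="ID", type="NUMBER", index=None)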
@@ -321,10 +322,15 @@ def augment_existing_model(
         )
         return documentation

     @lru_cache(maxsize=5000)
     def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
         """Get all columns in a list for a model"""
-
+        return list(self.get_columns_meta(parts).keys())
+
+    @lru_cache(maxsize=5000)
+    def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetadata]:
+        """Get all columns in a list for a model"""
+        columns = OrderedDict()
         # If we provide a catalog, we read from it
         if self.catalog_file:
             file_path = Path(self.catalog_file)
@@ -336,15 +342,16 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
                     if model.split(".")[-1] == parts[-1]
                 ]
                 if matching_models:
-                    return [col.lower() for col in matching_models[0]["columns"].keys()]
+                    for col in matching_models[0]["columns"].values():
+                        columns[self.column_casing(col['name'])] = ColumnMetadata(**col, name=self.column_casing(col['name']))
                 else:
-                    return []
+                    return columns

         # If we don't provide a catalog we query the warehouse to get the columns
         else:
             with self.adapter.connection_named("dbt-osmosis"):
                 table = self.adapter.get_relation(*parts)
-                columns = []
+
                 if not table:
                     logger().info(
                         (
@@ -356,9 +363,10 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
                     return columns
                 try:
                     for c in self.adapter.get_columns_in_relation(table):
-                        columns.append(self.column_casing(c.name))
+                        columns[self.column_casing(c.name)] = ColumnMetadata(name=self.column_casing(c.name), type=c.dtype, index=None)
                         if hasattr(c, "flatten"):
-                            columns.extend([self.column_casing(exp.name) for exp in c.flatten()])
+                            for exp in c.flatten():
+                                columns[self.column_casing(exp.name)] = ColumnMetadata(name=self.column_casing(exp.name), type=c.dtype, index=None)
                 except Exception as error:
                     logger().info(
                         (
@@ -368,7 +376,7 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
                         *parts,
                         str(error),
                     )
-                return columns
+        return columns

     def bootstrap_sources(self) -> None:
         """Bootstrap sources from the dbt-osmosis vars config"""
@@ -410,6 +418,9 @@ def bootstrap_sources(self) -> None:
                             "description": getattr(
                                 exp, "description", getattr(c, "description", "")
                             ),
+                            "data_type": getattr(
+                                exp, "dtype", getattr(c, "dtype", "")
+                            )
                         }
                         for c in self.adapter.get_columns_in_relation(relation)
                         for exp in getattr(c, "flatten", lambda: [c])()
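
The new data_type entry mirrors the description lookup directly above it: prefer the flattened sub-column's dtype (relevant for struct fields on adapters whose column objects support flatten()), fall back to the parent column's dtype, and default to an empty string:

# Resolution order for each bootstrapped source column:
data_type = getattr(exp, "dtype", getattr(c, "dtype", ""))
# 1. exp.dtype  - the flattened sub-column, when the adapter provides flatten()
# 2. c.dtype    - the parent column reported by the adapter
# 3. ""         - when neither attribute is available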
@@ -824,6 +835,7 @@ def _run(self, unique_id, node, schema_map, force_inheritance=False):
         # Build Sets
         logger().info(":mag: Resolving columns in database")
         database_columns_ordered = self.get_columns(self.get_database_parts(node))
+        columns_db_meta = self.get_columns_meta(self.get_database_parts(node))
         database_columns: Set[str] = set(database_columns_ordered)
         yaml_columns_ordered = [column for column in node.columns]
         yaml_columns: Set[str] = set(yaml_columns_ordered)
@@ -882,6 +894,7 @@ def _run(self, unique_id, node, schema_map, force_inheritance=False):
                 extra_columns,
                 node,
                 section,
+                columns_db_meta
             )
             if n_cols_added + n_cols_doc_inherited + n_cols_removed > 0:
                 should_dump = True
@@ -980,6 +993,7 @@ def update_undocumented_columns_with_prior_knowledge(
         undocumented_columns: Iterable[str],
         node: ManifestNode,
         yaml_file_model_section: Dict[str, Any],
+        columns_db_meta: Dict[str, ColumnMetadata]
     ) -> int:
         """Update undocumented columns with prior knowledge in node and model simultaneously
         THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
@@ -1017,21 +1031,33 @@ def update_undocumented_columns_with_prior_knowledge(
                 node.unique_id,
             )
             logger().info(prior_knowledge)
+        for column in undocumented_columns:
+            cased_column_name = self.column_casing(column)
+            if cased_column_name in node.columns and not node.columns[cased_column_name].data_type:
+                if columns_db_meta.get(cased_column_name):
+                    node.columns[cased_column_name].data_type = columns_db_meta.get(cased_column_name).type
+                    for model_column in yaml_file_model_section["columns"]:
+                        if self.column_casing(model_column["name"]) == cased_column_name:
+                            model_column.update({
+                                "data_type": columns_db_meta.get(cased_column_name).type
+                            })
+                    changes_committed += 1
         return changes_committed

     @staticmethod
     def add_missing_cols_to_node_and_model(
         missing_columns: Iterable,
         node: ManifestNode,
         yaml_file_model_section: Dict[str, Any],
+        columns_db_meta: Dict[str, ColumnMetadata]
     ) -> int:
         """Add missing columns to node and model simultaneously
         THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
         changes_committed = 0
         for column in missing_columns:
-            node.columns[column] = ColumnInfo.from_dict({"name": column, "description": ""})
+            node.columns[column] = ColumnInfo.from_dict({"name": column, "description": "", "data_type": columns_db_meta[column].type})
             yaml_file_model_section.setdefault("columns", []).append(
-                {"name": column, "description": ""}
+                {"name": column, "data_type": columns_db_meta[column].type, "description": ""}
             )
             changes_committed += 1
         logger().info(
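
Taken together, the two mutators above now write data_type next to name and description in both the manifest node and the in-memory YAML section. A sketch of the effect on the section dict (model and column names are illustrative):

# Before add_missing_cols_to_node_and_model(...):
yaml_file_model_section = {"name": "my_model", "columns": []}
# After, assuming a NUMBER column named ID was missing from the schema file:
# {"name": "my_model", "columns": [{"name": "ID", "data_type": "NUMBER", "description": ""}]}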
Expand All @@ -1046,13 +1072,14 @@ def update_schema_file_and_node(
extra_columns: Iterable[str],
node: ManifestNode,
section: Dict[str, Any],
columns_db_meta: Dict[str, ColumnMetadata]
) -> Tuple[int, int, int]:
"""Take action on a schema file mirroring changes in the node."""
logger().info(":microscope: Looking for actions for %s", node.unique_id)
if not self.skip_add_columns:
n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section)
n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section, columns_db_meta)
n_cols_doc_inherited = self.update_undocumented_columns_with_prior_knowledge(
undocumented_columns, node, section
undocumented_columns, node, section, columns_db_meta
)
n_cols_removed = self.remove_columns_not_in_database(extra_columns, node, section)
return n_cols_added, n_cols_doc_inherited, n_cols_removed
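
For context, the three counters returned here are what _run (see the _run hunks above) uses to decide whether the schema file must be rewritten; a simplified sketch of that call site:

# Simplified from the _run hunk above:
n_cols_added, n_cols_doc_inherited, n_cols_removed = self.update_schema_file_and_node(
    missing_columns, undocumented_columns, extra_columns, node, section, columns_db_meta
)
if n_cols_added + n_cols_doc_inherited + n_cols_removed > 0:
    should_dump = True  # only rewrite the schema file when something changed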
