From 19f732762fff34a75a96c731bebd6ad61de07755 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 14:31:42 +0100
Subject: [PATCH 1/7] Add support for data_type

---
 src/dbt_osmosis/core/osmosis.py | 50 +++++++++++++++++++++++++--------
 1 file changed, 38 insertions(+), 12 deletions(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index 3b56343..292dcd0 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -2,7 +2,6 @@
 import re
 import json
 from concurrent.futures import ThreadPoolExecutor, wait
-from enum import Enum
 from functools import lru_cache
 from itertools import chain
 from pathlib import Path
@@ -20,6 +19,7 @@
 )

 import ruamel.yaml
+from dbt.contracts.results import ColumnMetadata
 from pydantic import BaseModel

 from dbt_osmosis.core.exceptions import (
@@ -321,10 +321,15 @@ def augment_existing_model(
         )
         return documentation

-    @lru_cache(maxsize=5000)
     def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
         """Get all columns in a list for a model"""
+        return list(self.get_columns_meta(parts).keys())
+
+    @lru_cache(maxsize=5000)
+    def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetadata]:
+        """Get all columns in a list for a model"""
+
         # If we provide a catalog, we read from it
         if self.catalog_file:
             file_path = Path(self.catalog_file)
@@ -336,15 +341,15 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
                 if model.split(".")[-1] == parts[-1]
             ]
             if matching_models:
-                return [col.lower() for col in matching_models[0]["columns"].keys()]
+                return {self.column_casing(col['name']): ColumnMetadata(**col) for col in matching_models[0]["columns"].values()}
             else:
-                return []
+                return {}

         # If we don't provide a catalog we query the warehouse to get the columns
         else:
             with self.adapter.connection_named("dbt-osmosis"):
                 table = self.adapter.get_relation(*parts)
-                columns = []
+                columns = {}
                 if not table:
                     logger().info(
                         (
@@ -355,10 +360,11 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
                     )
                     return columns
                 try:
-                    for c in self.adapter.get_columns_in_relation(table):
-                        columns.append(self.column_casing(c.name))
+                    for col_idx, c in enumerate(self.adapter.get_columns_in_relation(table)):
+                        columns[self.column_casing(c.name)] = ColumnMetadata(name=self.column_casing(c.name), type=c.dtype, index=col_idx)
                         if hasattr(c, "flatten"):
-                            columns.extend([self.column_casing(exp.name) for exp in c.flatten()])
+                            for exp_idx, exp in enumerate(c.flatten()):
+                                columns[self.column_casing(exp.name)] = ColumnMetadata(name=self.column_casing(exp.name), type=c.dtype, index=exp_idx)
                 except Exception as error:
                     logger().info(
                         (
@@ -410,6 +416,9 @@ def bootstrap_sources(self) -> None:
                                 "description": getattr(
                                     exp, "description", getattr(c, "description", "")
                                 ),
+                                "data_type": getattr(
+                                    exp, "dtype", getattr(c, "dtype", "")
+                                )
                             }
                             for c in self.adapter.get_columns_in_relation(relation)
                             for exp in getattr(c, "flatten", lambda: [c])()
@@ -824,6 +833,7 @@ def _run(self, unique_id, node, schema_map, force_inheritance=False):
         # Build Sets
         logger().info(":mag: Resolving columns in database")
         database_columns_ordered = self.get_columns(self.get_database_parts(node))
+        columns_db_meta = self.get_columns_meta(self.get_database_parts(node))
         database_columns: Set[str] = set(database_columns_ordered)
         yaml_columns_ordered = [column for column in node.columns]
         yaml_columns: Set[str] = set(yaml_columns_ordered)
@@ -882,6 +892,7 @@ def _run(self, unique_id, node, schema_map, force_inheritance=False):
                     extra_columns,
                     node,
                     section,
+                    columns_db_meta
                 )
                 if n_cols_added + n_cols_doc_inherited + n_cols_removed > 0:
                     should_dump = True
@@ -941,6 +952,8 @@ def _sort_columns(column_info: dict) -> int:
             )
         except Exception as e:
             with self.mutex:
+                import traceback
+                print(traceback.format_exc())
                 logger().error("Error occurred while processing model %s: %s", unique_id, e)
             raise e

@@ -980,6 +993,7 @@ def update_undocumented_columns_with_prior_knowledge(
         undocumented_columns: Iterable[str],
         node: ManifestNode,
         yaml_file_model_section: Dict[str, Any],
+        columns_db_meta: Dict[str, ColumnMetadata]
     ) -> int:
         """Update undocumented columns with prior knowledge in node and model simultaneously
         THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
@@ -1017,6 +1031,16 @@ def update_undocumented_columns_with_prior_knowledge(
                 node.unique_id,
             )
             logger().info(prior_knowledge)
+        for column in undocumented_columns:
+            cased_column_name = self.column_casing(column)
+            if cased_column_name in node.columns and not node.columns[cased_column_name].data_type:
+                if columns_db_meta.get(cased_column_name):
+                    node.columns[cased_column_name].data_type = columns_db_meta.get(cased_column_name).type
+                    for model_column in yaml_file_model_section["columns"]:
+                        if self.column_casing(model_column["name"]) == cased_column_name:
+                            model_column.update({
+                                "data_type": columns_db_meta.get(cased_column_name).type
+                            })
         return changes_committed

     @staticmethod
@@ -1024,14 +1048,15 @@ def add_missing_cols_to_node_and_model(
         missing_columns: Iterable,
         node: ManifestNode,
         yaml_file_model_section: Dict[str, Any],
+        columns_db_meta: Dict[str, ColumnMetadata]
     ) -> int:
         """Add missing columns to node and model simultaneously
         THIS MUTATES THE NODE AND MODEL OBJECTS so that state is always accurate"""
         changes_committed = 0
         for column in missing_columns:
-            node.columns[column] = ColumnInfo.from_dict({"name": column, "description": ""})
+            node.columns[column] = ColumnInfo.from_dict({"name": column, "description": "", "data_type": columns_db_meta[column].type})
             yaml_file_model_section.setdefault("columns", []).append(
-                {"name": column, "description": ""}
+                {"name": column, "data_type": columns_db_meta[column].type, "description": ""}
             )
             changes_committed += 1
         logger().info(
@@ -1046,13 +1071,14 @@ def update_schema_file_and_node(
         extra_columns: Iterable[str],
         node: ManifestNode,
         section: Dict[str, Any],
+        columns_db_meta: Dict[str, ColumnMetadata]
     ) -> Tuple[int, int, int]:
         """Take action on a schema file mirroring changes in the node."""
         logger().info(":microscope: Looking for actions for %s", node.unique_id)
         if not self.skip_add_columns:
-            n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section)
+            n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section, columns_db_meta=columns_db_meta)
         n_cols_doc_inherited = self.update_undocumented_columns_with_prior_knowledge(
-            undocumented_columns, node, section
+            undocumented_columns, node, section, columns_db_meta
         )
         n_cols_removed = self.remove_columns_not_in_database(extra_columns, node, section)
         return n_cols_added, n_cols_doc_inherited, n_cols_removed
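The core of this first patch is that get_columns() now delegates to get_columns_meta(), which returns a name -> ColumnMetadata mapping rather than a bare list of names, so the warehouse data type can be copied onto each YAML column. A minimal, runnable sketch of that flow, using a stand-in dataclass in place of dbt's ColumnMetadata (imported above from dbt.contracts.results) so it runs without dbt installed; add_data_types and the sample data are illustrative, not part of the patch:

from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ColumnMetadataStub:
    """Stand-in for dbt.contracts.results.ColumnMetadata."""
    name: str
    type: str  # the warehouse data type, e.g. "NUMBER" or "VARCHAR"
    index: Optional[int] = None
    comment: Optional[str] = None


def add_data_types(yaml_columns: List[dict], columns_db_meta: Dict[str, ColumnMetadataStub]) -> None:
    """Mirror the patch's behavior: copy the warehouse type onto each YAML column."""
    for column in yaml_columns:
        meta = columns_db_meta.get(column["name"])
        if meta and not column.get("data_type"):
            column["data_type"] = meta.type


yaml_section = [{"name": "ID", "description": ""}, {"name": "EMAIL", "description": ""}]
db_meta = {
    "ID": ColumnMetadataStub(name="ID", type="NUMBER", index=0),
    "EMAIL": ColumnMetadataStub(name="EMAIL", type="VARCHAR", index=1),
}
add_data_types(yaml_section, db_meta)
assert yaml_section[0]["data_type"] == "NUMBER"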
From 8a828c6e75d1e5d680090076548d8842132179ae Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 14:33:01 +0100
Subject: [PATCH 2/7] Rm junk

---
 src/dbt_osmosis/core/osmosis.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index 292dcd0..afc6db0 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -952,8 +952,6 @@ def _sort_columns(column_info: dict) -> int:
             )
         except Exception as e:
             with self.mutex:
-                import traceback
-                print(traceback.format_exc())
                 logger().error("Error occurred while processing model %s: %s", unique_id, e)
             raise e

From 566ebad79b836c1539835cc29f6fe5661b582970 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 14:37:47 +0100
Subject: [PATCH 3/7] Use ordered dict

---
 src/dbt_osmosis/core/osmosis.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index afc6db0..79f254d 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -1,6 +1,7 @@
 import os
 import re
 import json
+from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor, wait
 from functools import lru_cache
 from itertools import chain
 from pathlib import Path
@@ -329,7 +330,7 @@ def get_columns(self, parts: Tuple[str, str, str]) -> List[str]:
     @lru_cache(maxsize=5000)
     def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetadata]:
         """Get all columns in a list for a model"""
-
+        columns = OrderedDict()
         # If we provide a catalog, we read from it
         if self.catalog_file:
             file_path = Path(self.catalog_file)
@@ -341,15 +342,16 @@ def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetad
                 if model.split(".")[-1] == parts[-1]
             ]
             if matching_models:
-                return {self.column_casing(col['name']): ColumnMetadata(**col) for col in matching_models[0]["columns"].values()}
+                for col in matching_models[0]["columns"].values():
+                    columns[self.column_casing(col['name'])] = ColumnMetadata(**col)
             else:
-                return {}
+                return columns

         # If we don't provide a catalog we query the warehouse to get the columns
         else:
             with self.adapter.connection_named("dbt-osmosis"):
                 table = self.adapter.get_relation(*parts)
-                columns = {}
+
                 if not table:
                     logger().info(
                         (
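Patch 3 trades the dict-comprehension early returns for a single OrderedDict that is filled in catalog order. A small runnable sketch of the catalog branch, assuming the entry shape mirrors dbt's catalog.json columns ({"name", "index", "type", "comment"}); column_casing here is a hypothetical stand-in for the instance method:

from collections import OrderedDict


def column_casing(name: str) -> str:  # hypothetical stand-in for self.column_casing
    return name.lower()


catalog_entry = {
    "columns": {
        "ID": {"name": "ID", "index": 0, "type": "NUMBER", "comment": None},
        "EMAIL": {"name": "EMAIL", "index": 1, "type": "VARCHAR", "comment": None},
    }
}

columns = OrderedDict()
for col in catalog_entry["columns"].values():
    columns[column_casing(col["name"])] = col  # keyed by cased name, catalog order preserved

assert list(columns) == ["id", "email"]

On Python 3.7+ a plain dict preserves insertion order as well, so the OrderedDict mainly makes the ordering intent explicit.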
From 7cc79a66d5c94cda842104b956604f301b3ede83 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 14:47:42 +0100
Subject: [PATCH 4/7] Fix keys handling

---
 src/dbt_osmosis/core/osmosis.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index 79f254d..3cbe539 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -376,7 +376,7 @@ def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetad
                         *parts,
                         str(error),
                     )
-            return columns
+        return columns

     def bootstrap_sources(self) -> None:
         """Bootstrap sources from the dbt-osmosis vars config"""
@@ -1041,6 +1041,7 @@ def update_undocumented_columns_with_prior_knowledge(
                             model_column.update({
                                 "data_type": columns_db_meta.get(cased_column_name).type
                             })
+            changes_committed += 1
         return changes_committed

     @staticmethod

From 1621c02975603d6f0f587b668b22596694f0b8b3 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 15:17:10 +0100
Subject: [PATCH 5/7] Remove named arg

---
 src/dbt_osmosis/core/osmosis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index 3cbe539..c04b80a 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -1077,7 +1077,7 @@ def update_schema_file_and_node(
         """Take action on a schema file mirroring changes in the node."""
         logger().info(":microscope: Looking for actions for %s", node.unique_id)
         if not self.skip_add_columns:
-            n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section, columns_db_meta=columns_db_meta)
+            n_cols_added = self.add_missing_cols_to_node_and_model(missing_columns, node, section, columns_db_meta)
         n_cols_doc_inherited = self.update_undocumented_columns_with_prior_knowledge(
             undocumented_columns, node, section, columns_db_meta
         )
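After patches 3 through 5, both branches of get_columns_meta fill the same OrderedDict and fall through to one method-level return (the dedent in patch 4), so callers always receive a mapping, possibly empty, instead of None on the catalog path. A condensed, runnable sketch of that control flow; read_catalog and query_warehouse are hypothetical stand-ins for the real catalog lookup and adapter calls:

from collections import OrderedDict
from typing import Dict, Iterable, Optional, Tuple


def read_catalog(path: str) -> Iterable[Tuple[str, str]]:
    return [("id", "integer")]  # stand-in for parsing catalog.json


def query_warehouse() -> Iterable[Tuple[str, str]]:
    return [("id", "NUMBER")]  # stand-in for adapter.get_columns_in_relation

def get_columns_meta(catalog_file: Optional[str]) -> Dict[str, str]:
    columns: Dict[str, str] = OrderedDict()
    if catalog_file:
        for name, data_type in read_catalog(catalog_file):
            columns[name] = data_type  # catalog branch fills the dict...
    else:
        for name, data_type in query_warehouse():
            columns[name] = data_type  # ...and so does the warehouse branch
    return columns  # single exit point for both branches


assert get_columns_meta(None) == {"id": "NUMBER"}
assert get_columns_meta("target/catalog.json") == {"id": "integer"}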
From 939f65e0ac91e2f46dda7b4f7c1b9f3b79206b67 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Thu, 24 Aug 2023 15:20:27 +0100
Subject: [PATCH 6/7] Fix/remove indexing

---
 src/dbt_osmosis/core/osmosis.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index c04b80a..3bb806f 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -362,11 +362,11 @@ def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetad
                     )
                     return columns
                 try:
-                    for col_idx, c in enumerate(self.adapter.get_columns_in_relation(table)):
-                        columns[self.column_casing(c.name)] = ColumnMetadata(name=self.column_casing(c.name), type=c.dtype, index=col_idx)
+                    for c in self.adapter.get_columns_in_relation(table):
+                        columns[self.column_casing(c.name)] = ColumnMetadata(name=self.column_casing(c.name), type=c.dtype, index=None)
                         if hasattr(c, "flatten"):
-                            for exp_idx, exp in enumerate(c.flatten()):
-                                columns[self.column_casing(exp.name)] = ColumnMetadata(name=self.column_casing(exp.name), type=c.dtype, index=exp_idx)
+                            for exp in c.flatten():
+                                columns[self.column_casing(exp.name)] = ColumnMetadata(name=self.column_casing(exp.name), type=c.dtype, index=None)
                 except Exception as error:
                     logger().info(
                         (

From 2ab3f0efe022e45fd73e153ae6162bd88e273ed5 Mon Sep 17 00:00:00 2001
From: Ainur Timerbaev
Date: Fri, 25 Aug 2023 12:31:41 +0100
Subject: [PATCH 7/7] Add column_casing for ColumnMetadata creation

---
 src/dbt_osmosis/core/osmosis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dbt_osmosis/core/osmosis.py b/src/dbt_osmosis/core/osmosis.py
index 3bb806f..883c837 100644
--- a/src/dbt_osmosis/core/osmosis.py
+++ b/src/dbt_osmosis/core/osmosis.py
@@ -343,7 +343,7 @@ def get_columns_meta(self, parts: Tuple[str, str, str]) -> Dict[str, ColumnMetad
             ]
             if matching_models:
                 for col in matching_models[0]["columns"].values():
-                    columns[self.column_casing(col['name'])] = ColumnMetadata(**col)
+                    columns[self.column_casing(col['name'])] = ColumnMetadata(**col, name=self.column_casing(col['name']))
             else:
                 return columns
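A caveat worth flagging on this last patch: ColumnMetadata(**col, name=...) passes name twice whenever col itself carries a "name" key, which the surrounding col['name'] lookups suggest it always does, and Python then raises "TypeError: got multiple values for keyword argument 'name'". A sketch of a duplicate-safe variant that overrides the key in a copy before unpacking; the dataclass is a stand-in for dbt's ColumnMetadata and the casing helper is hypothetical:

from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnMetadataStub:
    name: str
    type: str
    index: Optional[int] = None
    comment: Optional[str] = None


def column_casing(name: str) -> str:  # hypothetical casing helper
    return name.upper()


col = {"name": "id", "type": "integer", "index": 0, "comment": None}
# Later keys win in a dict literal, so the cased name replaces col["name"]
# instead of colliding with it.
meta = ColumnMetadataStub(**{**col, "name": column_casing(col["name"])})
assert meta.name == "ID"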