From d705c8308172de9977a635679340ba99fafe89e8 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 13:44:20 +0200 Subject: [PATCH 01/42] Add max_table_nesting to resource decorator --- dlt/extract/decorators.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index bc85cb4a03..d3db7f3e27 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -286,6 +286,7 @@ def resource( /, name: str = None, table_name: TTableHintTemplate[str] = None, + max_table_nesting: int = None, write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None, columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, @@ -304,6 +305,7 @@ def resource( /, name: str = None, table_name: TTableHintTemplate[str] = None, + max_table_nesting: int = None, write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None, columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, @@ -322,6 +324,7 @@ def resource( /, name: TTableHintTemplate[str] = None, table_name: TTableHintTemplate[str] = None, + max_table_nesting: int = None, write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None, columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, @@ -341,6 +344,7 @@ def resource( /, name: str = None, table_name: TTableHintTemplate[str] = None, + max_table_nesting: int = None, write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None, columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, @@ -358,6 +362,7 @@ def resource( /, name: TTableHintTemplate[str] = None, table_name: TTableHintTemplate[str] = None, + max_table_nesting: int = None, write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None, columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: 
TTableHintTemplate[TColumnNames] = None, @@ -398,6 +403,7 @@ def resource( If not present, the name of the decorated function will be used. table_name (TTableHintTemplate[str], optional): An table name, if different from `name`. + max_table_nesting (int, optional): A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes. write_disposition (TTableHintTemplate[TWriteDispositionConfig], optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary. @@ -442,6 +448,7 @@ def make_resource( ) -> DltResource: table_template = make_hints( table_name, + max_table_nesting=max_table_nesting, write_disposition=write_disposition or DEFAULT_WRITE_DISPOSITION, columns=columns, primary_key=primary_key, @@ -449,6 +456,7 @@ def make_resource( schema_contract=schema_contract, table_format=table_format, ) + resource = DltResource.from_data( _data, _name, From 981b7c9fa2df31d30d9841353b2f726d505f73a9 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 15:51:08 +0200 Subject: [PATCH 02/42] Handle max_table_nesting in normalizer --- dlt/common/normalizers/json/relational.py | 6 ++++++ dlt/extract/decorators.py | 4 +++- tests/common/normalizers/test_max_nesting.py | 18 ++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 tests/common/normalizers/test_max_nesting.py diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index da38ac60a7..6725bc4598 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -79,6 +79,12 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # turn everything at the recursion level into complex type max_nesting = self.max_nesting schema = self.schema + 
if ( + max_table_nesting := schema.tables[table_name] + .get("x-normalizer", {}) + .get("max_nesting") + ): + max_nesting = max_table_nesting assert _r_lvl <= max_nesting if _r_lvl == max_nesting: diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index d3db7f3e27..f9c4738a41 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -448,7 +448,6 @@ def make_resource( ) -> DltResource: table_template = make_hints( table_name, - max_table_nesting=max_table_nesting, write_disposition=write_disposition or DEFAULT_WRITE_DISPOSITION, columns=columns, primary_key=primary_key, @@ -456,6 +455,9 @@ def make_resource( schema_contract=schema_contract, table_format=table_format, ) + if max_table_nesting: + table_template.setdefault("x-normalizer", {}) + table_template["x-normalizer"]["max_nesting"] = max_table_nesting resource = DltResource.from_data( _data, diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py new file mode 100644 index 0000000000..9fd63e1dfb --- /dev/null +++ b/tests/common/normalizers/test_max_nesting.py @@ -0,0 +1,18 @@ +import dlt + +from dlt.common import json +from tests.common.utils import json_case_path + + +def test_resource_max_nesting(): + @dlt.resource(max_table_nesting=2) + def bot_events(): + with open(json_case_path("rasa_event_bot_metadata"), "rb") as f: + yield json.load(f) + + pipeline = dlt.pipeline( + pipeline_name="test_max_table_nesting", + destination="duckdb", + ) + + pipeline.run(bot_events, write_disposition="append") From 53bfd4fecc44f9c3723ea47b3d54d1310c26b42b Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 16:10:56 +0200 Subject: [PATCH 03/42] Use dict.get to retrieve table from schema --- dlt/common/normalizers/json/relational.py | 2 +- tests/common/normalizers/test_max_nesting.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py 
b/dlt/common/normalizers/json/relational.py index 6725bc4598..754646b412 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -80,7 +80,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo max_nesting = self.max_nesting schema = self.schema if ( - max_table_nesting := schema.tables[table_name] + max_table_nesting := schema.tables.get(table_name, {}) .get("x-normalizer", {}) .get("max_nesting") ): diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py index 9fd63e1dfb..0715ab2b4b 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -5,7 +5,7 @@ def test_resource_max_nesting(): - @dlt.resource(max_table_nesting=2) + @dlt.resource(max_table_nesting=1) def bot_events(): with open(json_case_path("rasa_event_bot_metadata"), "rb") as f: yield json.load(f) From 85c7f702a5ef29349646ec1862125c4b575eefb0 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 16:15:13 +0200 Subject: [PATCH 04/42] Use schema.get_table and format code --- dlt/common/normalizers/json/relational.py | 44 ++++++++++++++++------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 754646b412..7f3b60adaf 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -32,7 +32,7 @@ class TDataItemRow(TypedDict, total=False): class TDataItemRowRoot(TDataItemRow, total=False): - _dlt_load_id: str # load id to identify records loaded together that ie. need to be processed + _dlt_load_id: (str) # load id to identify records loaded together that ie. 
need to be processed # _dlt_meta: TEventDLTMeta # stores metadata, should never be sent to the normalizer @@ -67,7 +67,9 @@ def __init__(self, schema: Schema) -> None: self._reset() def _reset(self) -> None: - self.normalizer_config = self.schema._normalizers_config["json"].get("config") or {} # type: ignore + self.normalizer_config = ( + self.schema._normalizers_config["json"].get("config") or {} + ) # type: ignore self.propagation_config = self.normalizer_config.get("propagation", None) self.max_nesting = self.normalizer_config.get("max_nesting", 1000) self._skip_primary_key = {} @@ -79,11 +81,9 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # turn everything at the recursion level into complex type max_nesting = self.max_nesting schema = self.schema - if ( - max_table_nesting := schema.tables.get(table_name, {}) - .get("x-normalizer", {}) - .get("max_nesting") - ): + table = schema.get_table(table_name) + max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") + if max_table_nesting: max_nesting = max_table_nesting assert _r_lvl <= max_nesting @@ -93,7 +93,6 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # path = f"{table_name}▶{field_name}" # or use definition in the schema column: TColumnSchema = None - table = schema.tables.get(table_name) if table: column = table["columns"].get(field_name) if column is None: @@ -226,7 +225,13 @@ def _normalize_list( elif isinstance(v, list): # to normalize lists of lists, we must create a tracking intermediary table by creating a mock row yield from self._normalize_row( - {"list": v}, extend, ident_path, parent_path, parent_row_id, idx, _r_lvl + 1 + {"list": v}, + extend, + ident_path, + parent_path, + parent_row_id, + idx, + _r_lvl + 1, ) else: # list of simple types @@ -267,20 +272,29 @@ def _normalize_row( extend.update(self._get_propagated_values(table, flattened_row, _r_lvl)) # yield parent table first - should_descend = yield (table, 
schema.naming.shorten_fragments(*parent_path)), flattened_row + should_descend = yield ( + (table, schema.naming.shorten_fragments(*parent_path)), + flattened_row, + ) if should_descend is False: return # normalize and yield lists for list_path, list_content in lists.items(): yield from self._normalize_list( - list_content, extend, list_path, parent_path + ident_path, row_id, _r_lvl + 1 + list_content, + extend, + list_path, + parent_path + ident_path, + row_id, + _r_lvl + 1, ) def extend_schema(self) -> None: # validate config config = cast( - RelationalNormalizerConfig, self.schema._normalizers_config["json"].get("config") or {} + RelationalNormalizerConfig, + self.schema._normalizers_config["json"].get("config") or {}, ) DataItemNormalizer._validate_normalizer_config(self.schema, config) @@ -388,7 +402,11 @@ def _get_validity_column_names(schema: Schema, table_name: str) -> List[Optional @staticmethod @lru_cache(maxsize=None) def _dlt_id_is_row_hash(schema: Schema, table_name: str) -> bool: - return schema.get_table(table_name)["columns"].get("_dlt_id", dict()).get("x-row-version", False) # type: ignore[return-value] + return ( + schema.get_table(table_name)["columns"] # type: ignore[return-value] + .get("_dlt_id", dict()) + .get("x-row-version", False) + ) @staticmethod def _validate_validity_column_names( From 5ddc36048baedad5ca8bac77943c8baaf04f99ab Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 16:19:37 +0200 Subject: [PATCH 05/42] Fix mypy issues --- dlt/common/normalizers/json/relational.py | 6 +++--- dlt/extract/decorators.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 7f3b60adaf..2a906a682c 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -68,8 +68,8 @@ def __init__(self, schema: Schema) -> None: def _reset(self) -> None: self.normalizer_config = ( - 
self.schema._normalizers_config["json"].get("config") or {} - ) # type: ignore + self.schema._normalizers_config["json"].get("config") or {} # type: ignore + ) self.propagation_config = self.normalizer_config.get("propagation", None) self.max_nesting = self.normalizer_config.get("max_nesting", 1000) self._skip_primary_key = {} @@ -82,7 +82,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo max_nesting = self.max_nesting schema = self.schema table = schema.get_table(table_name) - max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") + max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] if max_table_nesting: max_nesting = max_table_nesting diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index f9c4738a41..7beb40924f 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -456,7 +456,7 @@ def make_resource( table_format=table_format, ) if max_table_nesting: - table_template.setdefault("x-normalizer", {}) + table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item] table_template["x-normalizer"]["max_nesting"] = max_table_nesting resource = DltResource.from_data( From d401d97223977b33c270983b6831d4c20067b577 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 16:27:14 +0200 Subject: [PATCH 06/42] Fix mypy errors --- dlt/common/normalizers/json/relational.py | 2 +- dlt/extract/decorators.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 2a906a682c..e25b76b52a 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -68,7 +68,7 @@ def __init__(self, schema: Schema) -> None: def _reset(self) -> None: self.normalizer_config = ( - self.schema._normalizers_config["json"].get("config") or {} # type: ignore + 
self.schema._normalizers_config["json"].get("config") or {} # type: ignore[assignment] ) self.propagation_config = self.normalizer_config.get("propagation", None) self.max_nesting = self.normalizer_config.get("max_nesting", 1000) diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 7beb40924f..0b08d3fd67 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -457,7 +457,7 @@ def make_resource( ) if max_table_nesting: table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item] - table_template["x-normalizer"]["max_nesting"] = max_table_nesting + table_template["x-normalizer"]["max_nesting"] = max_table_nesting # type: ignore[typeddict-item] resource = DltResource.from_data( _data, From 5b1ecc2b034b6c8f7f5438d41e0d26ef7e787205 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 18:38:40 +0200 Subject: [PATCH 07/42] Fix bugs and parametrize test --- dlt/common/normalizers/json/relational.py | 2 +- dlt/extract/decorators.py | 2 +- tests/common/normalizers/test_max_nesting.py | 61 ++++++++++++++++++-- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index e25b76b52a..8da81d51e6 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -83,7 +83,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo schema = self.schema table = schema.get_table(table_name) max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] - if max_table_nesting: + if max_table_nesting is not None: max_nesting = max_table_nesting assert _r_lvl <= max_nesting diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 0b08d3fd67..aae1bf5ba9 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -455,7 +455,7 @@ def make_resource( schema_contract=schema_contract, 
table_format=table_format, ) - if max_table_nesting: + if max_table_nesting is not None: table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item] table_template["x-normalizer"]["max_nesting"] = max_table_nesting # type: ignore[typeddict-item] diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py index 0715ab2b4b..42fab8d04f 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -1,18 +1,71 @@ +from typing import List + import dlt +import pytest from dlt.common import json from tests.common.utils import json_case_path +ALL_TABLES_FOR_RASA_EVENT = [ + "bot_events", + "bot_events__metadata__known_recipients", + "bot_events__metadata__transaction_history__spend__target", + "bot_events__metadata__transaction_history__spend__starbucks", + "bot_events__metadata__transaction_history__spend__amazon", + "bot_events__metadata__transaction_history__deposit__employer", + "bot_events__metadata__transaction_history__deposit__interest", + "bot_events__metadata__vendor_list", +] + -def test_resource_max_nesting(): - @dlt.resource(max_table_nesting=1) +@pytest.mark.parametrize( + "nesting_level,expected_num_tables,expected_table_names", + ( + (0, 1, ["bot_events"]), + (1, 1, ["bot_events"]), + (5, 8, ALL_TABLES_FOR_RASA_EVENT), + ( + 15, + 8, + ALL_TABLES_FOR_RASA_EVENT, + ), + ( + 25, + 8, + ALL_TABLES_FOR_RASA_EVENT, + ), + ( + 1000, + 8, + ALL_TABLES_FOR_RASA_EVENT, + ), + ), +) +def test_resource_max_nesting( + nesting_level: int, expected_num_tables: int, expected_table_names: List[str] +): + @dlt.resource(max_table_nesting=nesting_level) def bot_events(): + print("nesting_level", nesting_level) with open(json_case_path("rasa_event_bot_metadata"), "rb") as f: yield json.load(f) + assert "x-normalizer" in bot_events._hints + + pipeline_name = f"test_max_table_nesting_{nesting_level}_{expected_num_tables}" pipeline = dlt.pipeline( - 
pipeline_name="test_max_table_nesting", + pipeline_name=pipeline_name, destination="duckdb", + full_refresh=True, ) - pipeline.run(bot_events, write_disposition="append") + pipeline.run(bot_events) + assert pipeline.schemas.keys() + assert pipeline_name in pipeline.schema_names + + pipeline_schema = pipeline.schemas[pipeline_name] + assert len(pipeline_schema.data_table_names()) == expected_num_tables + + all_table_names = pipeline_schema.data_table_names() + for table_name in expected_table_names: + assert table_name in all_table_names From f42541288cc55f9fd7e72d5928138b33a11cccf2 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 18:46:03 +0200 Subject: [PATCH 08/42] Add one more test case --- tests/common/normalizers/test_max_nesting.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py index 42fab8d04f..63a113a9f2 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -18,11 +18,19 @@ ] +ALL_TABLES_FOR_RASA_EVENT_NESTING_LEVEL_2 = [ + "bot_events", + "bot_events__metadata__known_recipients", + "bot_events__metadata__vendor_list", +] + + @pytest.mark.parametrize( "nesting_level,expected_num_tables,expected_table_names", ( (0, 1, ["bot_events"]), (1, 1, ["bot_events"]), + (2, 3, ALL_TABLES_FOR_RASA_EVENT_NESTING_LEVEL_2), (5, 8, ALL_TABLES_FOR_RASA_EVENT), ( 15, From f20d009db2335d241b7dc1387a6f4c2597708f59 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 18:47:48 +0200 Subject: [PATCH 09/42] Format code --- tests/common/normalizers/test_max_nesting.py | 26 ++++++-------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py index 63a113a9f2..8f359fd5f0 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -6,6 +6,9 
@@ from dlt.common import json from tests.common.utils import json_case_path + +TOP_LEVEL_TABLES = ["bot_events"] + ALL_TABLES_FOR_RASA_EVENT = [ "bot_events", "bot_events__metadata__known_recipients", @@ -17,7 +20,6 @@ "bot_events__metadata__vendor_list", ] - ALL_TABLES_FOR_RASA_EVENT_NESTING_LEVEL_2 = [ "bot_events", "bot_events__metadata__known_recipients", @@ -28,25 +30,13 @@ @pytest.mark.parametrize( "nesting_level,expected_num_tables,expected_table_names", ( - (0, 1, ["bot_events"]), - (1, 1, ["bot_events"]), + (0, 1, TOP_LEVEL_TABLES), + (1, 1, TOP_LEVEL_TABLES), (2, 3, ALL_TABLES_FOR_RASA_EVENT_NESTING_LEVEL_2), (5, 8, ALL_TABLES_FOR_RASA_EVENT), - ( - 15, - 8, - ALL_TABLES_FOR_RASA_EVENT, - ), - ( - 25, - 8, - ALL_TABLES_FOR_RASA_EVENT, - ), - ( - 1000, - 8, - ALL_TABLES_FOR_RASA_EVENT, - ), + (15, 8, ALL_TABLES_FOR_RASA_EVENT), + (25, 8, ALL_TABLES_FOR_RASA_EVENT), + (1000, 8, ALL_TABLES_FOR_RASA_EVENT), ), ) def test_resource_max_nesting( From 23780fbee60d25e9d9ac47f97e72392a3bf2e761 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Thu, 18 Apr 2024 20:36:25 +0200 Subject: [PATCH 10/42] Get table from schema.tables --- dlt/common/normalizers/json/relational.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 8da81d51e6..d222098aa2 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -81,7 +81,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # turn everything at the recursion level into complex type max_nesting = self.max_nesting schema = self.schema - table = schema.get_table(table_name) + table = schema.tables.get(table_name, {}) max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] if max_table_nesting is not None: max_nesting = max_table_nesting @@ -93,6 +93,7 @@ def _is_complex_type(self, table_name: str, 
field_name: str, _r_lvl: int) -> boo # path = f"{table_name}▶{field_name}" # or use definition in the schema column: TColumnSchema = None + table = schema.get_table(table_name) if table: column = table["columns"].get(field_name) if column is None: From 937b375f91e69ef2f33aa601f634cafc46c31f08 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 10:59:03 +0200 Subject: [PATCH 11/42] Add comments and cleanup code --- dlt/common/normalizers/json/relational.py | 15 ++++++++++----- dlt/extract/decorators.py | 5 +++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index d222098aa2..c2f06573ef 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -81,25 +81,30 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # turn everything at the recursion level into complex type max_nesting = self.max_nesting schema = self.schema - table = schema.tables.get(table_name, {}) - max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] - if max_table_nesting is not None: - max_nesting = max_table_nesting + table = schema.tables.get(table_name) + if table: + max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] + if max_table_nesting is not None: + import ipdb + + ipdb.set_trace() + max_nesting = max_table_nesting assert _r_lvl <= max_nesting if _r_lvl == max_nesting: return True + # use cached value # path = f"{table_name}▶{field_name}" # or use definition in the schema column: TColumnSchema = None - table = schema.get_table(table_name) if table: column = table["columns"].get(field_name) if column is None: data_type = schema.get_preferred_type(field_name) else: data_type = column["data_type"] + return data_type == "complex" def _flatten( diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py 
index aae1bf5ba9..7230c48516 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -455,6 +455,11 @@ def make_resource( schema_contract=schema_contract, table_format=table_format, ) + + # If custom nesting level was specified then + # we need to add it to table hints so that + # later in normalizer dlt/common/normalizers/json/relational.py + # we can override max_nesting level for the given table if max_table_nesting is not None: table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item] table_template["x-normalizer"]["max_nesting"] = max_table_nesting # type: ignore[typeddict-item] From 4c15cdc5805cf557a1561b16ee3bfa91c193f34a Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 10:59:52 +0200 Subject: [PATCH 12/42] Remove breakpoint --- dlt/common/normalizers/json/relational.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index c2f06573ef..e7060dd5c8 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -85,9 +85,6 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo if table: max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] if max_table_nesting is not None: - import ipdb - - ipdb.set_trace() max_nesting = max_table_nesting assert _r_lvl <= max_nesting From 8ed96c7190d9d86097b16224c3907f6e255c1de4 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 12:29:30 +0200 Subject: [PATCH 13/42] Add more tests --- dlt/common/normalizers/json/relational.py | 3 +- tests/common/normalizers/test_max_nesting.py | 111 ++++++++++++++++++- 2 files changed, 107 insertions(+), 7 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index e7060dd5c8..8a6e7aa27a 100644 --- a/dlt/common/normalizers/json/relational.py +++ 
b/dlt/common/normalizers/json/relational.py @@ -8,7 +8,6 @@ from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny from dlt.common.schema import Schema from dlt.common.schema.typing import ( - TTableSchema, TColumnSchema, TColumnName, TSimpleRegex, @@ -407,7 +406,7 @@ def _get_validity_column_names(schema: Schema, table_name: str) -> List[Optional def _dlt_id_is_row_hash(schema: Schema, table_name: str) -> bool: return ( schema.get_table(table_name)["columns"] # type: ignore[return-value] - .get("_dlt_id", dict()) + .get("_dlt_id", {}) .get("x-row-version", False) ) diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/common/normalizers/test_max_nesting.py index 8f359fd5f0..18ceb7539f 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -1,9 +1,11 @@ -from typing import List +from typing import Any, Dict, List import dlt import pytest from dlt.common import json +from pytest_mock.plugin import MockerFixture + from tests.common.utils import json_case_path @@ -27,6 +29,12 @@ ] +@pytest.fixture(scope="module") +def rasa_event_bot_metadata(): + with open(json_case_path("rasa_event_bot_metadata"), "rb") as f: + return json.load(f) + + @pytest.mark.parametrize( "nesting_level,expected_num_tables,expected_table_names", ( @@ -40,13 +48,14 @@ ), ) def test_resource_max_nesting( - nesting_level: int, expected_num_tables: int, expected_table_names: List[str] + nesting_level: int, + expected_num_tables: int, + expected_table_names: List[str], + rasa_event_bot_metadata: Dict[str, Any], ): @dlt.resource(max_table_nesting=nesting_level) def bot_events(): - print("nesting_level", nesting_level) - with open(json_case_path("rasa_event_bot_metadata"), "rb") as f: - yield json.load(f) + yield rasa_event_bot_metadata assert "x-normalizer" in bot_events._hints @@ -67,3 +76,95 @@ def bot_events(): all_table_names = pipeline_schema.data_table_names() for table_name in expected_table_names: assert 
table_name in all_table_names + + +def test_with_multiple_resources_with_max_table_nesting_levels( + rasa_event_bot_metadata: Dict[str, Any], mocker: MockerFixture +): + @dlt.resource(max_table_nesting=1) + def rasa_bot_events_with_nesting_lvl_one(): + yield rasa_event_bot_metadata + + @dlt.resource(max_table_nesting=2) + def rasa_bot_events_with_nesting_lvl_two(): + yield rasa_event_bot_metadata + + @dlt.resource + def third_resource_with_nested_data(): # first top level table `third_resource_with_nested_data` + yield [ + { + "id": 1, + "payload": { + "f_int": 7817289713, + "f_float": 878172.8292, + "f_timestamp": "2024-04-19T11:40:32.901899+00:00", + "f_bool": False, + "hints": [ # second table `third_resource_with_nested_data__payload__hints` + { + "f_bool": "bool", + "f_timestamp": "bigint", + "f_float": [ # third table `third_resource_with_nested_data__payload__hints__f_float` + { + "cond": "precision > 4", + "then": "decimal", + "else": "float", + "comments": [ # fourth table `third_resource_with_nested_data__payload__hints__f_float__comments` + { + "text": "blabla bla bla we promise magix", + "author": "bart", + } + ], + } + ], + } + ], + }, + } + ] + + assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_one._hints + assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_two._hints + assert "x-normalizer" not in third_resource_with_nested_data._hints + + @dlt.source(max_table_nesting=100) + def some_data(): + return [ + rasa_bot_events_with_nesting_lvl_one(), + rasa_bot_events_with_nesting_lvl_two(), + third_resource_with_nested_data(), + ] + + pipeline_name = "test_different_table_nesting_levels" + pipeline = dlt.pipeline( + pipeline_name=pipeline_name, + destination="duckdb", + full_refresh=True, + ) + + pipeline.run(some_data(), write_disposition="append") + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + + # expect only one table for resource 
`rasa_bot_events_with_nesting_lvl_one` + tables = [tbl for tbl in all_table_names if tbl.endswith("nesting_lvl_one")] + assert len(tables) == 1 + assert tables == ["rasa_bot_events_with_nesting_lvl_one"] + + # expect three tables for resource `rasa_bot_events_with_nesting_lvl_two` + tables = [tbl for tbl in all_table_names if "nesting_lvl_two" in tbl] + assert len(tables) == 3 + assert tables == [ + "rasa_bot_events_with_nesting_lvl_two", + "rasa_bot_events_with_nesting_lvl_two__metadata__known_recipients", + "rasa_bot_events_with_nesting_lvl_two__metadata__vendor_list", + ] + + # expect four tables for resource `third_resource_with_nested_data` + tables = [tbl for tbl in all_table_names if "third_resource" in tbl] + assert len(tables) == 4 + assert tables == [ + "third_resource_with_nested_data", + "third_resource_with_nested_data__payload__hints", + "third_resource_with_nested_data__payload__hints__f_float", + "third_resource_with_nested_data__payload__hints__f_float__comments", + ] From 94d946749b339bcdef3064a3bc29e664d28cb002 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 13:02:02 +0200 Subject: [PATCH 14/42] Add more tests --- dlt/common/configuration/providers/airflow.py | 2 +- tests/common/normalizers/test_max_nesting.py | 88 ++++++++++++++++++- 2 files changed, 86 insertions(+), 4 deletions(-) diff --git a/dlt/common/configuration/providers/airflow.py b/dlt/common/configuration/providers/airflow.py index 99edf258d2..be83af387d 100644 --- a/dlt/common/configuration/providers/airflow.py +++ b/dlt/common/configuration/providers/airflow.py @@ -14,7 +14,7 @@ def name(self) -> str: def _look_vault(self, full_key: str, hint: type) -> str: """Get Airflow Variable with given `full_key`, return None if not found""" - + return from airflow.models import Variable with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): diff --git a/tests/common/normalizers/test_max_nesting.py 
b/tests/common/normalizers/test_max_nesting.py index 18ceb7539f..b9b0ac8812 100644 --- a/tests/common/normalizers/test_max_nesting.py +++ b/tests/common/normalizers/test_max_nesting.py @@ -4,7 +4,6 @@ import pytest from dlt.common import json -from pytest_mock.plugin import MockerFixture from tests.common.utils import json_case_path @@ -79,8 +78,22 @@ def bot_events(): def test_with_multiple_resources_with_max_table_nesting_levels( - rasa_event_bot_metadata: Dict[str, Any], mocker: MockerFixture + rasa_event_bot_metadata: Dict[str, Any], ): + """Test max_table_nesting feature with multiple resources and a source + Test scenario includes + + 1. Testing three different sources with set and unset `max_table_nesting` parameter + and checks if the number of created tables in the schema match the expected numbers + and the exact list table names have been collected; + 2. For the same parent source we change the `max_table_nesting` and verify if it is respected + by the third resource `third_resource_with_nested_data` as well as checking + the number of created tables in the current schema; + 3. 
Run combined test where we set `max_table_nesting` for the parent source and check + if this `max_table_nesting` is respected by child resources where they don't define their + own nesting level; + """ + @dlt.resource(max_table_nesting=1) def rasa_bot_events_with_nesting_lvl_one(): yield rasa_event_bot_metadata @@ -119,6 +132,7 @@ def third_resource_with_nested_data(): # first top level table `third_resource_ } ], }, + "params": [{"id": 1, "q": "search"}, {"id": 2, "q": "hashtag-search"}], } ] @@ -126,6 +140,7 @@ def third_resource_with_nested_data(): # first top level table `third_resource_ assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_two._hints assert "x-normalizer" not in third_resource_with_nested_data._hints + # Check scenario #1 @dlt.source(max_table_nesting=100) def some_data(): return [ @@ -161,10 +176,77 @@ def some_data(): # expect four tables for resource `third_resource_with_nested_data` tables = [tbl for tbl in all_table_names if "third_resource" in tbl] - assert len(tables) == 4 + assert len(tables) == 5 assert tables == [ "third_resource_with_nested_data", "third_resource_with_nested_data__payload__hints", "third_resource_with_nested_data__payload__hints__f_float", "third_resource_with_nested_data__payload__hints__f_float__comments", + "third_resource_with_nested_data__params", + ] + + # Check scenario #2 + # now we need to check `third_resource_with_nested_data` + # using different nesting levels at the source level + # First we do with max_table_nesting=0 + @dlt.source(max_table_nesting=0) + def some_data_v2(): + yield third_resource_with_nested_data() + + pipeline.drop() + pipeline.run(some_data_v2(), write_disposition="append") + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + assert len(all_table_names) == 1 + assert all_table_names == [ + "third_resource_with_nested_data", + ] + + # Second we do with max_table_nesting=1 + some_data_source = some_data_v2() + 
some_data_source.max_table_nesting = 1 + + pipeline.drop() + pipeline.run(some_data_source, write_disposition="append") + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + assert len(all_table_names) == 2 + assert all_table_names == [ + "third_resource_with_nested_data", + "third_resource_with_nested_data__params", + ] + + # Second we do with max_table_nesting=2 + some_data_source = some_data_v2() + some_data_source.max_table_nesting = 3 + + pipeline.drop() + pipeline.run(some_data_source, write_disposition="append") + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + + # 5 because payload is a dictionary not a collection of dictionaries + assert len(all_table_names) == 5 + assert all_table_names == [ + "third_resource_with_nested_data", + "third_resource_with_nested_data__payload__hints", + "third_resource_with_nested_data__payload__hints__f_float", + "third_resource_with_nested_data__payload__hints__f_float__comments", + "third_resource_with_nested_data__params", + ] + + # Check scenario #3 + pipeline.drop() + some_data_source = some_data() + some_data_source.max_table_nesting = 0 + pipeline.run(some_data_source, write_disposition="append") + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + assert len(all_table_names) == 5 + assert sorted(all_table_names) == [ + "rasa_bot_events_with_nesting_lvl_one", + "rasa_bot_events_with_nesting_lvl_two", + "rasa_bot_events_with_nesting_lvl_two__metadata__known_recipients", + "rasa_bot_events_with_nesting_lvl_two__metadata__vendor_list", + "third_resource_with_nested_data", ] From 6f1f169c8e0e810b2dc223f520eacc61756d848b Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 13:02:25 +0200 Subject: [PATCH 15/42] Revert ariflow config change --- dlt/common/configuration/providers/airflow.py | 1 - 1 file 
changed, 1 deletion(-) diff --git a/dlt/common/configuration/providers/airflow.py b/dlt/common/configuration/providers/airflow.py index be83af387d..edd02c3487 100644 --- a/dlt/common/configuration/providers/airflow.py +++ b/dlt/common/configuration/providers/airflow.py @@ -14,7 +14,6 @@ def name(self) -> str: def _look_vault(self, full_key: str, hint: type) -> str: """Get Airflow Variable with given `full_key`, return None if not found""" - return from airflow.models import Variable with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): From c5acd08bdc8aec31c5ab0f1dc63e29bc3cc31bc2 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 14:06:49 +0200 Subject: [PATCH 16/42] Move tests under normalizer tests --- tests/{common/normalizers => normalize}/test_max_nesting.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{common/normalizers => normalize}/test_max_nesting.py (100%) diff --git a/tests/common/normalizers/test_max_nesting.py b/tests/normalize/test_max_nesting.py similarity index 100% rename from tests/common/normalizers/test_max_nesting.py rename to tests/normalize/test_max_nesting.py From 5e31feb118af1a251f9198d31b905d3ec6bf0747 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 14:17:52 +0200 Subject: [PATCH 17/42] Use dummy destination with timeout --- tests/normalize/test_max_nesting.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index b9b0ac8812..2becfcd5e7 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -4,7 +4,7 @@ import pytest from dlt.common import json - +from dlt.destinations import dummy from tests.common.utils import json_case_path @@ -61,7 +61,7 @@ def bot_events(): pipeline_name = f"test_max_table_nesting_{nesting_level}_{expected_num_tables}" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - 
destination="duckdb", + destination=dummy(timeout=0.1), full_refresh=True, ) @@ -152,7 +152,7 @@ def some_data(): pipeline_name = "test_different_table_nesting_levels" pipeline = dlt.pipeline( pipeline_name=pipeline_name, - destination="duckdb", + destination=dummy(timeout=0.1), full_refresh=True, ) From 420dbb17f03aeda5402b2b6c5b19aa8e6b766da1 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 14:57:46 +0200 Subject: [PATCH 18/42] Add max_table_nesting getter/setter for resource --- dlt/extract/resource.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 4776158bbb..634ff63c02 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -215,6 +215,16 @@ def validator(self, validator: Optional[ValidateItem]) -> None: if validator: self.add_step(validator, insert_at=step_no if step_no >= 0 else None) + @property + def max_table_nesting(self) -> Optional[int]: + """A schema hint for resource that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.""" + return self._hints.get("x-normalizer", {}).get("max_nesting") + + @max_table_nesting.setter + def max_table_nesting(self, value: int) -> None: + self._hints.setdefault("x-normalizer", {}) + self._hints["x-normalizer"]["max_nesting"] = value + def pipe_data_from(self, data_from: Union["DltResource", Pipe]) -> None: """Replaces the parent in the transformer resource pipe from which the data is piped.""" if self.is_transformer: From fd65de0ceeb4e6e012c126246d8f3f7a85025977 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 15:10:57 +0200 Subject: [PATCH 19/42] Fix mypy errors --- dlt/extract/resource.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 634ff63c02..7f4eb05d6f 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -218,12 +218,13 @@ def validator(self, 
validator: Optional[ValidateItem]) -> None: @property def max_table_nesting(self) -> Optional[int]: """A schema hint for resource that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON.""" - return self._hints.get("x-normalizer", {}).get("max_nesting") + max_nesting = self._hints.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] + return max_nesting if isinstance(max_nesting, int) else None @max_table_nesting.setter def max_table_nesting(self, value: int) -> None: - self._hints.setdefault("x-normalizer", {}) - self._hints["x-normalizer"]["max_nesting"] = value + self._hints.setdefault("x-normalizer", {}) # type: ignore[typeddict-item] + self._hints["x-normalizer"]["max_nesting"] = value # type: ignore[typeddict-item] def pipe_data_from(self, data_from: Union["DltResource", Pipe]) -> None: """Replaces the parent in the transformer resource pipe from which the data is piped.""" From 18c9d05f57b671d76cb19216107945c95a46bf71 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 15:57:06 +0200 Subject: [PATCH 20/42] Cache table nesting getter --- dlt/common/normalizers/json/relational.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 8a6e7aa27a..7fb1cf5e4a 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -80,11 +80,9 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # turn everything at the recursion level into complex type max_nesting = self.max_nesting schema = self.schema - table = schema.tables.get(table_name) - if table: - max_table_nesting = table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] - if max_table_nesting is not None: - max_nesting = max_table_nesting + max_table_nesting = self._get_table_nesting_level(schema, table_name) + if 
max_table_nesting is not None: + max_nesting = max_table_nesting assert _r_lvl <= max_nesting if _r_lvl == max_nesting: @@ -94,6 +92,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo # path = f"{table_name}▶{field_name}" # or use definition in the schema column: TColumnSchema = None + table = schema.tables.get(table_name) if table: column = table["columns"].get(field_name) if column is None: @@ -388,6 +387,13 @@ def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConf validator_f=column_name_validator(schema.naming), ) + @staticmethod + @lru_cache(maxsize=None) + def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]: + table = schema.tables.get(table_name) + if table: + return table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] + @staticmethod @lru_cache(maxsize=None) def _is_scd2_table(schema: Schema, table_name: str) -> bool: From 037616519f2a48580fe90bd54199892dea1fd19f Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:01:40 +0200 Subject: [PATCH 21/42] Fix mypy issues --- dlt/common/normalizers/json/relational.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 7fb1cf5e4a..ff0f5e13e3 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -392,7 +392,8 @@ def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConf def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]: table = schema.tables.get(table_name) if table: - return table.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined] + return table.get("x-normalizer", {}).get("max_nesting") # type: ignore[no-any-return] + return None @staticmethod @lru_cache(maxsize=None) From 2c3dbacefe1b61746298968fb8c72083b20d03dc Mon Sep 17 00:00:00 2001 From: Sultan Iman 
Date: Fri, 19 Apr 2024 16:08:16 +0200 Subject: [PATCH 22/42] Use general mypy ignore --- dlt/common/normalizers/json/relational.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index ff0f5e13e3..572bff14a5 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -392,7 +392,7 @@ def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConf def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]: table = schema.tables.get(table_name) if table: - return table.get("x-normalizer", {}).get("max_nesting") # type: ignore[no-any-return] + return table.get("x-normalizer", {}).get("max_nesting") # type: ignore return None @staticmethod From a7b247eb99a38e27a28cf5da5270d2174867ce74 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:24:58 +0200 Subject: [PATCH 23/42] Update resouce documentation with max_table_nesting description --- docs/website/docs/general-usage/resource.md | 53 +++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 66c4281d8d..b32d0d9bf9 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -343,6 +343,59 @@ for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(ano print(user) ``` +### Reduce the nesting level of generated tables + +You can limit how deep `dlt` goes when generating child tables. By default, the library will descend +and generate child tables for all nested lists, without limit. + +:::note +You can completely `max_table_nesting`, in this case dlt will use it from the source if it is specified +there or fallback to default value which has 1000 as maximum nesting level. 
+::: + +```py +@dlt.resource(max_table_nesting=1) +def my_resource(): + yield { + "id": 1, + "name": "random name", + "properties": [ + { + "name": "customer_age", + "type": "int", + "label": "Age" + "notes": [ + { + "text": "string", + "author" "string", + } + ] + } + ] + } +``` + +In the example above we want only 1 level of child tables to be generates (so there are no child +tables of child tables). Typical settings: + +- `max_table_nesting=0` will not generate child tables at all and all nested data will be + represented as json. +- `max_table_nesting=1` will generate child tables of top level tables and nothing more. All nested + data in child tables will be represented as json. + +You can achieve the same effect after the source instance is created: + +```py +from my_resource import my_awesome_module + +source = my_resource() +source.max_table_nesting = 0 +``` + +Several data sources are prone to contain semi-structured documents with very deep nesting i.e. +MongoDB databases. Our practical experience is that setting the `max_nesting_level` to 2 or 3 +produces the clearest and human-readable schemas. 
+ ### Sample from large data If your resource loads thousands of pages of data from a REST API or millions of rows from a db From f7b329101381249e77708f7786665d3a6f39ff14 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:38:52 +0200 Subject: [PATCH 24/42] Test max_table_nesting override via resource property --- tests/normalize/test_max_nesting.py | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 2becfcd5e7..6f78306b0b 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -250,3 +250,34 @@ def some_data_v2(): "rasa_bot_events_with_nesting_lvl_two__metadata__vendor_list", "third_resource_with_nested_data", ] + + # Set max_table_nesting via the setter and check the tables + pipeline.drop() + rasa_bot_events_resource = rasa_bot_events_with_nesting_lvl_one() + pipeline.run( + rasa_bot_events_resource, + dataset_name="bot_events", + write_disposition="append", + ) + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + all_table_names = pipeline_schema.data_table_names() + tables = pipeline_schema.data_table_names() + assert len(all_table_names) == 1 + assert tables == ["rasa_bot_events_with_nesting_lvl_one"] + + # now adjust the max_table_nesting for resource and check + pipeline.drop() + rasa_bot_events_resource.max_table_nesting = 2 + pipeline.run( + rasa_bot_events_resource, + dataset_name="bot_events", + write_disposition="append", + ) + all_table_names = pipeline_schema.data_table_names() + tables = pipeline_schema.data_table_names() + assert len(all_table_names) == 3 + assert tables == [ + "rasa_bot_events_with_nesting_lvl_one", + "rasa_bot_events_with_nesting_lvl_one__metadata__known_recipients", + "rasa_bot_events_with_nesting_lvl_one__metadata__vendor_list", + ] From e8cf8d51df240ed054544d6bd0608cccba4c88c4 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:41:04 
+0200 Subject: [PATCH 25/42] Adjust code sample --- docs/website/docs/general-usage/resource.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index b32d0d9bf9..73a7c9a6e0 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -388,8 +388,8 @@ You can achieve the same effect after the source instance is created: ```py from my_resource import my_awesome_module -source = my_resource() -source.max_table_nesting = 0 +resource = my_resource() +resource.max_table_nesting = 0 ``` Several data sources are prone to contain semi-structured documents with very deep nesting i.e. From 046db655be9a9be2f417767e2ef7a3bfe7fa0fa9 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:46:17 +0200 Subject: [PATCH 26/42] Describe last test scenario and add comments --- tests/normalize/test_max_nesting.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 6f78306b0b..e4b92bec66 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -92,6 +92,8 @@ def test_with_multiple_resources_with_max_table_nesting_levels( 3. Run combined test where we set `max_table_nesting` for the parent source and check if this `max_table_nesting` is respected by child resources where they don't define their own nesting level; + 4. 
Run the pipeline with set `max_table_nesting` of a resource then set overrider it and + rerun the pipeline to check if the number of tables are greated than in the first run; """ @dlt.resource(max_table_nesting=1) @@ -251,6 +253,7 @@ def some_data_v2(): "third_resource_with_nested_data", ] + # Check scenario #4 # Set max_table_nesting via the setter and check the tables pipeline.drop() rasa_bot_events_resource = rasa_bot_events_with_nesting_lvl_one() @@ -261,8 +264,9 @@ def some_data_v2(): ) pipeline_schema = pipeline.schemas[pipeline.default_schema_name] all_table_names = pipeline_schema.data_table_names() + count_all_tables_first_run = len(all_table_names) tables = pipeline_schema.data_table_names() - assert len(all_table_names) == 1 + assert count_all_tables_first_run == 1 assert tables == ["rasa_bot_events_with_nesting_lvl_one"] # now adjust the max_table_nesting for resource and check @@ -274,8 +278,11 @@ def some_data_v2(): write_disposition="append", ) all_table_names = pipeline_schema.data_table_names() + count_all_tables_second_run = len(all_table_names) + assert count_all_tables_first_run < count_all_tables_second_run + tables = pipeline_schema.data_table_names() - assert len(all_table_names) == 3 + assert count_all_tables_second_run == 3 assert tables == [ "rasa_bot_events_with_nesting_lvl_one", "rasa_bot_events_with_nesting_lvl_one__metadata__known_recipients", From e0d323bfc085196cafea938f7130d1b6a9e637aa Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:47:30 +0200 Subject: [PATCH 27/42] Fix snippet syntax error --- docs/website/docs/general-usage/resource.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 73a7c9a6e0..2022fc1249 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -363,7 +363,7 @@ def my_resource(): { "name": "customer_age", "type": "int", - 
"label": "Age" + "label": "Age", "notes": [ { "text": "string", From 129502e5083861999f6f0a9019e56f6ac76f8c96 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Fri, 19 Apr 2024 16:56:23 +0200 Subject: [PATCH 28/42] Fix snippet --- docs/website/docs/general-usage/resource.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 2022fc1249..f6c47de17b 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -354,6 +354,8 @@ there or fallback to default value which has 1000 as maximum nesting level. ::: ```py +import dlt + @dlt.resource(max_table_nesting=1) def my_resource(): yield { @@ -367,7 +369,7 @@ def my_resource(): "notes": [ { "text": "string", - "author" "string", + "author": "string", } ] } From 520a520b5dca47882b757a7b47420ed66a380d03 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 09:29:36 +0200 Subject: [PATCH 29/42] Revert ariflow config change --- dlt/common/configuration/providers/airflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dlt/common/configuration/providers/airflow.py b/dlt/common/configuration/providers/airflow.py index edd02c3487..99edf258d2 100644 --- a/dlt/common/configuration/providers/airflow.py +++ b/dlt/common/configuration/providers/airflow.py @@ -14,6 +14,7 @@ def name(self) -> str: def _look_vault(self, full_key: str, hint: type) -> str: """Get Airflow Variable with given `full_key`, return None if not found""" + from airflow.models import Variable with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): From a9b5094197ae58b23abd46d053e75f55953a5c65 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 09:31:52 +0200 Subject: [PATCH 30/42] Fix documentation typos --- docs/website/docs/general-usage/resource.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git 
a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index f6c47de17b..a10bc8ba8d 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -349,8 +349,9 @@ You can limit how deep `dlt` goes when generating child tables. By default, the and generate child tables for all nested lists, without limit. :::note -You can completely `max_table_nesting`, in this case dlt will use it from the source if it is specified -there or fallback to default value which has 1000 as maximum nesting level. +`max_table_nesting` is optional so you can skip it, in this case dlt will +use it from the source if it is specified there or fallback to default +value which has 1000 as maximum nesting level. ::: ```py @@ -377,7 +378,7 @@ def my_resource(): } ``` -In the example above we want only 1 level of child tables to be generates (so there are no child +In the example above we want only 1 level of child tables to be generated (so there are no child tables of child tables). 
Typical settings: - `max_table_nesting=0` will not generate child tables at all and all nested data will be From 80a8c173c140b3ee981d17c3147bdaa67988666d Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 09:41:50 +0200 Subject: [PATCH 31/42] Add more test cases when max_table_nesting is overridden via the property setter --- tests/normalize/test_max_nesting.py | 62 ++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index e4b92bec66..bfa3eccfb3 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -104,6 +104,14 @@ def rasa_bot_events_with_nesting_lvl_one(): def rasa_bot_events_with_nesting_lvl_two(): yield rasa_event_bot_metadata + all_table_names_for_third_resource = [ + "third_resource_with_nested_data", + "third_resource_with_nested_data__payload__hints", + "third_resource_with_nested_data__payload__hints__f_float", + "third_resource_with_nested_data__payload__hints__f_float__comments", + "third_resource_with_nested_data__params", + ] + @dlt.resource def third_resource_with_nested_data(): # first top level table `third_resource_with_nested_data` yield [ @@ -179,13 +187,7 @@ def some_data(): # expect four tables for resource `third_resource_with_nested_data` tables = [tbl for tbl in all_table_names if "third_resource" in tbl] assert len(tables) == 5 - assert tables == [ - "third_resource_with_nested_data", - "third_resource_with_nested_data__payload__hints", - "third_resource_with_nested_data__payload__hints__f_float", - "third_resource_with_nested_data__payload__hints__f_float__comments", - "third_resource_with_nested_data__params", - ] + assert tables == all_table_names_for_third_resource # Check scenario #2 # now we need to check `third_resource_with_nested_data` @@ -229,13 +231,7 @@ def some_data_v2(): # 5 because payload is a dictionary not a collection of dictionaries assert 
len(all_table_names) == 5 - assert all_table_names == [ - "third_resource_with_nested_data", - "third_resource_with_nested_data__payload__hints", - "third_resource_with_nested_data__payload__hints__f_float", - "third_resource_with_nested_data__payload__hints__f_float__comments", - "third_resource_with_nested_data__params", - ] + assert all_table_names == all_table_names_for_third_resource # Check scenario #3 pipeline.drop() @@ -288,3 +284,41 @@ def some_data_v2(): "rasa_bot_events_with_nesting_lvl_one__metadata__known_recipients", "rasa_bot_events_with_nesting_lvl_one__metadata__vendor_list", ] + + pipeline.drop() + rasa_bot_events_resource.max_table_nesting = 10 + pipeline.run( + rasa_bot_events_resource, + dataset_name="bot_events", + write_disposition="append", + ) + all_table_names = pipeline_schema.data_table_names() + count_all_tables_second_run = len(all_table_names) + assert count_all_tables_first_run < count_all_tables_second_run + + tables = pipeline_schema.data_table_names() + assert count_all_tables_second_run == 8 + assert tables == [ + "rasa_bot_events_with_nesting_lvl_one", + "rasa_bot_events_with_nesting_lvl_one__metadata__known_recipients", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__target", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__starbucks", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__amazon", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__deposit__employer", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__deposit__interest", + "rasa_bot_events_with_nesting_lvl_one__metadata__vendor_list", + ] + + pipeline.drop() + third_resource_with_nested_data.max_table_nesting = 10 + pipeline.run( + third_resource_with_nested_data, + write_disposition="append", + ) + all_table_names = pipeline_schema.data_table_names() + count_all_tables_second_run = len(all_table_names) + assert 
count_all_tables_first_run < count_all_tables_second_run + + tables = pipeline_schema.data_table_names() + assert count_all_tables_second_run == 5 + assert tables == all_table_names_for_third_resource From ba6ad195374de5851e6b725089e12694e6e018fc Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 09:43:50 +0200 Subject: [PATCH 32/42] Fix typos --- tests/normalize/test_max_nesting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index bfa3eccfb3..1dcf1083fa 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -92,8 +92,8 @@ def test_with_multiple_resources_with_max_table_nesting_levels( 3. Run combined test where we set `max_table_nesting` for the parent source and check if this `max_table_nesting` is respected by child resources where they don't define their own nesting level; - 4. Run the pipeline with set `max_table_nesting` of a resource then set overrider it and - rerun the pipeline to check if the number of tables are greated than in the first run; + 4. 
Run the pipeline with set `max_table_nesting` of a resource then override it and + rerun the pipeline to check if the number and names of tables are expected; """ @dlt.resource(max_table_nesting=1) From 53dca70e221bac6afdf48fefc26851af080de601 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 11:12:01 +0200 Subject: [PATCH 33/42] Add one more check --- tests/normalize/test_max_nesting.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 1dcf1083fa..be2c339fdf 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -322,3 +322,17 @@ def some_data_v2(): tables = pipeline_schema.data_table_names() assert count_all_tables_second_run == 5 assert tables == all_table_names_for_third_resource + + # Set max_table_nesting=None and check if the same tables exist + third_resource_with_nested_data.max_table_nesting = None + pipeline.run( + third_resource_with_nested_data, + write_disposition="append", + ) + all_table_names = pipeline_schema.data_table_names() + count_all_tables_second_run = len(all_table_names) + assert count_all_tables_first_run < count_all_tables_second_run + + tables = pipeline_schema.data_table_names() + assert count_all_tables_second_run == 5 + assert tables == all_table_names_for_third_resource From 7737786c3c671cb1aba69c7a312b216980c6fc0c Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 11:14:27 +0200 Subject: [PATCH 34/42] Add one more assert --- tests/normalize/test_max_nesting.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index be2c339fdf..fe2fe33faf 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -319,9 +319,9 @@ def some_data_v2(): count_all_tables_second_run = len(all_table_names) assert count_all_tables_first_run < 
count_all_tables_second_run - tables = pipeline_schema.data_table_names() + tables_with_nesting_level_set = pipeline_schema.data_table_names() assert count_all_tables_second_run == 5 - assert tables == all_table_names_for_third_resource + assert tables_with_nesting_level_set == all_table_names_for_third_resource # Set max_table_nesting=None and check if the same tables exist third_resource_with_nested_data.max_table_nesting = None @@ -336,3 +336,4 @@ def some_data_v2(): tables = pipeline_schema.data_table_names() assert count_all_tables_second_run == 5 assert tables == all_table_names_for_third_resource + assert tables == tables_with_nesting_level_set From 02f7b01680b8bbf97e1b4df3c0139315224baf60 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 14:39:22 +0200 Subject: [PATCH 35/42] Assert property setter sets the value and a test with source and resource with max_table_nesting set --- dlt/common/configuration/providers/airflow.py | 2 +- tests/normalize/test_max_nesting.py | 34 ++++++++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/dlt/common/configuration/providers/airflow.py b/dlt/common/configuration/providers/airflow.py index 99edf258d2..be83af387d 100644 --- a/dlt/common/configuration/providers/airflow.py +++ b/dlt/common/configuration/providers/airflow.py @@ -14,7 +14,7 @@ def name(self) -> str: def _look_vault(self, full_key: str, hint: type) -> str: """Get Airflow Variable with given `full_key`, return None if not found""" - + return from airflow.models import Variable with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index fe2fe33faf..5978f998a3 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -94,6 +94,8 @@ def test_with_multiple_resources_with_max_table_nesting_levels( own nesting level; 4. 
Run the pipeline with set `max_table_nesting` of a resource then override it and rerun the pipeline to check if the number and names of tables are expected; + 5. Check if source and resource both have defined `max_nesting_level` and we respect + `max_nesting_level` from resource; """ @dlt.resource(max_table_nesting=1) @@ -268,6 +270,7 @@ def some_data_v2(): # now adjust the max_table_nesting for resource and check pipeline.drop() rasa_bot_events_resource.max_table_nesting = 2 + assert rasa_bot_events_resource.max_table_nesting == 2 pipeline.run( rasa_bot_events_resource, dataset_name="bot_events", @@ -287,11 +290,8 @@ def some_data_v2(): pipeline.drop() rasa_bot_events_resource.max_table_nesting = 10 - pipeline.run( - rasa_bot_events_resource, - dataset_name="bot_events", - write_disposition="append", - ) + assert rasa_bot_events_resource.max_table_nesting == 10 + pipeline.run(rasa_bot_events_resource, dataset_name="bot_events") all_table_names = pipeline_schema.data_table_names() count_all_tables_second_run = len(all_table_names) assert count_all_tables_first_run < count_all_tables_second_run @@ -311,10 +311,8 @@ def some_data_v2(): pipeline.drop() third_resource_with_nested_data.max_table_nesting = 10 - pipeline.run( - third_resource_with_nested_data, - write_disposition="append", - ) + assert third_resource_with_nested_data.max_table_nesting == 10 + pipeline.run(third_resource_with_nested_data) all_table_names = pipeline_schema.data_table_names() count_all_tables_second_run = len(all_table_names) assert count_all_tables_first_run < count_all_tables_second_run @@ -325,10 +323,8 @@ def some_data_v2(): # Set max_table_nesting=None and check if the same tables exist third_resource_with_nested_data.max_table_nesting = None - pipeline.run( - third_resource_with_nested_data, - write_disposition="append", - ) + assert third_resource_with_nested_data.max_table_nesting is None + pipeline.run(third_resource_with_nested_data) all_table_names = 
pipeline_schema.data_table_names() count_all_tables_second_run = len(all_table_names) assert count_all_tables_first_run < count_all_tables_second_run @@ -337,3 +333,15 @@ def some_data_v2(): assert count_all_tables_second_run == 5 assert tables == all_table_names_for_third_resource assert tables == tables_with_nesting_level_set + + # Check scenario #5 + @dlt.source(max_table_nesting=1000) + def some_data_with_table_nesting(): + yield rasa_bot_events_with_nesting_lvl_one() + + pipeline.drop() + pipeline.run(some_data_with_table_nesting()) + pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + tables = pipeline_schema.data_table_names() + assert len(tables) == 1 + assert tables == ["rasa_bot_events_with_nesting_lvl_one"] From 1b6ed1829ddcf7a70c75cb0f70401a1770fed7ef Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 14:40:27 +0200 Subject: [PATCH 36/42] Clarify test scenario description --- dlt/common/configuration/providers/airflow.py | 2 +- tests/normalize/test_max_nesting.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/common/configuration/providers/airflow.py b/dlt/common/configuration/providers/airflow.py index be83af387d..99edf258d2 100644 --- a/dlt/common/configuration/providers/airflow.py +++ b/dlt/common/configuration/providers/airflow.py @@ -14,7 +14,7 @@ def name(self) -> str: def _look_vault(self, full_key: str, hint: type) -> str: """Get Airflow Variable with given `full_key`, return None if not found""" - return + from airflow.models import Variable with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()): diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 5978f998a3..2b239ca393 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -94,7 +94,7 @@ def test_with_multiple_resources_with_max_table_nesting_levels( own nesting level; 4. 
Run the pipeline with set `max_table_nesting` of a resource then override it and rerun the pipeline to check if the number and names of tables are expected; - 5. Check if source and resource both have defined `max_nesting_level` and we respect + 5. Create source and resource both with defined `max_nesting_level` and check if we respect `max_nesting_level` from resource; """ From 1f6d9047026db9b32988e7898c32be0f868fd075 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 14:48:07 +0200 Subject: [PATCH 37/42] Add checks if max_nesting set in x-normalizer hints --- tests/normalize/test_max_nesting.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 2b239ca393..663a428104 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -150,6 +150,8 @@ def third_resource_with_nested_data(): # first top level table `third_resource_ assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_one._hints assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_two._hints + assert rasa_bot_events_with_nesting_lvl_one._hints["x-normalizer"]["max_nesting"] == 1 + assert rasa_bot_events_with_nesting_lvl_two._hints["x-normalizer"]["max_nesting"] == 2 assert "x-normalizer" not in third_resource_with_nested_data._hints # Check scenario #1 From 4d7b1dd4b2eb899ccea29a56fe7db0b34c748dc5 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 14:51:12 +0200 Subject: [PATCH 38/42] Fix mypy errors --- tests/normalize/test_max_nesting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 663a428104..c3180578e4 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -150,8 +150,8 @@ def third_resource_with_nested_data(): # first top level table `third_resource_ assert "x-normalizer" in 
rasa_bot_events_with_nesting_lvl_one._hints assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_two._hints - assert rasa_bot_events_with_nesting_lvl_one._hints["x-normalizer"]["max_nesting"] == 1 - assert rasa_bot_events_with_nesting_lvl_two._hints["x-normalizer"]["max_nesting"] == 2 + assert rasa_bot_events_with_nesting_lvl_one._hints["x-normalizer"]["max_nesting"] == 1 # type: ignore[typeddict-item] + assert rasa_bot_events_with_nesting_lvl_two._hints["x-normalizer"]["max_nesting"] == 2 # type: ignore[typeddict-item] assert "x-normalizer" not in third_resource_with_nested_data._hints # Check scenario #1 From 07cbea2a9d20bc3e220adbb82c9c31d34a23637b Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 14:57:19 +0200 Subject: [PATCH 39/42] Add another case when resource does not define but source has defined max_table_nesting --- tests/normalize/test_max_nesting.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index c3180578e4..58b1b8ecd3 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -337,6 +337,7 @@ def some_data_v2(): assert tables == tables_with_nesting_level_set # Check scenario #5 + # We give priority to `max_table_nesting` of the resource if it is defined @dlt.source(max_table_nesting=1000) def some_data_with_table_nesting(): yield rasa_bot_events_with_nesting_lvl_one() @@ -347,3 +348,26 @@ def some_data_with_table_nesting(): tables = pipeline_schema.data_table_names() assert len(tables) == 1 assert tables == ["rasa_bot_events_with_nesting_lvl_one"] + + # Now check the case when `max_table_nesting` is not defined in the resource + rasa_bot_events_with_nesting_lvl_one.max_table_nesting = None + + @dlt.source(max_table_nesting=1000) + def some_data_with_table_nesting(): + yield rasa_bot_events_with_nesting_lvl_one() + + pipeline.drop() + pipeline.run(some_data_with_table_nesting()) +
pipeline_schema = pipeline.schemas[pipeline.default_schema_name] + tables = pipeline_schema.data_table_names() + assert len(tables) == 8 + assert tables == [ + "rasa_bot_events_with_nesting_lvl_one", + "rasa_bot_events_with_nesting_lvl_one__metadata__known_recipients", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__target", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__starbucks", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__spend__amazon", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__deposit__employer", + "rasa_bot_events_with_nesting_lvl_one__metadata__transaction_history__deposit__interest", + "rasa_bot_events_with_nesting_lvl_one__metadata__vendor_list", + ] From 5072fba13c112143471e158cbfafbcba574f156f Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 15:07:43 +0200 Subject: [PATCH 40/42] Remove duplicate --- tests/normalize/test_max_nesting.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 58b1b8ecd3..7ec231dadf 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -352,10 +352,6 @@ def some_data_with_table_nesting(): # Now check the case when `max_table_nesting` is not defined in the resource rasa_bot_events_with_nesting_lvl_one.max_table_nesting = None - @dlt.source(max_table_nesting=1000) - def some_data_with_table_nesting(): - yield rasa_bot_events_with_nesting_lvl_one() - pipeline.drop() pipeline.run(some_data_with_table_nesting()) pipeline_schema = pipeline.schemas[pipeline.default_schema_name] From f4541be4698a81021e6cbb6b9ee08d04303b3bb6 Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 15:16:17 +0200 Subject: [PATCH 41/42] Check max_table_nesting property accessor --- tests/normalize/test_max_nesting.py | 2 ++ 1 file changed, 2 insertions(+) diff --git
a/tests/normalize/test_max_nesting.py b/tests/normalize/test_max_nesting.py index 7ec231dadf..4015836232 100644 --- a/tests/normalize/test_max_nesting.py +++ b/tests/normalize/test_max_nesting.py @@ -150,6 +150,8 @@ def third_resource_with_nested_data(): # first top level table `third_resource_ assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_one._hints assert "x-normalizer" in rasa_bot_events_with_nesting_lvl_two._hints + assert rasa_bot_events_with_nesting_lvl_one.max_table_nesting == 1 + assert rasa_bot_events_with_nesting_lvl_two.max_table_nesting == 2 assert rasa_bot_events_with_nesting_lvl_one._hints["x-normalizer"]["max_nesting"] == 1 # type: ignore[typeddict-item] assert rasa_bot_events_with_nesting_lvl_two._hints["x-normalizer"]["max_nesting"] == 2 # type: ignore[typeddict-item] assert "x-normalizer" not in third_resource_with_nested_data._hints From 72ed5619be5b74f6ecad885d1c9e1b87b5716fe7 Mon Sep 17 00:00:00 2001 From: Sultan Iman <354868+sultaniman@users.noreply.github.com> Date: Wed, 24 Apr 2024 17:10:51 +0200 Subject: [PATCH 42/42] Update resource.md --- docs/website/docs/general-usage/resource.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index a10bc8ba8d..3ab485486e 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -386,7 +386,7 @@ tables of child tables). Typical settings: - `max_table_nesting=1` will generate child tables of top level tables and nothing more. All nested data in child tables will be represented as json. -You can achieve the same effect after the source instance is created: +You can achieve the same effect after the resource instance is created: ```py from my_resource import my_awesome_module