
Add max_table_nesting to resource decorator #1242

Merged · 42 commits · Apr 24, 2024
Commits
d705c83
Add max_table_nesting to resource decorator
sultaniman Apr 18, 2024
981b7c9
Handle max_table_nesting in normalizer
sultaniman Apr 18, 2024
53bfd4f
Use dict.get to retrieve table from schema
sultaniman Apr 18, 2024
85c7f70
Use schema.get_table and format code
sultaniman Apr 18, 2024
5ddc360
Fix mypy issues
sultaniman Apr 18, 2024
d401d97
Fix mypy errors
sultaniman Apr 18, 2024
5b1ecc2
Fix bugs and parametrize test
sultaniman Apr 18, 2024
f425412
Add one more test case
sultaniman Apr 18, 2024
f20d009
Format code
sultaniman Apr 18, 2024
23780fb
Get table from schema.tables
sultaniman Apr 18, 2024
937b375
Add comments and cleanup code
sultaniman Apr 19, 2024
4c15cdc
Remove breakpoint
sultaniman Apr 19, 2024
8ed96c7
Add more tests
sultaniman Apr 19, 2024
94d9467
Add more tests
sultaniman Apr 19, 2024
6f1f169
Revert ariflow config change
sultaniman Apr 19, 2024
c5acd08
Move tests under normalizer tests
sultaniman Apr 19, 2024
5e31feb
Use dummy destination with timeout
sultaniman Apr 19, 2024
420dbb1
Add max_table_nesting getter/setter for resource
sultaniman Apr 19, 2024
fd65de0
Fix mypy errors
sultaniman Apr 19, 2024
18c9d05
Cache table nesting getter
sultaniman Apr 19, 2024
0376165
Fix mypy issues
sultaniman Apr 19, 2024
2c3dbac
Use general mypy ignore
sultaniman Apr 19, 2024
a7b247e
Update resouce documentation with max_table_nesting description
sultaniman Apr 19, 2024
f7b3291
Test max_table_nesting override via resource property
sultaniman Apr 19, 2024
e8cf8d5
Adjust code sample
sultaniman Apr 19, 2024
046db65
Describe last test scenario and add comments
sultaniman Apr 19, 2024
e0d323b
Fix snippet syntax error
sultaniman Apr 19, 2024
129502e
Fix snippet
sultaniman Apr 19, 2024
520a520
Revert ariflow config change
sultaniman Apr 24, 2024
a9b5094
Fix documentation typos
sultaniman Apr 24, 2024
80a8c17
Add more test cases when max_table_nesting is overridden via the prop…
sultaniman Apr 24, 2024
ba6ad19
Fix typos
sultaniman Apr 24, 2024
53dca70
Add one more check
sultaniman Apr 24, 2024
7737786
Add one more assert
sultaniman Apr 24, 2024
02f7b01
Assert property setter sets the value and a test with source and reso…
sultaniman Apr 24, 2024
1b6ed18
Clarify test scenario description
sultaniman Apr 24, 2024
1f6d904
Add checks if max_nesting set in x-normalizer hints
sultaniman Apr 24, 2024
4d7b1dd
Fix mypy errors
sultaniman Apr 24, 2024
07cbea2
Add another case when resource does not define but source has defined…
sultaniman Apr 24, 2024
5072fba
Remove duplicate
sultaniman Apr 24, 2024
f4541be
Check max_table_nesting propery accessor
sultaniman Apr 24, 2024
72ed561
Update resource.md
sultaniman Apr 24, 2024
49 changes: 41 additions & 8 deletions dlt/common/normalizers/json/relational.py
@@ -8,7 +8,6 @@
from dlt.common.typing import DictStrAny, DictStrStr, TDataItem, StrAny
from dlt.common.schema import Schema
from dlt.common.schema.typing import (
TTableSchema,
TColumnSchema,
TColumnName,
TSimpleRegex,
@@ -32,7 +31,7 @@ class TDataItemRow(TypedDict, total=False):


class TDataItemRowRoot(TDataItemRow, total=False):
_dlt_load_id: str # load id to identify records loaded together that ie. need to be processed
_dlt_load_id: (
    str  # load id to identify records loaded together that ie. need to be processed
)
# _dlt_meta: TEventDLTMeta # stores metadata, should never be sent to the normalizer


@@ -67,7 +66,9 @@ def __init__(self, schema: Schema) -> None:
self._reset()

def _reset(self) -> None:
self.normalizer_config = self.schema._normalizers_config["json"].get("config") or {} # type: ignore
self.normalizer_config = (
self.schema._normalizers_config["json"].get("config") or {} # type: ignore[assignment]
)
self.propagation_config = self.normalizer_config.get("propagation", None)
self.max_nesting = self.normalizer_config.get("max_nesting", 1000)
self._skip_primary_key = {}
@@ -79,10 +80,14 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo
# turn everything at the recursion level into complex type
max_nesting = self.max_nesting
schema = self.schema
max_table_nesting = self._get_table_nesting_level(schema, table_name)
if max_table_nesting is not None:
max_nesting = max_table_nesting

assert _r_lvl <= max_nesting
if _r_lvl == max_nesting:
return True

# use cached value
# path = f"{table_name}▶{field_name}"
# or use definition in the schema
@@ -94,6 +99,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo
data_type = schema.get_preferred_type(field_name)
else:
data_type = column["data_type"]

return data_type == "complex"

def _flatten(
@@ -220,7 +226,13 @@ def _normalize_list(
elif isinstance(v, list):
# to normalize lists of lists, we must create a tracking intermediary table by creating a mock row
yield from self._normalize_row(
{"list": v}, extend, ident_path, parent_path, parent_row_id, idx, _r_lvl + 1
{"list": v},
extend,
ident_path,
parent_path,
parent_row_id,
idx,
_r_lvl + 1,
)
else:
# list of simple types
@@ -261,20 +273,29 @@ def _normalize_row(
extend.update(self._get_propagated_values(table, flattened_row, _r_lvl))

# yield parent table first
should_descend = yield (table, schema.naming.shorten_fragments(*parent_path)), flattened_row
should_descend = yield (
(table, schema.naming.shorten_fragments(*parent_path)),
flattened_row,
)
if should_descend is False:
return

# normalize and yield lists
for list_path, list_content in lists.items():
yield from self._normalize_list(
list_content, extend, list_path, parent_path + ident_path, row_id, _r_lvl + 1
list_content,
extend,
list_path,
parent_path + ident_path,
row_id,
_r_lvl + 1,
)

def extend_schema(self) -> None:
# validate config
config = cast(
RelationalNormalizerConfig, self.schema._normalizers_config["json"].get("config") or {}
RelationalNormalizerConfig,
self.schema._normalizers_config["json"].get("config") or {},
)
DataItemNormalizer._validate_normalizer_config(self.schema, config)

@@ -366,6 +387,14 @@ def _validate_normalizer_config(schema: Schema, config: RelationalNormalizerConf
validator_f=column_name_validator(schema.naming),
)

@staticmethod
@lru_cache(maxsize=None)
def _get_table_nesting_level(schema: Schema, table_name: str) -> Optional[int]:
table = schema.tables.get(table_name)
if table:
return table.get("x-normalizer", {}).get("max_nesting") # type: ignore
return None

@staticmethod
@lru_cache(maxsize=None)
def _is_scd2_table(schema: Schema, table_name: str) -> bool:
@@ -382,7 +411,11 @@ def _get_validity_column_names(schema: Schema, table_name: str) -> List[Optional
@staticmethod
@lru_cache(maxsize=None)
def _dlt_id_is_row_hash(schema: Schema, table_name: str) -> bool:
return schema.get_table(table_name)["columns"].get("_dlt_id", dict()).get("x-row-version", False) # type: ignore[return-value]
return (
schema.get_table(table_name)["columns"] # type: ignore[return-value]
.get("_dlt_id", {})
.get("x-row-version", False)
)

@staticmethod
def _validate_validity_column_names(
15 changes: 15 additions & 0 deletions dlt/extract/decorators.py
@@ -286,6 +286,7 @@ def resource(
/,
name: str = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
@@ -304,6 +305,7 @@ def resource(
/,
name: str = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
@@ -322,6 +324,7 @@ def resource(
/,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
@@ -341,6 +344,7 @@ def resource(
/,
name: str = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
@@ -358,6 +362,7 @@ def resource(
/,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
@@ -398,6 +403,7 @@ def resource(
If not present, the name of the decorated function will be used.

table_name (TTableHintTemplate[str], optional): A table name, if different from `name`.
max_table_nesting (int, optional): A schema hint that sets the maximum depth of nested tables above which the remaining nodes are loaded as structs or JSON.
This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.

write_disposition (TTableHintTemplate[TWriteDispositionConfig], optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
@@ -449,6 +455,15 @@ def make_resource(
schema_contract=schema_contract,
table_format=table_format,
)

# If a custom nesting level was specified, add it to the table hints
# so that the normalizer (dlt/common/normalizers/json/relational.py)
# can later override the max_nesting level for the given table
if max_table_nesting is not None:
table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item]
table_template["x-normalizer"]["max_nesting"] = max_table_nesting # type: ignore[typeddict-item]

resource = DltResource.from_data(
_data,
_name,
11 changes: 11 additions & 0 deletions dlt/extract/resource.py
@@ -215,6 +215,17 @@ def validator(self, validator: Optional[ValidateItem]) -> None:
if validator:
self.add_step(validator, insert_at=step_no if step_no >= 0 else None)

@property
def max_table_nesting(self) -> Optional[int]:
"""A schema hint for resource that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON."""
max_nesting = self._hints.get("x-normalizer", {}).get("max_nesting") # type: ignore[attr-defined]
return max_nesting if isinstance(max_nesting, int) else None

@max_table_nesting.setter
def max_table_nesting(self, value: int) -> None:
self._hints.setdefault("x-normalizer", {}) # type: ignore[typeddict-item]
self._hints["x-normalizer"]["max_nesting"] = value # type: ignore[typeddict-item]

def pipe_data_from(self, data_from: Union["DltResource", Pipe]) -> None:
"""Replaces the parent in the transformer resource pipe from which the data is piped."""
if self.is_transformer:
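The property pair added here stores the value under the `x-normalizer` key of the hints dict rather than as a plain attribute, so the normalizer can later read it back as a table hint. A minimal self-contained sketch of the same pattern (a hypothetical `ResourceHints` class, not dlt's `DltResource`):

```py
from typing import Any, Dict, Optional


class ResourceHints:
    """Minimal sketch of the max_table_nesting property pair in the diff."""

    def __init__(self) -> None:
        self._hints: Dict[str, Any] = {}

    @property
    def max_table_nesting(self) -> Optional[int]:
        # Read the hint back; tolerate a missing or malformed entry
        max_nesting = self._hints.get("x-normalizer", {}).get("max_nesting")
        return max_nesting if isinstance(max_nesting, int) else None

    @max_table_nesting.setter
    def max_table_nesting(self, value: int) -> None:
        # Create the "x-normalizer" sub-dict on first use
        self._hints.setdefault("x-normalizer", {})["max_nesting"] = value
```

Keeping the value inside the hints dict means it travels with the other table hints instead of needing separate plumbing into the schema.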
56 changes: 56 additions & 0 deletions docs/website/docs/general-usage/resource.md
@@ -343,6 +343,62 @@ for user in users().add_filter(lambda user: user["user_id"] != "me").add_map(ano
print(user)
```

### Reduce the nesting level of generated tables

You can limit how deep `dlt` goes when generating child tables. By default, the library will descend
and generate child tables for all nested lists, without limit.

:::note
`max_table_nesting` is optional, so you can skip it. In that case dlt uses the value
from the source if one is specified there, otherwise it falls back to the default
maximum nesting level of 1000.
:::

```py
import dlt

@dlt.resource(max_table_nesting=1)
def my_resource():
yield {
"id": 1,
"name": "random name",
"properties": [
{
"name": "customer_age",
"type": "int",
"label": "Age",
"notes": [
{
"text": "string",
"author": "string",
}
]
}
]
}
```

In the example above, only one level of child tables is generated (so there are no
child tables of child tables). Typical settings:

- `max_table_nesting=0` will not generate child tables at all and all nested data will be
represented as json.
- `max_table_nesting=1` will generate child tables of top level tables and nothing more. All nested
data in child tables will be represented as json.
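To see what these settings mean in practice, here is a deliberately simplified sketch of the nesting-limit idea (an illustration only, not dlt's actual normalizer; the `normalize` helper and the `parent__child` table naming are made up for this example):

```py
import json


def normalize(record, table, max_nesting, tables=None, level=0):
    """Simplified illustration of nesting limits (not dlt's real normalizer):
    descend into nested lists, creating a child table per list, until
    `max_nesting` is reached; anything deeper stays embedded as JSON."""
    tables = tables if tables is not None else {}
    row = {}
    for key, value in record.items():
        if isinstance(value, list) and level < max_nesting:
            # below the limit: spawn a child table and recurse
            child_table = f"{table}__{key}"
            for item in value:
                normalize(item, child_table, max_nesting, tables, level + 1)
        elif isinstance(value, (list, dict)):
            # at or past the limit: keep the nested data as a JSON string
            row[key] = json.dumps(value)
        else:
            row[key] = value
    tables.setdefault(table, []).append(row)
    return tables


record = {
    "id": 1,
    "name": "random name",
    "properties": [
        {
            "name": "customer_age",
            "type": "int",
            "label": "Age",
            "notes": [{"text": "string", "author": "string"}],
        }
    ],
}

tables = normalize(record, "my_resource", max_nesting=1)
# "my_resource" and "my_resource__properties" are created;
# the "notes" list stays embedded as a JSON string
```

With `max_nesting=0` the same call produces only the root table, with the whole `properties` list serialized as JSON.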

You can achieve the same effect after the resource instance is created:

```py
from my_awesome_module import my_resource

resource = my_resource()
resource.max_table_nesting = 0
```

Several data sources, e.g. MongoDB databases, are prone to contain semi-structured
documents with very deep nesting. Our practical experience is that setting
`max_table_nesting` to 2 or 3 produces the clearest and most human-readable schemas.
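The precedence implied by the note earlier (a hint on the resource first, then the source-level setting, then the default of 1000) can be sketched as a small helper (hypothetical, for illustration only; not part of dlt's API):

```py
from typing import Optional


def effective_max_table_nesting(
    resource_hint: Optional[int],
    source_setting: Optional[int],
    default: int = 1000,
) -> int:
    # A hint set on the resource wins; otherwise the source-level
    # setting applies; otherwise fall back to the default maximum
    if resource_hint is not None:
        return resource_hint
    if source_setting is not None:
        return source_setting
    return default
```

Note that `0` is a valid resource hint (disable child tables entirely), which is why the checks compare against `None` rather than truthiness.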

### Sample from large data

If your resource loads thousands of pages of data from a REST API or millions of rows from a db
Expand Down