diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index 343a737c07..96815903be 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -502,7 +502,12 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
 
                 # add directly computed hashes
                 unique_hashes.update(transformer.unique_hashes)
                 self._cached_state["unique_hashes"] = list(unique_hashes)
-
+                DEDUP_WARNING_THRESHOLD = 200
+                if len(self._cached_state["unique_hashes"]) > DEDUP_WARNING_THRESHOLD:
+                    logger.warning(
+                        f"There are over {DEDUP_WARNING_THRESHOLD} records to be deduplicated because"
+                        f" they share the same primary key `{self.primary_key}`."
+                    )
         return rows
 
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 0a0de75987..7324c1c1c6 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -2586,3 +2586,24 @@ def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_valu
         pipeline.run(updated_is_int())
     assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion)
     assert pip_ex.value.__cause__.cursor_path == "updated_at"
+
+
+@pytest.mark.parametrize("item_type", ["object"])
+@pytest.mark.parametrize("primary_key", ["id", None])
+def test_warning_large_deduplication_state(item_type: TestDataItemFormat, primary_key, mocker):
+    @dlt.resource(primary_key=primary_key)
+    def some_data(
+        created_at=dlt.sources.incremental("created_at"),
+    ):
+        yield data_to_item_format(
+            item_type,
+            [{"id": i, "created_at": 1} for i in range(201)],
+        )
+
+    logger_spy = mocker.spy(dlt.common.logger, "warning")
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    p.extract(some_data(1))
+    logger_spy.assert_called_once_with(
+        "There are over 200 records to be deduplicated because they share the same primary key"
+        f" `{primary_key}`."
+    )
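
For context, a minimal sketch of how the new warning surfaces in practice, mirroring the test above. The resource and pipeline names (`events`, `dedup_warning_demo`) are illustrative, not part of the change:

import dlt

# 201 rows share the same cursor value ("created_at"), so after the extract
# more than DEDUP_WARNING_THRESHOLD (200) unique-key hashes must be kept in
# the incremental state to deduplicate future loads, which triggers the warning.
@dlt.resource(primary_key="id")
def events(created_at=dlt.sources.incremental("created_at")):
    yield [{"id": i, "created_at": 1} for i in range(201)]

pipeline = dlt.pipeline(pipeline_name="dedup_warning_demo")
pipeline.extract(events())
# Expected log line (per the message added above):
# There are over 200 records to be deduplicated because they share the same
# primary key `id`.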