diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 4776158bbb..16e37944bc 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -12,6 +12,7 @@ Any, Optional, ) +from threading import Lock from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.specs import known_sections @@ -321,7 +322,7 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003 # make sure max_items is a number, to allow "None" as value for unlimited if max_items is None: max_items = -1 - + counter_lock = Lock() def _gen_wrap(gen: TPipeStep) -> TPipeStep: """Wrap a generator to take the first `max_items` records""" @@ -343,7 +344,9 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: for i in gen: # type: ignore # TODO: help me fix this later yield i if i is not None: - count += 1 + with counter_lock: + count += 1 + # async gen yields awaitable so we must count one awaitable more # so the previous one is evaluated and yielded. # new awaitable will be cancelled