From e7e1b9a21f0b763957c98658bf9a669f57d7fcea Mon Sep 17 00:00:00 2001 From: Sultan Iman Date: Wed, 24 Apr 2024 17:54:58 +0200 Subject: [PATCH] Guard add_limit counter using lock --- dlt/extract/resource.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 4776158bbb..16e37944bc 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -12,6 +12,7 @@ Any, Optional, ) +from threading import Lock from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.specs import known_sections @@ -321,7 +322,7 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003 # make sure max_items is a number, to allow "None" as value for unlimited if max_items is None: max_items = -1 - + counter_lock = Lock() def _gen_wrap(gen: TPipeStep) -> TPipeStep: """Wrap a generator to take the first `max_items` records""" @@ -343,7 +344,9 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep: for i in gen: # type: ignore # TODO: help me fix this later yield i if i is not None: - count += 1 + with counter_lock: + count += 1 + # async gen yields awaitable so we must count one awaitable more # so the previous one is evaluated and yielded. # new awaitable will be cancelled