Skip to content

Commit

Permalink
Guard add_limit counter using lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Apr 24, 2024
1 parent e0a7fe0 commit e7e1b9a
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Any,
Optional,
)
from threading import Lock

from dlt.common.configuration.resolve import inject_section
from dlt.common.configuration.specs import known_sections
Expand Down Expand Up @@ -321,7 +322,7 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003
# make sure max_items is a number, to allow "None" as value for unlimited
if max_items is None:
max_items = -1

counter_lock = Lock()
def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""

Expand All @@ -343,7 +344,9 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
if i is not None:
count += 1
with counter_lock:
count += 1

# async gen yields awaitable so we must count one awaitable more
# so the previous one is evaluated and yielded.
# new awaitable will be cancelled
Expand Down

0 comments on commit e7e1b9a

Please sign in to comment.