Experiment: Guard add_limit counter using lock #1277

Closed · wants to merge 1 commit
7 changes: 5 additions & 2 deletions dlt/extract/resource.py
@@ -12,6 +12,7 @@
     Any,
     Optional,
 )
+from threading import Lock

 from dlt.common.configuration.resolve import inject_section
 from dlt.common.configuration.specs import known_sections
@@ -332,7 +333,7 @@ def add_limit(self, max_items: int) -> "DltResource":  # noqa: A003
         # make sure max_items is a number, to allow "None" as value for unlimited
         if max_items is None:
             max_items = -1
-
+        counter_lock = Lock()
         def _gen_wrap(gen: TPipeStep) -> TPipeStep:
             """Wrap a generator to take the first `max_items` records"""

@@ -354,7 +355,9 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:
             for i in gen:  # type: ignore # TODO: help me fix this later
                 yield i
                 if i is not None:
-                    count += 1
+                    with counter_lock:
Collaborator
@sultaniman I think all this code is run on the main pipeline thread anyway, so the lock will not help here, but I'm not 100% sure

Contributor Author
Tbh I also didn't dig deeper; my theory was that we submit the work to a ThreadPoolExecutor, which can potentially cause data races.
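A minimal standalone sketch of that theory (not dlt code; the `increment` worker and the thread/iteration counts are invented for illustration): when increments are submitted to a `ThreadPoolExecutor`, guarding the read-modify-write with a `Lock` keeps the final total deterministic.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

counter_lock = Lock()
count = 0

def increment(n: int) -> None:
    """Perform n guarded increments. `count += 1` is a read-modify-write,
    so without the lock concurrent workers could lose updates."""
    global count
    for _ in range(n):
        with counter_lock:
            count += 1

# 8 workers, 10_000 increments each; the pool context waits for all tasks
with ThreadPoolExecutor(max_workers=8) as pool:
    for _ in range(8):
        pool.submit(increment, 10_000)

print(count)  # 80000 with the lock; without it the total can come up short
```

Whether this scenario applies to `add_limit` depends on whether the wrapped generator is actually advanced from multiple threads, which is the open question in this thread.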

Collaborator
operations on integers in CPython are atomic, so we are not locking anything. IMO the race condition goes deeper here and we are better off making our tests accept the 10th element 🤷
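For context on the atomicity point, a minimal standalone sketch (the `bump` helper is invented for illustration): CPython's GIL makes individual bytecode instructions atomic, but an augmented assignment like `count += 1` compiles to several instructions (load, add, store), so a thread switch can in principle land between them.

```python
import dis

def bump(count: int) -> int:
    count += 1  # not a single opcode: separate load / add / store steps
    return count

# Inspect the compiled bytecode of the increment.
instructions = list(dis.get_instructions(bump))
print(len(instructions) > 3)  # True: the increment spans multiple instructions
```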

+                        count += 1

             # async gen yields awaitable so we must count one awaitable more
             # so the previous one is evaluated and yielded.
             # new awaitable will be cancelled