Client now retries .map() failures #2734

Open: wants to merge 14 commits into main

@rculbertson rculbertson commented Jan 7, 2025

Describe your changes

Closes SVC-180.

Adds client retries to .map(). We already did this for .remote() in #2403.

For now, retries are enabled only if the MODAL_CLIENT_RETRIES flag is set to true.
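
A minimal sketch of how such a gate might look, assuming the flag is read from an environment variable of the same name. The helper `client_retries_enabled` and the accepted spellings are hypothetical, not the client's actual config code:

```python
import os
from typing import Mapping, Optional


def client_retries_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical gate: True only when MODAL_CLIENT_RETRIES holds a truthy string."""
    env = os.environ if environ is None else environ
    # Assumption: the flag arrives as an environment variable, and the
    # accepted spellings ("1", "true") are illustrative only.
    return env.get("MODAL_CLIENT_RETRIES", "").strip().lower() in {"1", "true"}
```

With a gate like this, the retry path stays off unless the flag is explicitly set, so existing callers keep the old behavior.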


Check these boxes or delete any item (or this section) if not relevant for this PR.

  • Client+Server: this change is compatible with old servers
  • Client forward compatibility: this change ensures client can accept data intended for later versions of itself

Note on protobuf: protobuf message changes in one place may have impact to
multiple entities (client, server, worker, database). See points above.


Changelog

@rculbertson rculbertson marked this pull request as ready for review January 8, 2025 19:54
@rculbertson rculbertson changed the title WIP Client retries for .map() Client now retries .map() failures Jan 8, 2025
@rohansingh

There were some unit tests on the priority queue that could be restored:
2c1f62c#diff-80ca24dfa2bbe913c31c13fd90c8c0a2cef7cb4ba98e66ca0c5eca38d94c7732

if timestamp_seconds == self._MAX_PRIORITY:
    return None
await self._queue.put((timestamp_seconds, idx))
await asyncio.sleep(1)

@rohansingh rohansingh Jan 8, 2025

I think using an asyncio.Condition like the previous implementation is better than asyncio.sleep(1) since the latter becomes a busy wait.

# wait until either the timeout or a new item is added
try:
    await asyncio.wait_for(self.condition.wait(), timeout=sleep_time)
except asyncio.TimeoutError:
    continue
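
For context, here is a self-contained sketch of the Condition-based pattern, assuming a heap keyed by a monotonic-clock timestamp. The class `TimestampQueue` and its method names are hypothetical and are not taken from the PR's code:

```python
import asyncio
import heapq
import itertools
import time


class TimestampQueue:
    """Hypothetical queue that releases each item once its timestamp is due."""

    def __init__(self) -> None:
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so items are never compared
        self._condition = asyncio.Condition()

    async def put(self, ready_at: float, item: object) -> None:
        async with self._condition:
            heapq.heappush(self._heap, (ready_at, next(self._counter), item))
            # Wake the consumer so it can recompute how long to sleep.
            self._condition.notify_all()

    async def get(self) -> object:
        async with self._condition:
            while True:
                if not self._heap:
                    # Nothing queued: block until put() notifies us.
                    await self._condition.wait()
                    continue
                sleep_time = self._heap[0][0] - time.monotonic()
                if sleep_time <= 0:
                    return heapq.heappop(self._heap)[2]
                # Wait until either the timeout or a new item is added.
                try:
                    await asyncio.wait_for(self._condition.wait(), timeout=sleep_time)
                except asyncio.TimeoutError:
                    continue


async def demo() -> list:
    queue = TimestampQueue()
    now = time.monotonic()
    await queue.put(now + 0.05, "later")
    consumer = asyncio.create_task(queue.get())
    await asyncio.sleep(0)  # let the consumer start waiting on "later"
    await queue.put(now + 0.01, "sooner")  # notification shortens the wait
    return [await consumer, await queue.get()]
```

Unlike a fixed `asyncio.sleep(1)`, the consumer sleeps exactly until the earliest item is due and wakes immediately when an earlier item arrives.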

@rculbertson (Author)

ah good point. I'll bring back that original implementation.

rohansingh and others added 13 commits January 16, 2025 22:16
When two items had the same timestamp, we would try to sort by the
actual item value, which breaks for types that don't support comparison.

Instead use a nonce when inserting an item, to ensure that we never have
to compare the item value itself.
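
The tie-breaking idea can be sketched with `heapq`, assuming the nonce is a monotonically increasing counter (the commit message does not say how the nonce is generated, and `NonceHeap` is a hypothetical name):

```python
import heapq
import itertools
from typing import Any


def naive_push_fails() -> bool:
    """Show that equal timestamps force a comparison of the items themselves."""
    heap = []
    heapq.heappush(heap, (1.0, {"input": "a"}))
    try:
        heapq.heappush(heap, (1.0, {"input": "b"}))  # dict < dict raises TypeError
    except TypeError:
        return True
    return False


class NonceHeap:
    """Hypothetical heap whose entries are (timestamp, nonce, item)."""

    def __init__(self) -> None:
        self._heap = []
        self._nonce = itertools.count()  # assumption: a simple counter as the nonce

    def push(self, timestamp: float, item: Any) -> None:
        # The unique nonce decides every tie, so `item` is never compared.
        heapq.heappush(self._heap, (timestamp, next(self._nonce), item))

    def pop(self) -> Any:
        return heapq.heappop(self._heap)[2]
```

A counter has the side effect of keeping items with equal timestamps in insertion order.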

Though very unlikely outside of unit tests, it's possible to have an output
returned before the corresponding retry context has been put into the
`pending_outputs` dict.

Once the input queue filled up, we had no more room to put pending
retries. And since we had no more room to put retries, we stopped
fetching new outputs. And since we stopped fetching new outputs, the
server stopped accepting new inputs.

As a result, the input queue would never burn down.

Instead, use a semaphore to ensure we never have more than 1000 items
outstanding.
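
A rough sketch of bounding outstanding work with `asyncio.Semaphore`. The driver `run_bounded` and the `process` callback are hypothetical stand-ins for the client's send and receive paths; only the limit of 1000 comes from the commit message:

```python
import asyncio

MAX_OUTSTANDING = 1000  # the limit named in the commit message


async def run_bounded(inputs, process, limit=MAX_OUTSTANDING):
    """Hypothetical driver: never more than `limit` items outstanding at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item):
        try:
            return await process(item)
        finally:
            # Completion (success or failure) frees a slot for the producer.
            semaphore.release()

    tasks = []
    for item in inputs:
        # The producer blocks here, not on a full queue, once `limit` is reached.
        await semaphore.acquire()
        tasks.append(asyncio.create_task(run_one(item)))
    return await asyncio.gather(*tasks)


async def demo(limit=3, count=20):
    in_flight = 0
    peak = 0

    async def process(x):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        in_flight -= 1
        return x * 2

    results = await run_bounded(range(count), process, limit=limit)
    return peak, results
```

Because a slot is released only when an item completes, a retry reuses capacity that is already accounted for, and the producer can never fill the pipeline so far that outputs stop being consumed.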

Instead of using a priority queue, just use the event loop to schedule
retries in the future. This significantly simplifies the implementation
and makes it much more like the original.

Note that we still do have a semaphore that ensures that no more than 1K
inputs are in flight (i.e., sent to the server but not completed).
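
One way to let the event loop do the scheduling is `loop.call_later`, sketched below. The function `schedule_retry` and the `retry_queue` are hypothetical; the PR may schedule retries differently (for example with a task that sleeps before resending):

```python
import asyncio


def schedule_retry(loop, retry_queue, item, delay):
    """Hypothetical helper: hand `item` back for resending after `delay` seconds."""
    # The loop's own timer heap replaces a hand-rolled priority queue.
    loop.call_later(delay, retry_queue.put_nowait, item)


async def demo():
    loop = asyncio.get_running_loop()
    retry_queue = asyncio.Queue()
    schedule_retry(loop, retry_queue, "slow-retry", 0.05)
    schedule_retry(loop, retry_queue, "fast-retry", 0.01)
    # Items surface in order of their due time, not their scheduling order.
    return [await retry_queue.get(), await retry_queue.get()]
```

The event loop already keeps timers ordered by deadline, so no separate consumer task is needed to wait for the next due retry.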