From afdc7f8e19758352dda69458cfd7c310fd5eef31 Mon Sep 17 00:00:00 2001 From: Rohan Singh Date: Wed, 4 Dec 2024 17:09:12 +0000 Subject: [PATCH] Fix deadlocks on large numbers of inputs Once the input queue filled up, we had no more room to put pending retries. And since we had no more room to put retries, we stopped fetching new outputs. And since we stopped fetching new outputs, the server stopped accepting new inputs. As a result, the input queue would never burn down. Instead, use a semaphore to ensure we never have more than 1000 items outstanding. --- modal/_utils/async_utils.py | 5 +++-- modal/parallel_map.py | 32 +++++++++++++++++++++----------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/modal/_utils/async_utils.py b/modal/_utils/async_utils.py index 1dec0e0d2..cca49dd9a 100644 --- a/modal/_utils/async_utils.py +++ b/modal/_utils/async_utils.py @@ -779,9 +779,10 @@ async def put_with_timestamp(self, timestamp: float, item: Union[T, None]): """ Add an item to the queue to be processed at a specific timestamp. """ + self.nonce += 1 + await super().put((timestamp, self.nonce, item)) + async with self.condition: - self.nonce += 1 - await super().put((timestamp, self.nonce, item)) self.condition.notify_all() # notify any waiting coroutines async def get_next(self) -> Union[T, None]: diff --git a/modal/parallel_map.py b/modal/parallel_map.py index 97b121fd5..c41d2e075 100644 --- a/modal/parallel_map.py +++ b/modal/parallel_map.py @@ -72,13 +72,13 @@ class _MapItemRetryContext: retry_manager: RetryManager -MAP_INVOCATION_CHUNK_SIZE = 49 +# maximum number of inputs that can be in progress (either queued to be sent, +# or waiting for completion). if this limit is reached, we will block reading +# from the input generator until more work has completed. +MAP_MAX_INPUTS_OUTSTANDING = 1000 -# maximum number of inputs that can be queued to be sent to the server. 
once -# this limit is reached, we will block (a) reading from the input generator -# and (b) processing outputs from the server, until some of the already-queued -# inputs are pushed to the server. -MAP_INPUT_QUEUE_MAX_SIZE = 1000 +# maximum number of inputs to send to the server in a single request +MAP_INVOCATION_CHUNK_SIZE = 49 if typing.TYPE_CHECKING: import modal.functions @@ -118,11 +118,10 @@ def count_update(): pending_outputs: dict[int, asyncio.Future[_MapItemRetryContext]] = {} # Map input idx -> retry context completed_outputs: set[str] = set() # Set of input_ids whose outputs are complete (expecting no more values) + inputs_outstanding = asyncio.BoundedSemaphore(MAP_MAX_INPUTS_OUTSTANDING) input_queue: TimedPriorityQueue[ api_pb2.FunctionPutInputsItem | api_pb2.FunctionRetryInputsItem - ] = TimedPriorityQueue( - maxsize=MAP_INPUT_QUEUE_MAX_SIZE, - ) + ] = TimedPriorityQueue() async def create_input(argskwargs): nonlocal num_inputs @@ -146,6 +145,16 @@ async def drain_input_generator(): async_map_ordered(input_iter(), create_input, concurrency=BLOB_MAX_PARALLELISM) ) as streamer: async for item in streamer: + while True: + try: + await asyncio.wait_for(inputs_outstanding.acquire(), timeout=30) + break + except (TimeoutError, asyncio.TimeoutError): + logger.warning( + "Warning: map progress is limited. Common bottlenecks " + "include slow iteration over results, or function backlogs." 
+ ) + await input_queue.put_with_timestamp(time.time(), item) have_all_inputs = True @@ -305,10 +314,11 @@ async def get_all_outputs(): input_jwt=retry_context.input_jwt, input=retry_context.input, ) - await input_queue.put_with_timestamp(time.time() + delay_ms, retry) + await input_queue.put_with_timestamp(time.time() + (delay_ms / 1000), retry) continue completed_outputs.add(item.input_id) + inputs_outstanding.release() num_outputs += 1 yield item @@ -331,7 +341,7 @@ async def get_all_outputs_and_clean_up(): await retry_transient_errors(client.stub.FunctionGetOutputs, request) # close the input queue - await input_queue.put_with_timestamp(time.time(), None) + await input_queue.put_with_timestamp(0, None) async def fetch_output(item: api_pb2.FunctionGetOutputsItem) -> tuple[int, Any]: try: