Fix deadlocks on large numbers of inputs
Once the input queue filled up, we had no more room to put pending
retries. And since we had no more room to put retries, we stopped
fetching new outputs. And since we stopped fetching new outputs, the
server stopped accepting new inputs.

As a result, the input queue would never burn down.

Instead, use a semaphore to ensure we never have more than 1000 items
outstanding.
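
Below is a minimal sketch of the pattern this commit adopts, not Modal's actual implementation: the producer acquires a semaphore slot before enqueueing each input, and the slot is released only once the corresponding output is consumed. Retries can then be re-enqueued without ever blocking, because the queue itself no longer has a `maxsize`; the semaphore alone bounds the work in flight. All names here (`MAX_OUTSTANDING`, `produce`, `consume`) are illustrative.

```python
import asyncio

MAX_OUTSTANDING = 1000  # illustrative; the commit's constant is MAP_MAX_INPUTS_OUTSTANDING

async def produce(queue: asyncio.Queue, outstanding: asyncio.BoundedSemaphore, inputs) -> None:
    for item in inputs:
        await outstanding.acquire()  # blocks only when too many items are in flight
        await queue.put(item)       # never blocks: the queue is unbounded
    await queue.put(None)           # sentinel: no more inputs

async def consume(queue: asyncio.Queue, outstanding: asyncio.BoundedSemaphore) -> None:
    while (item := await queue.get()) is not None:
        # ... handle the output for `item`; a retry could be re-enqueued here
        # without blocking, because the queue itself has no maxsize ...
        outstanding.release()  # free a slot, unblocking the producer

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, unlike the old maxsize=1000 queue
    outstanding = asyncio.BoundedSemaphore(MAX_OUTSTANDING)
    await asyncio.gather(
        produce(queue, outstanding, range(10_000)),
        consume(queue, outstanding),
    )

asyncio.run(main())
```

The key design choice is moving the bound off the queue, which retries must also pass through, and onto a semaphore that only new inputs acquire, so retries can never deadlock against incoming work.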
rohansingh authored and rculbertson committed Jan 8, 2025
1 parent 1a8d5b2 commit afdc7f8
Showing 2 changed files with 24 additions and 13 deletions.
5 changes: 3 additions & 2 deletions modal/_utils/async_utils.py
```diff
@@ -779,9 +779,10 @@ async def put_with_timestamp(self, timestamp: float, item: Union[T, None]):
         """
         Add an item to the queue to be processed at a specific timestamp.
         """
-        self.nonce += 1
-        await super().put((timestamp, self.nonce, item))
+        async with self.condition:
+            self.nonce += 1
+            await super().put((timestamp, self.nonce, item))
+            self.condition.notify_all()  # notify any waiting coroutines

     async def get_next(self) -> Union[T, None]:
```
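
The `notify_all()` added above only matters if a consumer waits on the same condition. This diff does not show `get_next()`, so the following is just an assumed sketch of the condition-variable pairing, with a toy `SketchQueue` standing in for `TimedPriorityQueue`:

```python
import asyncio

class SketchQueue:
    """Toy stand-in for TimedPriorityQueue; shows the notify/wait pairing only."""

    def __init__(self) -> None:
        self._items: list = []
        self.condition = asyncio.Condition()

    async def put(self, item) -> None:
        async with self.condition:
            self._items.append(item)
            self.condition.notify_all()  # wake any coroutine blocked in get()

    async def get(self):
        async with self.condition:
            # wait_for re-checks the predicate each time the waiter is woken
            await self.condition.wait_for(lambda: len(self._items) > 0)
            return self._items.pop(0)
```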
32 changes: 21 additions & 11 deletions modal/parallel_map.py
```diff
@@ -72,13 +72,13 @@ class _MapItemRetryContext:
     retry_manager: RetryManager


-MAP_INVOCATION_CHUNK_SIZE = 49
+# maximum number of inputs that can be in progress (either queued to be sent,
+# or waiting for completion). if this limit is reached, we will block reading
+# from the input generator until more work is completed.
+MAP_MAX_INPUTS_OUTSTANDING = 1000

-# maximum number of inputs that can be queued to be sent to the server. once
-# this limit is reached, we will block (a) reading from the input generator
-# and (b) processing outputs from the server, until some of the already-queued
-# inputs are pushed to the server.
-MAP_INPUT_QUEUE_MAX_SIZE = 1000
+# maximum number of inputs to send to the server in a single request
+MAP_INVOCATION_CHUNK_SIZE = 49

 if typing.TYPE_CHECKING:
     import modal.functions
```
```diff
@@ -118,11 +118,10 @@ def count_update():
     pending_outputs: dict[int, asyncio.Future[_MapItemRetryContext]] = {}  # Map input idx -> retry context
     completed_outputs: set[str] = set()  # Set of input_ids whose outputs are complete (expecting no more values)

+    inputs_outstanding = asyncio.BoundedSemaphore(MAP_MAX_INPUTS_OUTSTANDING)
     input_queue: TimedPriorityQueue[
         api_pb2.FunctionPutInputsItem | api_pb2.FunctionRetryInputsItem
-    ] = TimedPriorityQueue(
-        maxsize=MAP_INPUT_QUEUE_MAX_SIZE,
-    )
+    ] = TimedPriorityQueue()

     async def create_input(argskwargs):
         nonlocal num_inputs
```
```diff
@@ -146,6 +145,16 @@ async def drain_input_generator():
             async_map_ordered(input_iter(), create_input, concurrency=BLOB_MAX_PARALLELISM)
         ) as streamer:
             async for item in streamer:
+                while True:
+                    try:
+                        await asyncio.wait_for(inputs_outstanding.acquire(), timeout=30)
+                        break
+                    except TimeoutError:
+                        logger.warning(
+                            "Warning: map progress is limited. Common bottlenecks "
+                            "include slow iteration over results, or function backlogs."
+                        )
+
                 await input_queue.put_with_timestamp(time.time(), item)

     have_all_inputs = True
```
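
Note the shape of the loop added above: the 30-second timeout exists only to surface a periodic warning, not to give up. `asyncio.wait_for` cancels the pending `acquire()` on timeout, so the timed-out attempt does not consume a semaphore slot, and the loop simply tries again. A self-contained sketch of the same pattern (the helper name is illustrative, not from the diff):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def acquire_with_stall_warning(sem: asyncio.Semaphore, warn_every: float = 30.0) -> None:
    """Acquire `sem`, logging a warning for each `warn_every` seconds of stall."""
    while True:
        try:
            await asyncio.wait_for(sem.acquire(), timeout=warn_every)
            return
        except (TimeoutError, asyncio.TimeoutError):  # the same class on Python 3.11+
            logger.warning("map progress is limited; still waiting for a free input slot")
```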
```diff
@@ -305,10 +314,11 @@ async def get_all_outputs():
                         input_jwt=retry_context.input_jwt,
                         input=retry_context.input,
                     )
-                    await input_queue.put_with_timestamp(time.time() + delay_ms, retry)
+                    await input_queue.put_with_timestamp(time.time() + (delay_ms / 1000), retry)
                     continue

                 completed_outputs.add(item.input_id)
+                inputs_outstanding.release()
                 num_outputs += 1

                 yield item
```
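
The `delay_ms` change above is a unit fix: `put_with_timestamp` takes an absolute Unix timestamp in seconds (callers elsewhere pass `time.time()`), while `delay_ms` is in milliseconds, so the old code pushed a 2000 ms backoff roughly 33 minutes into the future. A quick worked check:

```python
import time

delay_ms = 2000  # a 2-second retry backoff
now = time.time()

wrong = now + delay_ms           # old code: ~2000 seconds (~33 minutes) out
right = now + (delay_ms / 1000)  # fixed: 2 seconds out
print(wrong - now, right - now)  # -> 2000.0 2.0
```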
```diff
@@ -331,7 +341,7 @@ async def get_all_outputs_and_clean_up():
             await retry_transient_errors(client.stub.FunctionGetOutputs, request)

         # close the input queue
-        await input_queue.put_with_timestamp(time.time(), None)
+        await input_queue.put_with_timestamp(0, None)

     async def fetch_output(item: api_pb2.FunctionGetOutputsItem) -> tuple[int, Any]:
         try:
```
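
The sentinel change above is subtle: `TimedPriorityQueue` orders entries by `(timestamp, nonce, item)` (see the `super().put(...)` call in the first file), so enqueueing the closing `None` at timestamp `0` lets it sort ahead of any retry scheduled in the future instead of waiting behind one. An illustration of that ordering with a plain heap (assuming heap-backed ordering, as in `asyncio.PriorityQueue`; the payloads are made up):

```python
import heapq
import time

heap: list = []
now = time.time()
heapq.heappush(heap, (now + 2.0, 1, "retry scheduled 2s from now"))
heapq.heappush(heap, (now + 0.5, 2, "retry scheduled 0.5s from now"))
heapq.heappush(heap, (0, 3, None))  # shutdown sentinel at timestamp 0

print(heapq.heappop(heap))  # -> (0, 3, None): the sentinel comes out first
```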
