diff --git a/modal/parallel_map.py b/modal/parallel_map.py index cbf5d7cc7..54d3a8594 100644 --- a/modal/parallel_map.py +++ b/modal/parallel_map.py @@ -119,7 +119,6 @@ def count_update(): retry_queue = TimestampPriorityQueue() pending_outputs: dict[int, asyncio.Future[_MapItemContext]] = {} # Map input idx -> context completed_outputs: set[str] = set() # Set of input_ids whose outputs are complete (expecting no more values) - input_queue: asyncio.Queue[api_pb2.FunctionPutInputsItem | None] = asyncio.Queue() # semaphore to limit the number of inputs that can be in progress at once @@ -234,7 +233,6 @@ async def retry_inputs(): function_call_jwt=function_call_jwt, inputs=inputs, ) - logger.debug(f"Sending retry for {inputs}.") while True: try: @@ -294,7 +292,12 @@ async def get_all_outputs(): # processed and was received again due to a duplicate. continue - item_context = await pending_outputs[item.idx] + future = pending_outputs.get(item.idx, None) + if future is None: + # We've already processed this output, so we can skip it. + # This can happen because the worker can sometimes send duplicate outputs. + continue + item_context = await future if item.result and item.result.status == api_pb2.GenericResult.GENERIC_STATUS_SUCCESS: # clear the item context to allow it to be garbage collected