Guard against duplicate outputs
rculbertson committed Jan 16, 2025
1 parent 61cf07b commit 8934a0d
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions modal/parallel_map.py
@@ -119,7 +119,6 @@ def count_update():
     retry_queue = TimestampPriorityQueue()
     pending_outputs: dict[int, asyncio.Future[_MapItemContext]] = {}  # Map input idx -> context
     completed_outputs: set[str] = set()  # Set of input_ids whose outputs are complete (expecting no more values)
-
     input_queue: asyncio.Queue[api_pb2.FunctionPutInputsItem | None] = asyncio.Queue()

     # semaphore to limit the number of inputs that can be in progress at once
@@ -234,7 +233,6 @@ async def retry_inputs():
                 function_call_jwt=function_call_jwt,
                 inputs=inputs,
             )
-            logger.debug(f"Sending retry for {inputs}.")

             while True:
                 try:
@@ -294,7 +292,12 @@ async def get_all_outputs():
                 # processed and was received again due to a duplicate.
                 continue

-            item_context = await pending_outputs[item.idx]
+            future = pending_outputs.get(item.idx, None)
+            if future is None:
+                # We've already processed this output, so we can skip it.
+                # This can happen because the worker can sometimes send duplicate outputs.
+                continue
+            item_context = await future

             if item.result and item.result.status == api_pb2.GenericResult.GENERIC_STATUS_SUCCESS:
                 # clear the item context to allow it to be garbage collected
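The guard works because pending_outputs is the single record of which inputs still expect an output: once an entry for an idx is removed, a resent copy of that output falls through the dict .get() check and is skipped, whereas the old direct indexing (item_context = await pending_outputs[item.idx]) would raise KeyError on a duplicate. Below is a minimal, self-contained sketch of the same pattern, assuming a simplified consumer loop; pending_outputs and item.idx mirror the diff, while Output, consume, and the context strings are hypothetical stand-ins for Modal's real protobuf output items and _MapItemContext. It also deletes the entry immediately after processing, which the real code does elsewhere.

import asyncio
from dataclasses import dataclass

@dataclass
class Output:
    idx: int
    value: str

async def consume(outputs: asyncio.Queue, pending_outputs: dict[int, asyncio.Future]) -> list[str]:
    results = []
    while True:
        item = await outputs.get()
        if item is None:  # sentinel: producer is done
            return results
        future = pending_outputs.get(item.idx, None)
        if future is None:
            # Already processed: the entry was removed when the first copy of
            # this output arrived, so a duplicate delivery is skipped here
            # instead of raising KeyError.
            continue
        ctx = await future
        results.append(f"{ctx}:{item.value}")
        # Drop the entry so any later duplicate of item.idx hits the guard.
        del pending_outputs[item.idx]

async def main():
    pending = {0: asyncio.get_running_loop().create_future()}
    pending[0].set_result("ctx-0")
    q: asyncio.Queue = asyncio.Queue()
    # The same output is delivered twice, as a flaky worker might do.
    for item in (Output(0, "a"), Output(0, "a"), None):
        q.put_nowait(item)
    print(await consume(q, pending))  # ['ctx-0:a'] -- the duplicate was skipped

asyncio.run(main())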
