Skip to content

Commit

Permalink
[PromptFlow][Extension] Fixed local batchrun could not be cancelled (#…
Browse files Browse the repository at this point in the history
…1219)

# Description
Fixed local batch-run could not be cancelled.

Windows:

![image](https://github.com/microsoft/promptflow/assets/115983968/9359c22f-08d0-4573-b807-042061d773be)

Linux:

![image](https://github.com/microsoft/promptflow/assets/115983968/0004c18c-6354-4f8b-a5fd-4c4e4d1488d1)

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
  • Loading branch information
Hhhilulu authored Nov 21, 2023
1 parent 48844ca commit 5f5ac23
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import contextvars
import math
import sys
import signal
import multiprocessing
import os
import queue
Expand Down Expand Up @@ -31,6 +33,19 @@
from promptflow.storage import AbstractRunStorage


def signal_handler(signum, frame):
signame = signal.Signals(signum).name
bulk_logger.info("Execution stopping. Handling signal %s (%s)", signame, signum)
try:
process = psutil.Process(os.getpid())
bulk_logger.info("Successfully terminated process with pid %s", process.pid)
process.terminate()
except Exception:
bulk_logger.warning("Error when handling execution stop signal", exc_info=True)
finally:
sys.exit(1)


class QueueRunStorage(AbstractRunStorage):
"""This storage persists run info by putting it into a queue."""

Expand Down Expand Up @@ -323,6 +338,7 @@ def _generate_line_result_for_exception(self, inputs, run_id, line_number, flow_
return result

def run(self, batch_inputs):
signal.signal(signal.SIGINT, signal_handler)
for index, inputs in batch_inputs:
self._inputs_queue.put(
(
Expand All @@ -348,13 +364,23 @@ def run(self, batch_inputs):
),
):
try:
self._pool.starmap(
# The variable 'async_result' here is not the actual result of the batch run
# but an AsyncResult object that can be used to check if the execution are finished
# The actual results of the batch run are stored in 'result_list'
async_result = self._pool.starmap_async(
self._timeout_process_wrapper,
[
(run_start_time, self._inputs_queue, self._line_timeout_sec, result_list)
for _ in range(self._n_process)
],
)
try:
# Wait for batch run to complete or KeyboardInterrupt
while not async_result.ready():
# Check every 1 second
async_result.wait(1)
except KeyboardInterrupt:
raise
except PromptflowException:
raise
except Exception as e:
Expand Down Expand Up @@ -434,6 +460,7 @@ def _process_wrapper(
log_context_initialization_func,
operation_contexts_dict: dict,
):
signal.signal(signal.SIGINT, signal_handler)
logger.info(f"Process {os.getpid()} started.")
OperationContext.get_instance().update(operation_contexts_dict) # Update the operation context for the new process.
if log_context_initialization_func:
Expand Down

0 comments on commit 5f5ac23

Please sign in to comment.