From 75cd97c40a14232b93c4208bec61e0f386bc0001 Mon Sep 17 00:00:00 2001 From: zhangxingzhi Date: Wed, 24 Apr 2024 12:09:09 +0800 Subject: [PATCH] feat: debug service --- .../promptflow/_cli/_pf/_flow.py | 31 ++++------ .../promptflow/_sdk/_chat_utils.py | 27 +++++++++ .../promptflow/_sdk/_constants.py | 1 + .../_sdk/_orchestrator/test_submitter.py | 6 ++ .../_sdk/operations/_flow_operations.py | 59 +++++++++++++++++++ 5 files changed, 103 insertions(+), 21 deletions(-) create mode 100644 src/promptflow-devkit/promptflow/_sdk/_chat_utils.py diff --git a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py index 6826ce815986..9fb75cc0c9a5 100644 --- a/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py +++ b/src/promptflow-devkit/promptflow/_cli/_pf/_flow.py @@ -12,9 +12,9 @@ import tempfile import webbrowser from pathlib import Path -from urllib.parse import urlencode, urlunparse from promptflow._cli._params import ( + AppendToDictAction, add_param_config, add_param_entry, add_param_environment_variables, @@ -40,12 +40,12 @@ ) from promptflow._cli._utils import _copy_to_flow, activate_action, confirm, inject_sys_path, list_of_dict_to_dict from promptflow._constants import ConnectionProviderConfig, FlowLanguage +from promptflow._sdk._chat_utils import register_chat_session from promptflow._sdk._configuration import Configuration from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME from promptflow._sdk._pf_client import PFClient -from promptflow._sdk._service.utils.utils import encrypt_flow_path from promptflow._sdk.operations._flow_operations import FlowOperations -from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path +from promptflow._utils.flow_utils import is_flex_flow from promptflow._utils.logger_utils import get_cli_sdk_logger from promptflow.exceptions import ErrorTarget, UserErrorException @@ -293,7 +293,7 @@ def add_parser_test_flow(subparsers): "--skip-open-browser", action="store_true", help=argparse.SUPPRESS ) add_param_url_params = lambda parser: parser.add_argument( # noqa: E731 - "--url-params", type=str, help=argparse.SUPPRESS + "--url-params", action=AppendToDictAction, help=argparse.SUPPRESS ) add_params = [ @@ -515,24 +515,13 @@ def _test_flow_multi_modal(args, pf_client): else: from promptflow._sdk._tracing import _invoke_pf_svc - # Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info - def generate_url(flow_path, port, url_params): - encrypted_flow_path = encrypt_flow_path(flow_path) - query_dict = {"flow": encrypted_flow_path} - if Configuration.get_instance().is_internal_features_enabled(): - query_dict.update({"enable_internal_features": "true"}) - query_params = urlencode(query_dict) - if url_params: - query_params += "&" + url_params - return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, "")) - pfs_port = _invoke_pf_svc() - flow_path_dir, flow_path_file = resolve_flow_path(args.flow) - flow_path = str(flow_path_dir / flow_path_file) - chat_page_url = generate_url(flow_path, pfs_port, args.url_params) - print(f"You can begin chat flow on {chat_page_url}") - if not args.skip_open_browser: - webbrowser.open(chat_page_url) + session_id, session_url = register_chat_session(args.flow, pfs_port=pfs_port, url_param=args.url_params) + pf_client.flows._start_chat_monitor( + args.flow, + session_id=session_id, + session_url=session_url if args.skip_open_browser else None, + ) def _test_flow_interactive(args, pf_client, inputs, environment_variables): diff --git a/src/promptflow-devkit/promptflow/_sdk/_chat_utils.py b/src/promptflow-devkit/promptflow/_sdk/_chat_utils.py new file mode 100644 index 000000000000..306798dcf187 --- /dev/null +++ b/src/promptflow-devkit/promptflow/_sdk/_chat_utils.py @@ -0,0 +1,27 @@ +from typing import Tuple, TypedDict +from urllib.parse import urlencode, urlunparse + +from promptflow._utils.flow_utils import resolve_flow_path + + +def construct_session_id(flow: str) -> str: + from promptflow._sdk._service.utils.utils import encrypt_flow_path + + # Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info + return encrypt_flow_path(flow) + + +def register_chat_session(flow: str, pfs_port, url_param: dict) -> Tuple[str, str]: + # TODO: register chat session so that we may store related information in db and allow multiple + # debug sessions on the same flow + flow_dir, flow_file = resolve_flow_path(flow) + session_id = construct_session_id((flow_dir / flow_file).absolute().resolve().as_posix()) + query_params = urlencode({"flow": session_id, **url_param}) + return session_id, urlunparse(("http", f"127.0.0.1:{pfs_port}", "/v1.0/ui/chat", "", query_params)) + + +def get_info_for_flow_monitor(*, session_id, flow_dir) -> TypedDict["FlowInfo", {"hash": str}]: + return { + # TODO: calculate hash for flow file + "hash": "hash", + } diff --git a/src/promptflow-devkit/promptflow/_sdk/_constants.py b/src/promptflow-devkit/promptflow/_sdk/_constants.py index a140fbd28079..0b740190692b 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_constants.py +++ b/src/promptflow-devkit/promptflow/_sdk/_constants.py @@ -23,6 +23,7 @@ # Please avoid using PROMPT_FLOW_DIR_NAME directly for home directory, "Path.home() / PROMPT_FLOW_DIR_NAME" e.g. # Use HOME_PROMPT_FLOW_DIR instead PROMPT_FLOW_DIR_NAME = PROMPT_FLOW_DIR_NAME +SESSION_CONFIG_FILE_NAME = "session_config.json" def _prepare_home_dir() -> Path: diff --git a/src/promptflow-devkit/promptflow/_sdk/_orchestrator/test_submitter.py b/src/promptflow-devkit/promptflow/_sdk/_orchestrator/test_submitter.py index b2febde49a26..f1d4017836ca 100644 --- a/src/promptflow-devkit/promptflow/_sdk/_orchestrator/test_submitter.py +++ b/src/promptflow-devkit/promptflow/_sdk/_orchestrator/test_submitter.py @@ -625,3 +625,9 @@ def _get_generator_outputs(outputs): generator_outputs = {key: output for key, output in outputs.items() if isinstance(output, GeneratorType)} if generator_outputs: logger.info(f"Some streaming outputs in the result, {generator_outputs.keys()}") + + def refresh(self): + pass + + def try_destroy(self): + pass diff --git a/src/promptflow-devkit/promptflow/_sdk/operations/_flow_operations.py b/src/promptflow-devkit/promptflow/_sdk/operations/_flow_operations.py index 68809a272410..a9c4ab32947f 100644 --- a/src/promptflow-devkit/promptflow/_sdk/operations/_flow_operations.py +++ b/src/promptflow-devkit/promptflow/_sdk/operations/_flow_operations.py @@ -12,7 +12,9 @@ import stat import subprocess import sys +import time import uuid +import webbrowser from dataclasses import MISSING, fields from importlib.metadata import version from os import PathLike @@ -23,6 +25,7 @@ from promptflow._constants import FLOW_FLEX_YAML, LANGUAGE_KEY, PROMPT_FLOW_DIR_NAME, FlowLanguage from promptflow._proxy import ProxyFactory +from promptflow._sdk._chat_utils import get_info_for_flow_monitor from promptflow._sdk._configuration import Configuration from promptflow._sdk._constants import ( DEFAULT_ENCODING, @@ -382,6 +385,62 @@ def _chat_with_ui(self, script, skip_open_browser: bool = False): sys.argv += ["--server.headless=true"] st_cli.main() + def _start_chat_monitor( + self, + flow: Union[str, PathLike], + *, + session_id: str, + session_url: str = None, + variant: str = None, + environment_variables: dict = None, + interval: float = 5.0, + **kwargs, + ): + """Refresh executor service. + + :param flow: path to flow directory to refresh + """ + + from promptflow._sdk._load_functions import load_flow + + flow = load_flow(flow) + flow.context.variant = variant + + browser_opened, current_flow_hash = False, None + with TestSubmitter(flow=flow, flow_context=flow.context, client=self._client).init( + environment_variables=environment_variables, + stream_log=False, # no need to stream log in chat mode + collection=kwargs.get("collection", None), + ) as submitter: + try: + while True: + # we should keep monitoring the original flow + flow_info = get_info_for_flow_monitor( + session_id=session_id, + flow_dir=flow.code, + ) + + if current_flow_hash != flow_info["hash"]: + try: + submitter.refresh() + except Exception as ex: + logger.error(f"Failed to refresh executor service, {ex}") + + if not browser_opened and session_url: + print(f"You can begin chat flow on {session_url}") + webbrowser.open(session_url) + browser_opened = True + time.sleep(interval) + logger.info("Chat monitor is running...") + except KeyboardInterrupt: + # logger will not print the message if the process is killed by KeyboardInterrupt + print("Shutdown executor service... exiting") + sys.exit(0) + finally: + submitter.try_destroy() + # logger will not print the message if the process is killed by KeyboardInterrupt + print("Shutdown executor service forcefully... exiting") + def _build_environment_config(self, flow_dag_path: Path): flow_info = load_yaml(flow_dag_path) # standard env object: