Skip to content

Commit

Permalink
feat: flow test ui lifecycle management
Browse files Browse the repository at this point in the history
  • Loading branch information
elliotzh committed Apr 23, 2024
1 parent 7d50227 commit 6b0c560
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 19 deletions.
6 changes: 5 additions & 1 deletion src/promptflow-core/promptflow/_utils/flow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
from os import PathLike
from pathlib import Path
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, TypedDict, Union

from promptflow._constants import (
CHAT_HISTORY,
Expand Down Expand Up @@ -321,3 +321,7 @@ def parse_variant(variant: str) -> Tuple[str, str]:
message=str(error),
error=error,
)


def get_info_for_flow_monitor(flow) -> TypedDict("FlowMonitorInfo", {"hash": str}):
return {"hash": "hash_value_of_current_flow"}
20 changes: 2 additions & 18 deletions src/promptflow-devkit/promptflow/_cli/_pf/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import tempfile
import webbrowser
from pathlib import Path
from urllib.parse import urlencode, urlunparse

from promptflow._cli._params import (
add_param_config,
Expand Down Expand Up @@ -43,9 +42,8 @@
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import PROMPT_FLOW_DIR_NAME
from promptflow._sdk._pf_client import PFClient
from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._sdk.operations._flow_operations import FlowOperations
from promptflow._utils.flow_utils import is_flex_flow, resolve_flow_path
from promptflow._utils.flow_utils import is_flex_flow
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.exceptions import ErrorTarget, UserErrorException

Expand Down Expand Up @@ -482,21 +480,7 @@ def _test_flow_multi_modal(args, pf_client):
from promptflow._sdk._load_functions import load_flow

if Configuration.get_instance().is_internal_features_enabled():
from promptflow._sdk._tracing import _invoke_pf_svc

# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def generate_url(flow_path, port):
encrypted_flow_path = encrypt_flow_path(flow_path)
query_params = urlencode({"flow": encrypted_flow_path})
return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, ""))

pfs_port = _invoke_pf_svc()
flow_path_dir, flow_path_file = resolve_flow_path(args.flow)
flow_path = str(flow_path_dir / flow_path_file)
chat_page_url = generate_url(flow_path, pfs_port)
print(f"You can begin chat flow on {chat_page_url}")
if not args.skip_open_browser:
webbrowser.open(chat_page_url)
pf_client.flows._start_chat_monitor(args.flow, skip_open_browser=args.skip_open_browser)
else:
if is_flex_flow(flow_path=args.flow):
error = ValueError("Only support dag yaml in streamlit ui.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,9 @@ def _get_generator_outputs(outputs):
generator_outputs = {key: output for key, output in outputs.items() if isinstance(output, GeneratorType)}
if generator_outputs:
logger.info(f"Some streaming outputs in the result, {generator_outputs.keys()}")

def refresh(self):
pass

def try_destroy(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import stat
import subprocess
import sys
import time
import uuid
import webbrowser
from dataclasses import MISSING, fields
from importlib.metadata import version
from os import PathLike
from pathlib import Path
from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Tuple, Union
from urllib.parse import urlencode, urlunparse

import pydash

Expand Down Expand Up @@ -48,14 +51,17 @@
logger,
)
from promptflow._sdk.entities._flows import FlexFlow, Flow, Prompty
from promptflow._sdk.entities._flows.base import FlowBase
from promptflow._sdk.entities._validation import ValidationResult
from promptflow._utils.context_utils import _change_working_dir
from promptflow._utils.flow_utils import (
dump_flow_result,
get_info_for_flow_monitor,
is_executable_chat_flow,
is_flex_flow,
is_prompty_flow,
parse_variant,
resolve_flow_path,
)
from promptflow._utils.yaml_utils import dump_yaml, load_yaml
from promptflow.core._utils import load_inputs_from_sample
Expand Down Expand Up @@ -378,6 +384,73 @@ def _chat_with_ui(self, script, skip_open_browser: bool = False):
sys.argv += ["--server.headless=true"]
st_cli.main()

@staticmethod
def _construct_chat_ui_url(flow: FlowBase):
from promptflow._sdk._service.utils.utils import encrypt_flow_path
from promptflow._sdk._tracing import _invoke_pf_svc

# Todo: use base64 encode for now, will consider whether need use encryption or use db to store flow path info
def generate_url(flow_path, port):
encrypted_flow_path = encrypt_flow_path(flow_path)
query_params = urlencode({"flow": encrypted_flow_path})
return urlunparse(("http", f"127.0.0.1:{port}", "/v1.0/ui/chat", "", query_params, ""))

pfs_port = _invoke_pf_svc()
flow_dir, flow_file = resolve_flow_path(flow)
return generate_url((flow_dir / flow_file).absolute().resolve().as_posix(), pfs_port)

def _start_chat_monitor(
self,
flow: Union[str, PathLike],
*,
variant: str = None,
environment_variables: dict = None,
skip_open_browser: bool = False,
interval: float = 1.0,
**kwargs,
):
"""Refresh executor service.
:param flow: path to flow directory to refresh
"""

from promptflow._sdk._load_functions import load_flow

flow = load_flow(flow)
flow.context.variant = variant

browser_opened, current_flow_hash = False, None
with TestSubmitter(flow=flow, flow_context=flow.context, client=self._client).init(
environment_variables=environment_variables,
stream_log=False, # no need to stream log in chat mode
collection=kwargs.get("collection", None),
) as submitter:
try:
while True:
# we should keep monitoring the original flow
flow_info = get_info_for_flow_monitor(flow)
if current_flow_hash != flow_info["hash"]:
try:
submitter.refresh()
except Exception as ex:
logger.error(f"Failed to refresh executor service, {ex}")

if not browser_opened and not skip_open_browser:
chat_page_url = self._construct_chat_ui_url(flow)
print(f"You can begin chat flow on {chat_page_url}")
webbrowser.open(chat_page_url)
browser_opened = True
time.sleep(interval)
logger.info("Chat monitor is running...")
except KeyboardInterrupt:
# logger will not print the message if the process is killed by KeyboardInterrupt
print("Shutdown executor service... exiting")
sys.exit(0)
finally:
submitter.try_destroy()
# logger will not print the message if the process is killed by KeyboardInterrupt
print("Shutdown executor service forcefully... exiting")

def _build_environment_config(self, flow_dag_path: Path):
flow_info = load_yaml(flow_dag_path)
# standard env object:
Expand Down

0 comments on commit 6b0c560

Please sign in to comment.