diff --git a/src/promptflow/promptflow/_constants.py b/src/promptflow/promptflow/_constants.py index 53facb646da..4b55fe0e98e 100644 --- a/src/promptflow/promptflow/_constants.py +++ b/src/promptflow/promptflow/_constants.py @@ -44,6 +44,9 @@ LINE_NUMBER_KEY = "line_number" # Using the same key with portal. LINE_TIMEOUT_SEC = 600 +# Environment variables +PF_LONG_RUNNING_LOGGING_INTERVAL = "PF_LONG_RUNNING_LOGGING_INTERVAL" + class FlowLanguage: """The enum of tool source type.""" diff --git a/src/promptflow/promptflow/_core/flow_execution_context.py b/src/promptflow/promptflow/_core/flow_execution_context.py index 2ea6259a4a0..b1843bc8767 100644 --- a/src/promptflow/promptflow/_core/flow_execution_context.py +++ b/src/promptflow/promptflow/_core/flow_execution_context.py @@ -17,7 +17,7 @@ from promptflow._core.cache_manager import AbstractCacheManager, CacheInfo, CacheResult from promptflow._utils.logger_utils import flow_logger, logger from promptflow._utils.thread_utils import RepeatLogTimer -from promptflow._utils.utils import generate_elapsed_time_messages +from promptflow._utils.utils import generate_elapsed_time_messages, try_get_long_running_logging_interval from promptflow.contracts.flow import Node from promptflow.contracts.run_info import RunInfo from promptflow.exceptions import PromptflowException @@ -26,6 +26,8 @@ from .run_tracker import RunTracker +DEFAULT_LOGGING_INTERVAL = 60 + class FlowExecutionContext(ThreadLocalSingleton): """The context for a flow execution.""" @@ -82,7 +84,7 @@ def invoke_tool(self, node: Node, f: Callable, kwargs): if not hit_cache: Tracer.start_tracing(node_run_id, node.name) - result = self._invoke_tool_with_timer(node, f, kwargs) + result = self._invoke_tool_inner(node, f, kwargs) traces = Tracer.end_tracing(node_run_id) self._run_tracker.end_run(node_run_id, result=result, traces=traces) @@ -166,14 +168,17 @@ async def _invoke_tool_async_inner(self, node: Node, f: Callable, kwargs): # and shows stack trace in the error message to make it easy for user to troubleshoot. raise ToolExecutionError(node_name=node.name, module=module) from e - def _invoke_tool_with_timer(self, node: Node, f: Callable, kwargs): + def _invoke_tool_inner(self, node: Node, f: Callable, kwargs): module = f.func.__module__ if isinstance(f, functools.partial) else f.__module__ node_name = node.name try: + if ( + interval_seconds := try_get_long_running_logging_interval(flow_logger, DEFAULT_LOGGING_INTERVAL) + ) is None: + return f(**kwargs) logging_name = node_name if self._line_number is not None: logging_name = f"{node_name} in line {self._line_number}" - interval_seconds = 60 start_time = time.perf_counter() thread_id = threading.current_thread().ident with RepeatLogTimer( diff --git a/src/promptflow/promptflow/_utils/utils.py b/src/promptflow/promptflow/_utils/utils.py index 844913829e7..02d787280e0 100644 --- a/src/promptflow/promptflow/_utils/utils.py +++ b/src/promptflow/promptflow/_utils/utils.py @@ -20,7 +20,7 @@ from pathlib import Path from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar, Union -from promptflow._constants import DEFAULT_ENCODING +from promptflow._constants import DEFAULT_ENCODING, PF_LONG_RUNNING_LOGGING_INTERVAL from promptflow.contracts.multimedia import PFBytes from promptflow.contracts.types import AssistantDefinition @@ -396,3 +396,25 @@ def prepare_folder(path: Union[str, Path]) -> Path: path = Path(path) path.mkdir(parents=True, exist_ok=True) return path + + +def try_get_long_running_logging_interval(logger: logging.Logger, default_interval: int): + logging_interval_in_env = os.environ.get(PF_LONG_RUNNING_LOGGING_INTERVAL, None) + if logging_interval_in_env: + try: + value = int(logging_interval_in_env) + if value <= 0: + raise ValueError + logger.info( + f"Using value of {PF_LONG_RUNNING_LOGGING_INTERVAL} in environment variable as " + f"logging interval: {logging_interval_in_env}" + ) + return value + except ValueError: + logger.warning( + f"Value of {PF_LONG_RUNNING_LOGGING_INTERVAL} in environment variable " + f"('{logging_interval_in_env}') is invalid, use default value {default_interval}" + ) + return default_interval + # If the environment variable is not set, return none to disable the long running logging + return None diff --git a/src/promptflow/promptflow/executor/_async_nodes_scheduler.py b/src/promptflow/promptflow/executor/_async_nodes_scheduler.py index 664016a698c..01b2e926f34 100644 --- a/src/promptflow/promptflow/executor/_async_nodes_scheduler.py +++ b/src/promptflow/promptflow/executor/_async_nodes_scheduler.py @@ -18,7 +18,7 @@ from promptflow._core.tools_manager import ToolsManager from promptflow._utils.logger_utils import flow_logger from promptflow._utils.thread_utils import ThreadWithContextVars -from promptflow._utils.utils import extract_user_frame_summaries, set_context +from promptflow._utils.utils import extract_user_frame_summaries, set_context, try_get_long_running_logging_interval from promptflow.contracts.flow import Node from promptflow.executor._dag_manager import DAGManager from promptflow.executor._errors import NoNodeExecutedError @@ -58,12 +58,15 @@ async def execute( # Semaphore should be created in the loop, otherwise it will not work. loop = asyncio.get_running_loop() self._semaphore = asyncio.Semaphore(self._node_concurrency) - monitor = ThreadWithContextVars( - target=monitor_long_running_coroutine, - args=(loop, self._task_start_time, self._task_last_log_time, self._dag_manager_completed_event), - daemon=True, - ) - monitor.start() + if (interval := try_get_long_running_logging_interval(flow_logger, DEFAULT_TASK_LOGGING_INTERVAL)) is not None: + monitor = ThreadWithContextVars( + target=monitor_long_running_coroutine, + args=( + interval, loop, self._task_start_time, self._task_last_log_time, self._dag_manager_completed_event + ), + daemon=True, + ) + monitor.start() # Set the name of scheduler tasks to avoid monitoring its duration task = asyncio.current_task() @@ -223,6 +226,7 @@ def log_stack_recursively(task: asyncio.Task, elapse_time: float): def monitor_long_running_coroutine( + logging_interval: int, loop: asyncio.AbstractEventLoop, task_start_time: dict, task_last_log_time: dict, @@ -230,24 +234,6 @@ def monitor_long_running_coroutine( ): flow_logger.info("monitor_long_running_coroutine started") - logging_interval = DEFAULT_TASK_LOGGING_INTERVAL - logging_interval_in_env = os.environ.get("PF_TASK_PEEKING_INTERVAL") - if logging_interval_in_env: - try: - value = int(logging_interval_in_env) - if value <= 0: - raise ValueError - logging_interval = value - flow_logger.info( - f"Using value of PF_TASK_PEEKING_INTERVAL in environment variable as " - f"logging interval: {logging_interval_in_env}" - ) - except ValueError: - flow_logger.warning( - f"Value of PF_TASK_PEEKING_INTERVAL in environment variable ('{logging_interval_in_env}') " - f"is invalid, use default value {DEFAULT_TASK_LOGGING_INTERVAL}" - ) - while not dag_manager_completed_event.is_set(): running_tasks = [task for task in asyncio.all_tasks(loop) if not task.done()] # get duration of running tasks diff --git a/src/promptflow/tests/executor/e2etests/test_executor_happypath.py b/src/promptflow/tests/executor/e2etests/test_executor_happypath.py index ed60eff72aa..87a411bae98 100644 --- a/src/promptflow/tests/executor/e2etests/test_executor_happypath.py +++ b/src/promptflow/tests/executor/e2etests/test_executor_happypath.py @@ -97,8 +97,9 @@ def test_long_running_log(self, dev_connections, capsys): from promptflow._utils.logger_utils import flow_logger flow_logger.addHandler(logging.StreamHandler(sys.stdout)) - os.environ["PF_TASK_PEEKING_INTERVAL"] = "1" + # Test long running tasks with log + os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "1" executor = FlowExecutor.create(get_yaml_file("async_tools"), dev_connections) executor.exec_line(self.get_line_inputs()) captured = capsys.readouterr() @@ -110,8 +111,19 @@ def test_long_running_log(self, dev_connections, capsys): assert re.match( expected_long_running_str_2, captured.out, re.DOTALL ), "flow_logger should contain long running async tool log" + os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL") + + # Test long running tasks without log + executor.exec_line(self.get_line_inputs()) + captured = capsys.readouterr() + assert not re.match( + expected_long_running_str_1, captured.out, re.DOTALL + ), "flow_logger should not contain long running async tool log" + assert not re.match( + expected_long_running_str_2, captured.out, re.DOTALL + ), "flow_logger should not contain long running async tool log" + flow_logger.handlers.pop() - os.environ.pop("PF_TASK_PEEKING_INTERVAL") @pytest.mark.parametrize( "flow_folder, node_name, flow_inputs, dependency_nodes_outputs", diff --git a/src/promptflow/tests/executor/e2etests/test_logs.py b/src/promptflow/tests/executor/e2etests/test_logs.py index 89a0b6f47bc..4e39001bc6d 100644 --- a/src/promptflow/tests/executor/e2etests/test_logs.py +++ b/src/promptflow/tests/executor/e2etests/test_logs.py @@ -1,3 +1,4 @@ +import os from pathlib import Path from tempfile import mkdtemp @@ -152,19 +153,13 @@ def test_node_logs_in_executor_logs(self, folder_name): assert all(node_log in log_content for node_log in node_logs_list) def test_long_run_log(self): - executor = FlowExecutor.create(get_yaml_file("long_run"), {}) - file_path = Path(mkdtemp()) / "flow.log" - with LogContext(file_path): - flow_result = executor.exec_line({}, index=0) - node_run = flow_result.node_run_infos["long_run_node"] - assert node_run.status == Status.Completed - with open(file_path) as fin: - lines = fin.readlines() - lines = [line for line in lines if line.strip()] + # Test long running tasks with log + os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "60" target_texts = [ "INFO Start executing nodes in thread pool mode.", "INFO Start to run 1 nodes with concurrency level 16.", "INFO Executing node long_run_node.", + "INFO Using value of PF_LONG_RUNNING_LOGGING_INTERVAL in environment variable", "WARNING long_run_node in line 0 has been running for 60 seconds, stacktrace of thread", "in wrapped", "output = func(*args, **kwargs)", @@ -176,6 +171,28 @@ def test_long_run_log(self): "time.sleep(61)", "INFO Node long_run_node completes.", ] + self.assert_long_run_log(target_texts) + os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL") + + # Test long running tasks without log + target_texts = [ + "INFO Start executing nodes in thread pool mode.", + "INFO Start to run 1 nodes with concurrency level 16.", + "INFO Executing node long_run_node.", + "INFO Node long_run_node completes.", + ] + self.assert_long_run_log(target_texts) + + def assert_long_run_log(self, target_texts): + executor = FlowExecutor.create(get_yaml_file("long_run"), {}) + file_path = Path(mkdtemp()) / "flow.log" + with LogContext(file_path): + flow_result = executor.exec_line({}, index=0) + node_run = flow_result.node_run_infos["long_run_node"] + assert node_run.status == Status.Completed + with open(file_path) as fin: + lines = fin.readlines() + lines = [line for line in lines if line.strip()] msg = f"Got {len(lines)} lines in {file_path}, expected {len(target_texts)}." assert len(lines) == len(target_texts), msg for actual, expected in zip(lines, target_texts): @@ -237,6 +254,7 @@ def test_activate_config_log(self): assert all(log in log_content for log in logs_list) def test_async_log_in_worker_thread(self): + os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "60" logs_directory = Path(mkdtemp()) log_path = str(logs_directory / "flow.log") with LogContext(log_path, run_mode=RunMode.Test): @@ -246,3 +264,4 @@ def test_async_log_in_worker_thread(self): # Below log is created by worker thread logs_list = ["INFO monitor_long_running_coroutine started"] assert all(log in log_content for log in logs_list) + os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL")