Skip to content

Commit

Permalink
[Executor] Disable long runing logging by default (#2362)
Browse files Browse the repository at this point in the history
# Description

Issue: #2316

This PR introduces a change to the logging behavior for long-running
nodes. Previously, if a node executed for an extended period, a stack
trace would be automatically logged, potentially cluttering the flow
execution logs. To address this, we have now disabled long-running
logging by default.

Users who wish to enable logging for long-running tasks can do so by
setting the PF_LONG_RUNNING_LOGGING_INTERVAL environment variable. When
this variable is defined, the system will log stack traces in the
specified intervals for tasks that exceed the expected runtime.

## Key Changes:

*
[`src/promptflow/promptflow/_constants.py`](diffhunk://#diff-0d3cf5f31883ff073bf1e11cb2e17db5cc8fe6e5e38cbb4af7b99f37ac31d41aR47-R49):
Added `PF_LONG_RUNNING_LOGGING_INTERVAL` as a new environment variable.
*
[`src/promptflow/promptflow/_utils/utils.py`](diffhunk://#diff-d12fdd7b90cc1748f1d3e1237b4f357ba7f66740445d117beeb68ed174d1e86eR399-R420):
Added `try_get_long_running_logging_interval` function to fetch and
validate the logging interval from the environment variable.
*
[`src/promptflow/promptflow/_core/flow_execution_context.py`](diffhunk://#diff-8a45b6238b72974b62aa211aec63ef4cbeadfa8277f84525442c245a16ee4461L169-R190):
Updated `invoke_tool` method to use the new logging interval.
*
[`src/promptflow/promptflow/executor/_async_nodes_scheduler.py`](diffhunk://#diff-aea06244ab378a5cbd47d27ba6d92c433df3089d077871b1e6aaa1b47cd3c73fR61-R66):
Updated `execute` and `monitor_long_running_coroutine` methods to use
the new logging interval.
*
[`src/promptflow/tests/executor/e2etests/test_executor_happypath.py`](diffhunk://#diff-44d4009e9df8029bb88432b7a843d3a887764cd6a67070afc49557334af3cbaaL100-R100):
Updated the test to use the new environment variable.
*
[`src/promptflow/tests/executor/e2etests/test_logs.py`](diffhunk://#diff-1a19d8e55ebdc42bc8032fbf6025ceb4870857ef940322043695e3e8ee2847c7R156):
Updated multiple tests to use the new environment variable.

# All Promptflow Contribution checklist:
- [x] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [x] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [x] Title of the pull request is clear and informative.
- [x] There are a small number of commits, each of which have an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [x] Pull request includes test coverage for the included changes.

---------

Co-authored-by: Lina Tang <[email protected]>
  • Loading branch information
lumoslnt and Lina Tang authored Mar 15, 2024
1 parent 4ba00ec commit a8bee4b
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 41 deletions.
3 changes: 3 additions & 0 deletions src/promptflow/promptflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
LINE_NUMBER_KEY = "line_number" # Using the same key with portal.
LINE_TIMEOUT_SEC = 600

# Environment variables
PF_LONG_RUNNING_LOGGING_INTERVAL = "PF_LONG_RUNNING_LOGGING_INTERVAL"


class FlowLanguage:
"""The enum of tool source type."""
Expand Down
13 changes: 9 additions & 4 deletions src/promptflow/promptflow/_core/flow_execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from promptflow._core.cache_manager import AbstractCacheManager, CacheInfo, CacheResult
from promptflow._utils.logger_utils import flow_logger, logger
from promptflow._utils.thread_utils import RepeatLogTimer
from promptflow._utils.utils import generate_elapsed_time_messages
from promptflow._utils.utils import generate_elapsed_time_messages, try_get_long_running_logging_interval
from promptflow.contracts.flow import Node
from promptflow.contracts.run_info import RunInfo
from promptflow.exceptions import PromptflowException
Expand All @@ -26,6 +26,8 @@

from .run_tracker import RunTracker

DEFAULT_LOGGING_INTERVAL = 60


class FlowExecutionContext(ThreadLocalSingleton):
"""The context for a flow execution."""
Expand Down Expand Up @@ -82,7 +84,7 @@ def invoke_tool(self, node: Node, f: Callable, kwargs):

if not hit_cache:
Tracer.start_tracing(node_run_id, node.name)
result = self._invoke_tool_with_timer(node, f, kwargs)
result = self._invoke_tool_inner(node, f, kwargs)
traces = Tracer.end_tracing(node_run_id)

self._run_tracker.end_run(node_run_id, result=result, traces=traces)
Expand Down Expand Up @@ -166,14 +168,17 @@ async def _invoke_tool_async_inner(self, node: Node, f: Callable, kwargs):
# and shows stack trace in the error message to make it easy for user to troubleshoot.
raise ToolExecutionError(node_name=node.name, module=module) from e

def _invoke_tool_with_timer(self, node: Node, f: Callable, kwargs):
def _invoke_tool_inner(self, node: Node, f: Callable, kwargs):
module = f.func.__module__ if isinstance(f, functools.partial) else f.__module__
node_name = node.name
try:
if (
interval_seconds := try_get_long_running_logging_interval(flow_logger, DEFAULT_LOGGING_INTERVAL)
) is None:
return f(**kwargs)
logging_name = node_name
if self._line_number is not None:
logging_name = f"{node_name} in line {self._line_number}"
interval_seconds = 60
start_time = time.perf_counter()
thread_id = threading.current_thread().ident
with RepeatLogTimer(
Expand Down
24 changes: 23 additions & 1 deletion src/promptflow/promptflow/_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar, Union

from promptflow._constants import DEFAULT_ENCODING
from promptflow._constants import DEFAULT_ENCODING, PF_LONG_RUNNING_LOGGING_INTERVAL
from promptflow.contracts.multimedia import PFBytes
from promptflow.contracts.types import AssistantDefinition

Expand Down Expand Up @@ -396,3 +396,25 @@ def prepare_folder(path: Union[str, Path]) -> Path:
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path


def try_get_long_running_logging_interval(logger: logging.Logger, default_interval: int):
logging_interval_in_env = os.environ.get(PF_LONG_RUNNING_LOGGING_INTERVAL, None)
if logging_interval_in_env:
try:
value = int(logging_interval_in_env)
if value <= 0:
raise ValueError
logger.info(
f"Using value of {PF_LONG_RUNNING_LOGGING_INTERVAL} in environment variable as "
f"logging interval: {logging_interval_in_env}"
)
return value
except ValueError:
logger.warning(
f"Value of {PF_LONG_RUNNING_LOGGING_INTERVAL} in environment variable "
f"('{logging_interval_in_env}') is invalid, use default value {default_interval}"
)
return default_interval
# If the environment variable is not set, return none to disable the long running logging
return None
36 changes: 11 additions & 25 deletions src/promptflow/promptflow/executor/_async_nodes_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from promptflow._core.tools_manager import ToolsManager
from promptflow._utils.logger_utils import flow_logger
from promptflow._utils.thread_utils import ThreadWithContextVars
from promptflow._utils.utils import extract_user_frame_summaries, set_context
from promptflow._utils.utils import extract_user_frame_summaries, set_context, try_get_long_running_logging_interval
from promptflow.contracts.flow import Node
from promptflow.executor._dag_manager import DAGManager
from promptflow.executor._errors import NoNodeExecutedError
Expand Down Expand Up @@ -58,12 +58,15 @@ async def execute(
# Semaphore should be created in the loop, otherwise it will not work.
loop = asyncio.get_running_loop()
self._semaphore = asyncio.Semaphore(self._node_concurrency)
monitor = ThreadWithContextVars(
target=monitor_long_running_coroutine,
args=(loop, self._task_start_time, self._task_last_log_time, self._dag_manager_completed_event),
daemon=True,
)
monitor.start()
if (interval := try_get_long_running_logging_interval(flow_logger, DEFAULT_TASK_LOGGING_INTERVAL)) is not None:
monitor = ThreadWithContextVars(
target=monitor_long_running_coroutine,
args=(
interval, loop, self._task_start_time, self._task_last_log_time, self._dag_manager_completed_event
),
daemon=True,
)
monitor.start()

# Set the name of scheduler tasks to avoid monitoring its duration
task = asyncio.current_task()
Expand Down Expand Up @@ -223,31 +226,14 @@ def log_stack_recursively(task: asyncio.Task, elapse_time: float):


def monitor_long_running_coroutine(
logging_interval: int,
loop: asyncio.AbstractEventLoop,
task_start_time: dict,
task_last_log_time: dict,
dag_manager_completed_event: threading.Event,
):
flow_logger.info("monitor_long_running_coroutine started")

logging_interval = DEFAULT_TASK_LOGGING_INTERVAL
logging_interval_in_env = os.environ.get("PF_TASK_PEEKING_INTERVAL")
if logging_interval_in_env:
try:
value = int(logging_interval_in_env)
if value <= 0:
raise ValueError
logging_interval = value
flow_logger.info(
f"Using value of PF_TASK_PEEKING_INTERVAL in environment variable as "
f"logging interval: {logging_interval_in_env}"
)
except ValueError:
flow_logger.warning(
f"Value of PF_TASK_PEEKING_INTERVAL in environment variable ('{logging_interval_in_env}') "
f"is invalid, use default value {DEFAULT_TASK_LOGGING_INTERVAL}"
)

while not dag_manager_completed_event.is_set():
running_tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
# get duration of running tasks
Expand Down
16 changes: 14 additions & 2 deletions src/promptflow/tests/executor/e2etests/test_executor_happypath.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ def test_long_running_log(self, dev_connections, capsys):
from promptflow._utils.logger_utils import flow_logger

flow_logger.addHandler(logging.StreamHandler(sys.stdout))
os.environ["PF_TASK_PEEKING_INTERVAL"] = "1"

# Test long running tasks with log
os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "1"
executor = FlowExecutor.create(get_yaml_file("async_tools"), dev_connections)
executor.exec_line(self.get_line_inputs())
captured = capsys.readouterr()
Expand All @@ -110,8 +111,19 @@ def test_long_running_log(self, dev_connections, capsys):
assert re.match(
expected_long_running_str_2, captured.out, re.DOTALL
), "flow_logger should contain long running async tool log"
os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL")

# Test long running tasks without log
executor.exec_line(self.get_line_inputs())
captured = capsys.readouterr()
assert not re.match(
expected_long_running_str_1, captured.out, re.DOTALL
), "flow_logger should not contain long running async tool log"
assert not re.match(
expected_long_running_str_2, captured.out, re.DOTALL
), "flow_logger should not contain long running async tool log"

flow_logger.handlers.pop()
os.environ.pop("PF_TASK_PEEKING_INTERVAL")

@pytest.mark.parametrize(
"flow_folder, node_name, flow_inputs, dependency_nodes_outputs",
Expand Down
37 changes: 28 additions & 9 deletions src/promptflow/tests/executor/e2etests/test_logs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from pathlib import Path
from tempfile import mkdtemp

Expand Down Expand Up @@ -152,19 +153,13 @@ def test_node_logs_in_executor_logs(self, folder_name):
assert all(node_log in log_content for node_log in node_logs_list)

def test_long_run_log(self):
executor = FlowExecutor.create(get_yaml_file("long_run"), {})
file_path = Path(mkdtemp()) / "flow.log"
with LogContext(file_path):
flow_result = executor.exec_line({}, index=0)
node_run = flow_result.node_run_infos["long_run_node"]
assert node_run.status == Status.Completed
with open(file_path) as fin:
lines = fin.readlines()
lines = [line for line in lines if line.strip()]
# Test long running tasks with log
os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "60"
target_texts = [
"INFO Start executing nodes in thread pool mode.",
"INFO Start to run 1 nodes with concurrency level 16.",
"INFO Executing node long_run_node.",
"INFO Using value of PF_LONG_RUNNING_LOGGING_INTERVAL in environment variable",
"WARNING long_run_node in line 0 has been running for 60 seconds, stacktrace of thread",
"in wrapped",
"output = func(*args, **kwargs)",
Expand All @@ -176,6 +171,28 @@ def test_long_run_log(self):
"time.sleep(61)",
"INFO Node long_run_node completes.",
]
self.assert_long_run_log(target_texts)
os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL")

# Test long running tasks without log
target_texts = [
"INFO Start executing nodes in thread pool mode.",
"INFO Start to run 1 nodes with concurrency level 16.",
"INFO Executing node long_run_node.",
"INFO Node long_run_node completes.",
]
self.assert_long_run_log(target_texts)

def assert_long_run_log(self, target_texts):
executor = FlowExecutor.create(get_yaml_file("long_run"), {})
file_path = Path(mkdtemp()) / "flow.log"
with LogContext(file_path):
flow_result = executor.exec_line({}, index=0)
node_run = flow_result.node_run_infos["long_run_node"]
assert node_run.status == Status.Completed
with open(file_path) as fin:
lines = fin.readlines()
lines = [line for line in lines if line.strip()]
msg = f"Got {len(lines)} lines in {file_path}, expected {len(target_texts)}."
assert len(lines) == len(target_texts), msg
for actual, expected in zip(lines, target_texts):
Expand Down Expand Up @@ -237,6 +254,7 @@ def test_activate_config_log(self):
assert all(log in log_content for log in logs_list)

def test_async_log_in_worker_thread(self):
os.environ["PF_LONG_RUNNING_LOGGING_INTERVAL"] = "60"
logs_directory = Path(mkdtemp())
log_path = str(logs_directory / "flow.log")
with LogContext(log_path, run_mode=RunMode.Test):
Expand All @@ -246,3 +264,4 @@ def test_async_log_in_worker_thread(self):
# Below log is created by worker thread
logs_list = ["INFO monitor_long_running_coroutine started"]
assert all(log in log_content for log in logs_list)
os.environ.pop("PF_LONG_RUNNING_LOGGING_INTERVAL")

0 comments on commit a8bee4b

Please sign in to comment.