add log collection #306

Closed · wants to merge 2 commits
7 changes: 6 additions & 1 deletion docs/examples/introduction/inmemory_run.py
@@ -1,13 +1,17 @@
 # broker.py
 import asyncio
+import logging
 
 from taskiq import InMemoryBroker
+from taskiq.task import AsyncTaskiqTask
 
 broker = InMemoryBroker()
+task_logger = logging.getLogger("taskiq.tasklogger")
 
 
 @broker.task
 async def add_one(value: int) -> int:
+    task_logger.info(f"Adding 1 to {value}")
     return value + 1
 
 
@@ -17,10 +21,11 @@ async def main() -> None:
     # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
-    result = await task.wait_result(timeout=2)
+    result = await task.wait_result(with_logs=True)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
+        print(f"Logs: {result.log}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()
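Note: with `with_logs=True`, `result.log` is now a list of `logging.LogRecord` objects rather than a string, so the print above shows their reprs. A minimal sketch of rendering them readably (the format string is illustrative, not part of this PR):

    import logging

    formatter = logging.Formatter("%(levelname)s %(message)s")
    for record in result.log or []:
        # Each entry is a standard LogRecord captured while the task ran.
        print(formatter.format(record))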
2 changes: 1 addition & 1 deletion docs/examples/schedule/intro.py
@@ -1,7 +1,7 @@
 from taskiq_aio_pika import AioPikaBroker
 
-from taskiq.schedule_sources import LabelScheduleSource
 from taskiq import TaskiqScheduler
+from taskiq.schedule_sources import LabelScheduleSource
 
 broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")
 
25 changes: 23 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -47,6 +47,7 @@ pytz = "*"
 orjson = { version = "^3.9.9", optional = true }
 msgpack = { version = "^1.0.7", optional = true }
 cbor2 = { version = "^5.4.6", optional = true }
+bidict = "^0.23.1"
 
 [tool.poetry.dev-dependencies]
 pytest = "^7.1.2"
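The new `bidict` dependency backs the log handler's two-way mapping between Taskiq task IDs and asyncio task names. A minimal sketch of the inverse lookup it relies on (values are illustrative):

    from bidict import bidict

    associations = bidict()
    associations["task-id-123"] = "Task-7"  # Taskiq task ID -> asyncio task name
    # The inverse view resolves an asyncio task name back to the task ID.
    assert associations.inverse["Task-7"] == "task-id-123"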
125 changes: 70 additions & 55 deletions taskiq/cli/worker/log_collector.py
@@ -1,63 +1,78 @@
+import asyncio
 import logging
-import sys
-from contextlib import contextmanager
-from typing import IO, Any, Generator, List, TextIO
+from logging import LogRecord
+from typing import Dict, List, Union
 
+from bidict import bidict
 
-class Redirector:
-    """A class to write to multiple streams."""
-
-    def __init__(self, *streams: IO[Any]) -> None:
-        self.streams = streams
+class TaskiqLogHandler(logging.Handler):
+    """Log handler class."""
 
-    def write(self, message: Any) -> None:
+    def __init__(self, level: Union[int, str] = 0) -> None:
+        self.stream: Dict[Union[str, None], List[logging.LogRecord]] = {}
+        self._associations: bidict[Union[str, None], Union[str, None]] = bidict()
+        super().__init__(level)
+
+    @staticmethod
+    def _get_async_task_name() -> Union[str, None]:
+        try:
+            task = asyncio.current_task()
+        except RuntimeError:
+            return None
+        else:
+            if task:
+                return task.get_name()
+
+        return None
+
+    def associate(self, task_id: str) -> None:
+        """
+        Associate the current async task with the Taskiq task ID.
+
+        :param task_id: The Taskiq task ID.
+        :type task_id: str
+        """
+        async_task_name = self._get_async_task_name()
+        self._associations[task_id] = async_task_name
+
+    def retrieve_logs(self, task_id: str) -> List[LogRecord]:
+        """
+        Collect logs.
+
+        Collect the logs of a Taskiq task and return
+        them after removing them from memory.
+
+        :param task_id: The Taskiq task ID
+        :type task_id: str
+        :return: A list of LogRecords
+        :rtype: List[LogRecord]
+        """
+        async_task_name = self._associations[task_id]
+        try:
+            stream = self.stream[async_task_name]
+        except KeyError:
+            stream = []
+        else:
+            del self._associations[task_id]
+        return stream
+
+    def emit(self, record: LogRecord) -> None:
         """
-        This write request writes to all available streams.
+        Collect a log record.
 
-        :param message: message to write.
+        :param record: The log record to collect.
+        :type record: LogRecord
         """
-        for stream in self.streams:
-            stream.write(message)
-
-
-@contextmanager
-def log_collector(
-    new_target: TextIO,
-    custom_format: str,
-) -> "Generator[TextIO, None, None]":
-    """
-    Context manager to collect logs.
-
-    This useful class redirects all logs
-    from stdout, stderr and root logger
-    to some new target.
-
-    It can be used like this:
-
-    >>> logs = io.StringIO()
-    >>> with log_collector(logs, "%(levelname)s %(message)s"):
-    >>>     print("A")
-    >>>
-    >>> print(f"Collected logs: {logs.get_value()}")
-
-    :param new_target: new target for logs. All
-        logs are written in new_target.
-    :param custom_format: custom format for
-        collected logging calls.
-    :yields: new target.
-    """
-    old_targets: "List[TextIO]" = []
-    log_handler = logging.StreamHandler(new_target)
-    log_handler.setFormatter(logging.Formatter(custom_format))
-
-    old_targets.extend([sys.stdout, sys.stderr])
-    logging.root.addHandler(log_handler)
-    sys.stdout = Redirector(new_target, sys.stdout)  # type: ignore
-    sys.stderr = Redirector(new_target, sys.stderr)  # type: ignore
-
-    try:
-        yield new_target
-    finally:
-        sys.stderr = old_targets.pop()
-        sys.stdout = old_targets.pop()
-        logging.root.removeHandler(log_handler)
+        self.format(record)
+        async_task_name = self._get_async_task_name()
+        if not async_task_name:
+            return
+        try:
+            record.task_id = self._associations.inverse[async_task_name]
+        except KeyError:
+            return
+        try:
+            self.stream[async_task_name].append(record)
+        except KeyError:
+            self.stream[async_task_name] = [record]
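A minimal sketch of the handler's lifecycle outside a worker (the logger name matches the PR; the task ID is illustrative): the coroutine calls `associate()` so records emitted from that asyncio task can be routed, and `retrieve_logs()` drains them afterwards:

    import asyncio
    import logging

    handler = TaskiqLogHandler(logging.DEBUG)
    logger = logging.getLogger("taskiq.tasklogger")
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    async def fake_task() -> None:
        # Bind the current asyncio task's name to a Taskiq task ID.
        handler.associate("task-id-123")
        logger.info("hello from the task")

    asyncio.run(fake_task())
    records = handler.retrieve_logs("task-id-123")
    print([record.getMessage() for record in records])  # ['hello from the task']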
63 changes: 41 additions & 22 deletions taskiq/receiver/receiver.py
@@ -1,7 +1,8 @@
 import asyncio
 import inspect
+import logging
 from concurrent.futures import Executor
-from logging import getLogger
+from logging import Formatter, getLogger
 from time import time
 from typing import Any, Callable, Dict, List, Optional, Set, Union, get_type_hints
 
@@ -11,6 +12,7 @@
 from taskiq.abc.broker import AckableMessage, AsyncBroker
 from taskiq.abc.middleware import TaskiqMiddleware
 from taskiq.acks import AcknowledgeType
+from taskiq.cli.worker.log_collector import TaskiqLogHandler
 from taskiq.context import Context
 from taskiq.exceptions import NoResultError
 from taskiq.message import TaskiqMessage
@@ -20,6 +22,8 @@
 from taskiq.utils import maybe_awaitable
 
 logger = getLogger(__name__)
+task_logger = getLogger("taskiq.tasklogger")
+task_logger.setLevel(logging.DEBUG)
 QUEUE_DONE = b"-1"
 
 
@@ -79,6 +83,14 @@ def __init__(
                 "can result in undefined behavior",
             )
         self.sem_prefetch = asyncio.Semaphore(max_prefetch)
+        self._logging_handler = TaskiqLogHandler(logging.DEBUG)
+        self._logging_handler.setFormatter(
+            Formatter(
+                fmt="[%(asctime)s] [%(name)s] [%(levelname)s] > %(message)s",
+                datefmt="%Y-%m-%d %H:%M:%S",
+            ),
+        )
+        task_logger.addHandler(self._logging_handler)
 
     async def callback(  # noqa: C901, PLR0912
        self,
@@ -236,6 +248,7 @@ async def run_task(  # noqa: C901, PLR0912, PLR0915
         # Start a timer.
        start_time = time()
 
+        log = []
        try:
            # We put kwargs resolving here,
            # to be able to catch any exception (for example ),
@@ -245,26 +258,32 @@
             # We update kwargs with kwargs from network.
            kwargs.update(message.kwargs)
            is_coroutine = True
-            # If the function is a coroutine, we await it.
-            if asyncio.iscoroutinefunction(target):
-                target_future = target(*message.args, **kwargs)
-            else:
-                is_coroutine = False
-                # If this is a synchronous function, we
-                # run it in executor.
-                target_future = loop.run_in_executor(
-                    self.executor,
-                    _run_sync,
-                    target,
-                    message.args,
-                    kwargs,
-                )
-            timeout = message.labels.get("timeout")
-            if timeout is not None:
-                if not is_coroutine:
-                    logger.warning("Timeouts for sync tasks don't work in python well.")
-                target_future = asyncio.wait_for(target_future, float(timeout))
-            returned = await target_future
+            self._logging_handler.associate(message.task_id)
+            try:
+                # If the function is a coroutine, we await it.
+                if asyncio.iscoroutinefunction(target):
+                    target_future = target(*message.args, **kwargs)
+                else:
+                    is_coroutine = False
+                    # If this is a synchronous function, we
+                    # run it in executor.
+                    target_future = loop.run_in_executor(
+                        self.executor,
+                        _run_sync,
+                        target,
+                        message.args,
+                        kwargs,
+                    )
+                timeout = message.labels.get("timeout")
+                if timeout is not None:
+                    if not is_coroutine:
+                        logger.warning(
+                            "Timeouts for sync tasks don't work in python well.",
+                        )
+                    target_future = asyncio.wait_for(target_future, float(timeout))
+                returned = await target_future
+            finally:
+                log = self._logging_handler.retrieve_logs(message.task_id)
         except NoResultError as no_res_exc:
            found_exception = no_res_exc
            logger.warning(
@@ -294,7 +313,7 @@ async def run_task(  # noqa: C901, PLR0912, PLR0915
         # Assemble result.
        result: "TaskiqResult[Any]" = TaskiqResult(
            is_err=found_exception is not None,
-            log=None,
+            log=log,
            return_value=returned,
            execution_time=round(execution_time, 2),
            error=found_exception,

A Contributor left a review comment on the `log=log` line:

    The log here seems to be just the normal log in the task function, do we need to collect the exceptions generated by NoResultError and BaseException as well?
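With the formatter configured in `__init__` above, a record captured from the introduction example would render roughly as (timestamp illustrative):

    [2024-05-01 12:00:00] [taskiq.tasklogger] [INFO] > Adding 1 to 1

Note that `emit()` only stores the records; calling `self.format(record)` there populates attributes such as `record.message` and `record.asctime` before the record is handed back in the result.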
8 changes: 3 additions & 5 deletions taskiq/result/v1.py
@@ -1,7 +1,8 @@
 import json
 import pickle
 from functools import partial
-from typing import Any, Callable, Dict, Generic, Optional, TypeVar
+from logging import LogRecord
+from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar
 
 from pydantic import Field, validator
 from pydantic.generics import GenericModel
@@ -27,10 +28,7 @@ class TaskiqResult(GenericModel, Generic[_ReturnType]):
     """Result of a remote task invocation."""
 
     is_err: bool
-    # Log is a deprecated field. It would be removed in future
-    # releases of not, if we find a way to capture logs in async
-    # environment.
-    log: Optional[str] = None
+    log: Optional[List[LogRecord]] = None
     return_value: _ReturnType
    execution_time: float
    labels: Dict[str, Any] = Field(default_factory=dict)
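Since `log` now carries structured records, consumers can read fields directly instead of parsing text. A minimal sketch (assumes a populated `result`; `task_id` is the attribute set by `TaskiqLogHandler.emit`):

    for record in result.log or []:
        print(record.levelname, record.getMessage(), getattr(record, "task_id", None))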