Skip to content

Commit

Permalink
fix: avoid infinite loops when error on emitting (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
simosho authored Dec 14, 2023
1 parent f180100 commit c86d5d3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
3 changes: 3 additions & 0 deletions logging_loki/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#: Label name indicating logger name.
logger_tag: str = "logger"

#: Logger name of the logger where the logging errors should be forwarded to
LOGLOG_LOGGER_NAME = 'waylay.loglog'

#: String contains chars that can be used in label names in LogQL.
label_allowed_chars: str = "".join((string.ascii_letters, string.digits, "_"))
#: A list of pairs of characters to replace in the label name.
Expand Down
27 changes: 18 additions & 9 deletions logging_loki/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import threading
import time
from logging.config import ConvertingDict
from typing import Any, Dict, Optional, Tuple
from typing import Any, Callable, Dict, Optional, Tuple

import requests

Expand All @@ -16,6 +16,19 @@
BasicAuth = Optional[Tuple[str, str]]


def with_lock(method: Callable):
@functools.wraps(method)
def _impl(self, *method_args, **method_kwargs):
# Prevent "recursion" when e.g. urllib3 logs debug messages on POST
if not self._lock.acquire(blocking=False):
return
try:
return method(self, *method_args, **method_kwargs)
finally:
self._lock.release()
return _impl


class LokiEmitter:
"""Base Loki emitter class."""

Expand Down Expand Up @@ -63,16 +76,11 @@ def __init__(self,
self._session: Optional[requests.Session] = None
self._lock = threading.Lock()

@with_lock
def __call__(self, record: logging.LogRecord, line: str):
"""Send log record to Loki."""
# Prevent "recursion" when e.g. urllib3 logs debug messages on POST
if not self._lock.acquire(blocking=False):
return
try:
payload = self.build_payload(record, line)
self._post_to_loki(payload)
finally:
self._lock.release()
payload = self.build_payload(record, line)
self._post_to_loki(payload)

def _post_to_loki(self, payload: dict):
resp = self.session.post(self.url, json=payload, headers=self.headers)
Expand Down Expand Up @@ -146,6 +154,7 @@ def build_payload(self, record: logging.LogRecord, line: str) -> dict:
}
return {"streams": [stream]}

@with_lock
def emit_batch(self, records: list[Tuple[logging.LogRecord, str]]):
"""Send log records to Loki."""
streams = [self.build_payload(record[0], record[1])["streams"][0] for record in records]
Expand Down
34 changes: 26 additions & 8 deletions logging_loki/handlers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
# -*- coding: utf-8 -*-

from contextlib import redirect_stderr, redirect_stdout
import functools
import logging
from logging.handlers import MemoryHandler, QueueHandler
from logging.handlers import QueueListener
import os
from queue import Queue
import sys
import time
from typing import Optional, Union
from typing import Callable, Optional, Union
from logging_loki import const

from logging_loki.emitter import BasicAuth, LokiEmitter

LOKI_MAX_BATCH_BUFFER_SIZE = int(os.environ.get('LOKI_MAX_BATCH_BUFFER_SIZE', 10))

# capture original stdout and stderr
_sys_out = sys.stdout
_sys_err = sys.stderr

_error_logger = logging.getLogger(const.LOGLOG_LOGGER_NAME)

def with_original_stdout(method: Callable):
@functools.wraps(method)
def _impl(self, *method_args, **method_kwargs):
with redirect_stdout(_sys_out):
with redirect_stderr(_sys_err):
return method(self, *method_args, **method_kwargs)
return _impl

class LokiQueueHandler(QueueHandler):
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""

Expand Down Expand Up @@ -72,27 +89,28 @@ def __init__(
super().__init__()
self.emitter = LokiEmitter(url, tags, headers, auth, as_json, props_to_labels, level_tag, logger_tag)

def handleError(self, record): # noqa: N802
def handleError(self, exc: Exception): # noqa: N802
"""Close emitter and let default handler take actions on error."""
_error_logger.error(exc, exc_info=True)
self.emitter.close()
super().handleError(record)

@with_original_stdout
def emit(self, record: logging.LogRecord):
"""Send log record to Loki."""
# noinspection PyBroadException
try:
self.emitter(record, self.format(record))
except Exception:
self.handleError(record)
except Exception as exc:
self.handleError(exc)

@with_original_stdout
def emit_batch(self, records: list[logging.LogRecord]):
"""Send a batch of log records to Loki."""
# noinspection PyBroadException
try:
self.emitter.emit_batch([(record, self.format(record)) for record in records])
except Exception:
for record in records:
self.handleError(record)
except Exception as exc:
self.handleError(exc)

class LokiBatchHandler(MemoryHandler):
interval: float # The interval at which batched logs are sent in seconds
Expand Down

0 comments on commit c86d5d3

Please sign in to comment.