Skip to content

Commit

Permalink
Merge pull request #140 from axiomhq/arne/add-timer-for-logger
Browse files Browse the repository at this point in the history
fix: Add a threading.Timer to ensure logs are flushed
  • Loading branch information
darach authored Sep 30, 2024
2 parents 7bc7697 + 4d34bf3 commit 87f09bd
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 51 deletions.
6 changes: 1 addition & 5 deletions src/axiom_py/annotations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This package provides annotation models and methods as well as an AnnotationsClient"""

import ujson
from logging import Logger
from requests import Session
from typing import List, Optional
from dataclasses import dataclass, asdict, field
Expand Down Expand Up @@ -55,9 +54,8 @@ class AnnotationsClient: # pylint: disable=R0903

session: Session

def __init__(self, session: Session, logger: Logger):
def __init__(self, session: Session):
self.session = session
self.logger = logger

def get(self, id: str) -> Annotation:
"""
Expand All @@ -79,7 +77,6 @@ def create(self, req: AnnotationCreateRequest) -> Annotation:
path = "/v2/annotations"
res = self.session.post(path, data=ujson.dumps(asdict(req)))
annotation = from_dict(Annotation, res.json())
self.logger.info(f"created new annotation: {annotation.id}")
return annotation

def list(
Expand Down Expand Up @@ -120,7 +117,6 @@ def update(self, id: str, req: AnnotationUpdateRequest) -> Annotation:
path = "/v2/annotations/%s" % id
res = self.session.put(path, data=ujson.dumps(asdict(req)))
annotation = from_dict(Annotation, res.json())
self.logger.info(f"updated annotation({annotation.id})")
return annotation

def delete(self, id: str):
Expand Down
26 changes: 11 additions & 15 deletions src/axiom_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import os
from enum import Enum
from humps import decamelize
from typing import Optional, List, Dict
from logging import getLogger
from typing import Optional, List, Dict, Callable
from dataclasses import dataclass, field, asdict
from datetime import datetime
from requests_toolbelt.sessions import BaseUrlSession
Expand Down Expand Up @@ -148,7 +147,8 @@ class Client: # pylint: disable=R0903
datasets: DatasetsClient
users: UsersClient
annotations: AnnotationsClient
is_closed: bool # track if the client has been closed (for tests)
is_closed: bool = False # track if the client has been closed (for tests)
before_shutdown_funcs: List[Callable] = []

def __init__(
self,
Expand All @@ -164,7 +164,6 @@ def __init__(
if url_base is None:
url_base = AXIOM_URL

self.logger = getLogger()
# set exponential retries
retries = Retry(
total=3, backoff_factor=2, status_forcelist=[500, 502, 503, 504]
Expand All @@ -189,18 +188,21 @@ def __init__(
# if there is an organization id passed,
# set it in the header
if org_id:
self.logger.info("found organization id: %s" % org_id)
self.session.headers.update({"X-Axiom-Org-Id": org_id})

self.datasets = DatasetsClient(self.session, self.logger)
self.datasets = DatasetsClient(self.session)
self.users = UsersClient(self.session, is_personal_token(token))
self.annotations = AnnotationsClient(self.session, self.logger)
self.annotations = AnnotationsClient(self.session)

# wrap shutdown hook in a lambda passing in self as a ref
atexit.register(lambda: self.shutdown_hook())
self.is_closed = False
atexit.register(self.shutdown_hook)

def before_shutdown(self, func: Callable):
self.before_shutdown_funcs.append(func)

def shutdown_hook(self):
for func in self.before_shutdown_funcs:
func()
self.session.close()
self.is_closed = True

Expand Down Expand Up @@ -271,13 +273,10 @@ def query_legacy(

path = "/v1/datasets/%s/query" % id
payload = ujson.dumps(asdict(query), default=handle_json_serialization)
self.logger.debug("sending query %s" % payload)
params = self._prepare_query_options(opts)
res = self.session.post(path, data=payload, params=params)
result = from_dict(QueryLegacyResult, res.json())
self.logger.debug(f"query result: {result}")
query_id = res.headers.get("X-Axiom-History-Query-Id")
self.logger.info(f"received query result with query_id: {query_id}")
result.savedQueryID = query_id
return result

Expand All @@ -304,13 +303,10 @@ def query(
self._prepare_apl_payload(apl, opts),
default=handle_json_serialization,
)
self.logger.debug("sending query %s" % payload)
params = self._prepare_apl_options(opts)
res = self.session.post(path, data=payload, params=params)
result = from_dict(QueryResult, res.json())
self.logger.debug(f"apl query result: {result}")
query_id = res.headers.get("X-Axiom-History-Query-Id")
self.logger.info(f"received query result with query_id: {query_id}")
result.savedQueryID = query_id
return result

Expand Down
8 changes: 1 addition & 7 deletions src/axiom_py/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import ujson
from logging import Logger
from requests import Session
from typing import List
from dataclasses import dataclass, asdict, field
Expand Down Expand Up @@ -52,9 +51,8 @@ class DatasetsClient: # pylint: disable=R0903

session: Session

def __init__(self, session: Session, logger: Logger):
def __init__(self, session: Session):
self.session = session
self.logger = logger

def get(self, id: str) -> Dataset:
"""
Expand Down Expand Up @@ -86,7 +84,6 @@ def create(self, name: str, description: str = "") -> Dataset:
),
)
ds = from_dict(Dataset, res.json())
self.logger.info(f"created new dataset: {ds.name}")
return ds

def get_list(self) -> List[Dataset]:
Expand Down Expand Up @@ -123,9 +120,6 @@ def update(self, id: str, new_description: str) -> Dataset:
),
)
ds = from_dict(Dataset, res.json())
self.logger.info(
f"updated dataset({ds.name}) with new desc: {ds.description}"
)
return ds

def delete(self, id: str):
Expand Down
47 changes: 32 additions & 15 deletions src/axiom_py/logging.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Logging contains the AxiomHandler and related methods to do with logging."""

from threading import Timer
from logging import Handler, NOTSET, getLogger, WARNING
import time
import atexit

from logging import Handler, NOTSET, getLogger, WARNING
from .client import Client


Expand All @@ -14,37 +14,54 @@ class AxiomHandler(Handler):
dataset: str
buffer: list
interval: int
last_run: float
last_flush: float
timer: Timer

def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1):
super().__init__()
# set urllib3 logging level to warning, check:
# Set urllib3 logging level to warning, check:
# https://github.com/axiomhq/axiom-py/issues/23
# This is a temp solution that would stop requests
# library from flooding the logs with debug messages
# This is a temp solution that would stop requests library from
# flooding the logs with debug messages
getLogger("urllib3").setLevel(WARNING)
self.client = client
self.dataset = dataset
self.buffer = []
self.last_run = time.monotonic()
self.interval = interval
self.last_flush = time.monotonic()

# We use a threading.Timer to make sure we flush every second, even
# if no more logs are emitted.
self.timer = Timer(self.interval, self.flush)

# Make sure we flush before the client shuts down
def before_shutdown():
self.flush()
self.timer.cancel()

# register flush on exit,
atexit.register(self.flush)
client.before_shutdown(before_shutdown)

def emit(self, record):
"""emit sends a log to Axiom."""
"""Emit sends a log to Axiom."""
self.buffer.append(record.__dict__)
if (
len(self.buffer) >= 1000
or time.monotonic() - self.last_run > self.interval
or time.monotonic() - self.last_flush > self.interval
):
self.flush()

# Restart timer
self.timer.cancel()
self.timer = Timer(self.interval, self.flush)
self.timer.start()

def flush(self):
"""flush sends all logs in the logcache to Axiom."""
self.last_run = time.monotonic()
"""Flush sends all logs in the buffer to Axiom."""

self.last_flush = time.monotonic()

if len(self.buffer) == 0:
return
self.client.ingest_events(self.dataset, self.buffer)
self.buffer = []

local_buffer, self.buffer = self.buffer, []
self.client.ingest_events(self.dataset, local_buffer)
34 changes: 25 additions & 9 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,58 @@
import os
import logging
import unittest
import time

from .helpers import get_random_name
from axiom_py import Client
from axiom_py.logging import AxiomHandler


class TestLogger(unittest.TestCase):
def test_log(self):
"""Tests a simple log"""
"""Tests the logger"""
client = Client(
os.getenv("AXIOM_TOKEN"),
os.getenv("AXIOM_ORG_ID"),
os.getenv("AXIOM_URL"),
)
# create a dataset for that purpose
# Create a dataset for that purpose
dataset_name = get_random_name()
client.datasets.create(
dataset_name, "a dataset to test axiom-py logger"
dataset_name, "A dataset to test axiom-py logger"
)

axiom_handler = AxiomHandler(client, dataset_name)
axiom_handler = AxiomHandler(client, dataset_name, interval=1.0)

logger = logging.getLogger()
logger.addHandler(axiom_handler)

logger.warning("foo")
logger.warning("This is a log!")

# this log shouldn't be ingested yet
# This log shouldn't be ingested yet
res = client.apl_query(dataset_name)
self.assertEqual(0, res.status.rowsExamined)

# flush events
# Flush events
axiom_handler.flush()

# now we should have a log
# Wait a bit for the ingest to finish
time.sleep(0.5)

# Now we should have a log
res = client.apl_query(dataset_name)
self.assertEqual(1, res.status.rowsExamined)

# cleanup created dataset
logger.warning(
"This log should be ingested without any subsequent call"
)

# Wait for the background flush.
time.sleep(1.5)

# Now we should have two logs
res = client.apl_query(dataset_name)
self.assertEqual(2, res.status.rowsExamined)

# Cleanup created dataset
client.datasets.delete(dataset_name)

0 comments on commit 87f09bd

Please sign in to comment.