Skip to content

Commit

Permalink
Add hasql metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavkazzz committed Oct 8, 2023
1 parent cda4a0c commit 028f8a3
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 75 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ jobs:
- py38
- py39
- py310
- py311
- py312

steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions hasql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
authors = (
("Vladislav Bakaev", "[email protected]"),
("Dmitry Orlov", "[email protected]"),
("Pavel Mosein", "[email protected]"),
)

authors_email = ", ".join(email for _, email in authors)
Expand Down
13 changes: 9 additions & 4 deletions hasql/aiopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import aiopg

from hasql.base import BasePoolManager
from hasql.metrics import Metrics
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


Expand Down Expand Up @@ -46,15 +46,20 @@ async def _terminate(self, pool):
def is_connection_closed(self, connection):
return connection.closed

def metrics(self) -> Sequence[Metrics]:
def host(self, pool: aiopg.Pool):
return Dsn.parse(str(pool._dsn)).netloc

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
Metrics(
DriverMetrics(
max=p.maxsize or 0,
min=p.minsize,
idle=p.freesize,
used=p.size - p.freesize,
host=Dsn.parse(str(p._dsn)).netloc,
) for p in self.pools
)
for p in self.pools
if p
]


Expand Down
9 changes: 6 additions & 3 deletions hasql/aiopg_sa.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from psycopg2.extensions import parse_dsn

from hasql.aiopg import PoolManager as AioPgPoolManager
from hasql.metrics import Metrics
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


Expand All @@ -21,9 +21,12 @@ async def _pool_factory(self, dsn: Dsn) -> aiopg.sa.Engine:
**self.pool_factory_kwargs,
)

def metrics(self) -> Sequence[Metrics]:
def host(self, pool: aiopg.sa.Engine):
return parse_dsn(pool.dsn).get("host", ""),

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
Metrics(
DriverMetrics(
max=p.maxsize,
min=p.minsize,
idle=p.freesize,
Expand Down
10 changes: 5 additions & 5 deletions hasql/asyncpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncpg # type: ignore

from hasql.base import BasePoolManager
from hasql.metrics import Metrics
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


Expand Down Expand Up @@ -42,18 +42,18 @@ async def _terminate(self, pool):
def is_connection_closed(self, connection):
return connection.is_closed()

def _parse_host(self, pool: asyncpg.Pool):
def host(self, pool: asyncpg.Pool):
addr, _ = pool._working_addr
return addr

def metrics(self) -> Sequence[Metrics]:
def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
Metrics(
DriverMetrics(
max=p._maxsize,
min=p._minsize,
idle=self.get_pool_freesize(p),
used=p._maxsize - self.get_pool_freesize(p),
host=self._parse_host(p),
host=self.host(p),
) for p in self.pools
]

Expand Down
9 changes: 6 additions & 3 deletions hasql/asyncsqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from sqlalchemy.pool import QueuePool # type: ignore

from hasql.base import BasePoolManager
from hasql.metrics import Metrics
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


Expand Down Expand Up @@ -51,9 +51,12 @@ async def _terminate(self, pool: AsyncEngine):
def is_connection_closed(self, connection: AsyncConnection):
return connection.closed

def metrics(self) -> Sequence[Metrics]:
def host(self, pool: AsyncEngine):
return pool.sync_engine.url.host

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
Metrics(
DriverMetrics(
max=p.sync_engine.pool.size(),
min=0,
idle=p.sync_engine.pool.checkedin(),
Expand Down
92 changes: 60 additions & 32 deletions hasql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Union,
)

from .metrics import Metrics
from .metrics import CalculateMetrics, DriverMetrics, Metrics
from .utils import Dsn, Stopwatch, split_dsn

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,13 +45,14 @@ async def get_pool(
class PoolAcquireContext(AsyncContextManager):

def __init__(
self,
pool_manager: "BasePoolManager",
read_only: bool,
fallback_master: Optional[bool],
master_as_replica_weight: Optional[float],
timeout: float,
**kwargs,
self,
pool_manager: "BasePoolManager",
read_only: bool,
fallback_master: Optional[bool],
master_as_replica_weight: Optional[float],
timeout: float,
metrics: CalculateMetrics,
**kwargs,
):
self.pool_manager = pool_manager
self.read_only = read_only
Expand All @@ -61,37 +62,50 @@ def __init__(
self.kwargs = kwargs
self.pool = None
self.context = None
self.metrics = metrics

async def acquire_from_pool_connection(self):
async def execute():
self.pool = await self.pool_manager.balancer.get_pool(
read_only=self.read_only,
fallback_master=self.fallback_master,
master_as_replica_weight=self.master_as_replica_weight,
)
connection = await self.pool_manager.acquire_from_pool(
self.pool, **self.kwargs,
)
self.pool_manager.register_connection(connection, self.pool)
return connection
return await asyncio.wait_for(execute(), timeout=self.timeout)
with self.metrics.with_get_pool():
self.pool = await self.pool_manager.balancer.get_pool(
read_only=self.read_only,
fallback_master=self.fallback_master,
master_as_replica_weight=self.master_as_replica_weight,
)

with self.metrics.with_acquire():
return await self.pool_manager.acquire_from_pool(
self.pool, **self.kwargs,
)

self.conn = await asyncio.wait_for(execute(), timeout=self.timeout)
self.metrics.add_connection(self.pool_manager.host(self.pool))
self.pool_manager.register_connection(self.conn, self.pool)
return self.conn

async def __aenter__(self):
async def go():
self.pool = await self.pool_manager.balancer.get_pool(
read_only=self.read_only,
fallback_master=self.fallback_master,
master_as_replica_weight=self.master_as_replica_weight,
)
self.context = self.pool_manager.acquire_from_pool(
self.pool,
**self.kwargs,
)
with self.metrics.with_get_pool():
self.pool = await self.pool_manager.balancer.get_pool(
read_only=self.read_only,
fallback_master=self.fallback_master,
master_as_replica_weight=self.master_as_replica_weight,
)
with self.metrics.with_acquire():
self.context = self.pool_manager.acquire_from_pool(
self.pool,
**self.kwargs,
)
return await self.context.__aenter__()
return await asyncio.wait_for(go(), timeout=self.timeout)

self.conn = await asyncio.wait_for(go(), timeout=self.timeout)
self.metrics.add_connection(self.pool_manager.host(self.pool))
return self.conn

async def __aexit__(self, *exc):
self.metrics.remove_connection(self.pool_manager.host(self.pool))
await self.context.__aexit__(*exc)
del self.conn

def __await__(self):
return self.acquire_from_pool_connection().__await__()
Expand Down Expand Up @@ -153,6 +167,7 @@ def __init__(
]
self._closing = False
self._closed = False
self._metrics = CalculateMetrics()

@property
def dsn(self) -> List[Dsn]:
Expand Down Expand Up @@ -231,9 +246,19 @@ def is_connection_closed(self, connection):
pass

@abstractmethod
def metrics(self) -> Sequence[Metrics]:
def host(self, pool: Any):
pass

@abstractmethod
def _driver_metrics(self) -> Sequence[DriverMetrics]:
pass

def metrics(self) -> Metrics:
return Metrics(
drivers=self._driver_metrics(),
hasql=self._metrics.metrics(),
)

def acquire(
self,
read_only: bool = False,
Expand Down Expand Up @@ -272,6 +297,7 @@ def acquire(
fallback_master=fallback_master,
master_as_replica_weight=master_as_replica_weight,
timeout=timeout,
metrics=self._metrics,
**kwargs,
)

Expand Down Expand Up @@ -303,7 +329,9 @@ async def release(self, connection, **kwargs):
"Pool.release() received invalid connection: "
f"{connection!r} is not a member of this pool",
)

pool = self._unmanaged_connections.pop(connection)
self._metrics.remove_connection(self.host(pool))
await self.release_to_pool(connection, pool, **kwargs)

async def close(self):
Expand Down Expand Up @@ -343,8 +371,8 @@ async def ready(
):

if (
(masters_count is not None and replicas_count is None) or
(masters_count is None and replicas_count is not None)
(masters_count is not None and replicas_count is None) or
(masters_count is None and replicas_count is not None)
):
raise ValueError(
"Arguments master_count and replicas_count "
Expand Down
67 changes: 65 additions & 2 deletions hasql/metrics.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,73 @@
from dataclasses import dataclass
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Sequence


@dataclass(frozen=True)
class Metrics:
class DriverMetrics:
max: int
min: int
idle: int
used: int
host: str


@dataclass(frozen=True)
class HasqlMetrics:
pool: int
pool_time: float
acquire: int
acquire_time: float
add_connections: dict[str, int]
remove_connections: dict[str, int]


@dataclass
class CalculateMetrics:
_pool: int = 0
_pool_time: int = 0
_acquire: int = 0
_acquire_time: int = 0
_add_connections: dict[str, int] = field(default_factory=dict)
_remove_connections: dict[str, int] = field(default_factory=dict)

def metrics(self) -> HasqlMetrics:
return HasqlMetrics(
pool=self._pool,
pool_time=self._pool_time,
acquire=self._acquire,
acquire_time=self._acquire,
add_connections=self._add_connections,
remove_connections=self._remove_connections,
)

@contextmanager
def with_get_pool(self):
self._pool += 1
tt = time.monotonic()
yield
self._pool_time += time.monotonic() - tt

@contextmanager
def with_acquire(self):
self._acquire += 1
tt = time.monotonic()
yield
self._acquire_time += time.monotonic() - tt

def add_connection(self, dsn: str):
self._add_connections[dsn] = (
self._add_connections.get(dsn, 0) + 1
)

def remove_connection(self, dsn: str):
self._remove_connections[dsn] = (
self._remove_connections.get(dsn, 0) + 1
)


@dataclass(frozen=True)
class Metrics:
drivers: Sequence[DriverMetrics]
hasql: HasqlMetrics
11 changes: 7 additions & 4 deletions hasql/psycopg3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from psycopg_pool import AsyncConnectionPool

from .base import BasePoolManager
from .metrics import Metrics
from .metrics import DriverMetrics
from .utils import Dsn


Expand Down Expand Up @@ -89,15 +89,18 @@ async def _terminate(self, pool: AsyncConnectionPool):
def is_connection_closed(self, connection):
return connection.closed

def metrics(self) -> Sequence[Metrics]:
def host(self, pool: AsyncConnectionPool):
return conninfo_to_dict(pool.conninfo)["host"]

def _driver_metrics(self) -> Sequence[DriverMetrics]:
stats = [
{
**p.get_stats(),
"host": conninfo_to_dict(p.conninfo)["host"]
"host": self.host(p)
} for p in self.pools
]
return [
Metrics(
DriverMetrics(
min=stat["pool_min"],
max=stat["pool_max"],
idle=stat["pool_available"],
Expand Down
Loading

0 comments on commit 028f8a3

Please sign in to comment.