Skip to content

Commit

Permalink
Merge pull request #4 from aiokitchen/feature/metrics
Browse files Browse the repository at this point in the history
Add metrics to pool
  • Loading branch information
mosquito authored Oct 12, 2023
2 parents f73282a + d5b1aa2 commit a69d0cc
Show file tree
Hide file tree
Showing 28 changed files with 596 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ jobs:

matrix:
toxenv:
- py37
- py38
- py39
- py310
- py311

steps:
- uses: actions/checkout@v2
Expand Down
12 changes: 9 additions & 3 deletions example/simple_web_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import argparse
import asyncio
from dataclasses import asdict

import aiohttp.web
from aiohttp.web_urldispatcher import View
Expand Down Expand Up @@ -45,15 +45,21 @@ async def get(self):
return aiohttp.web.Response(text=str(res))


class MetricsHandler(BaseView):
async def get(self):
metrics = self.pool.metrics()
return aiohttp.web.json_response([asdict(m) for m in metrics])


class REST(AIOHTTPService):
async def create_application(self) -> aiohttp.web.Application:
app = aiohttp.web.Application()

app.add_routes([
aiohttp.web.get('/master', MasterHandler),
aiohttp.web.get('/replica', ReplicaHandler),
aiohttp.web.get('/metrics', MetricsHandler),
])

pool_manager: PoolManager = PoolManager(
arguments.dsn,
pool_factory_kwargs=dict(
Expand All @@ -73,5 +79,5 @@ async def create_application(self) -> aiohttp.web.Application:
arguments = parser.parse_args()
service = REST(address=arguments.address, port=arguments.port)

with entrypoint(service) as loop:
with entrypoint(service, log_config=True) as loop:
loop.run_forever()
1 change: 1 addition & 0 deletions hasql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
authors = (
("Vladislav Bakaev", "[email protected]"),
("Dmitry Orlov", "[email protected]"),
("Pavel Mosein", "[email protected]"),
)

authors_email = ", ".join(email for _, email in authors)
Expand Down
24 changes: 21 additions & 3 deletions hasql/aiopg.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
from typing import Sequence

import aiopg

from hasql.base import BasePoolManager
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


class PoolManager(BasePoolManager):
pools: Sequence[aiopg.Pool]

def get_pool_freesize(self, pool):
return pool.freesize

Expand All @@ -18,12 +22,10 @@ async def release_to_pool(self, connection, pool, **kwargs):

async def _is_master(self, connection):
cursor = await connection.cursor()
try:
async with cursor:
await cursor.execute("SHOW transaction_read_only")
read_only = await cursor.fetchone()
return read_only[0] == "off"
finally:
cursor.close()

async def _pool_factory(self, dsn: Dsn):
return await aiopg.create_pool(str(dsn), **self.pool_factory_kwargs)
Expand All @@ -44,5 +46,21 @@ async def _terminate(self, pool):
def is_connection_closed(self, connection):
return connection.closed

def host(self, pool: aiopg.Pool):
return Dsn.parse(str(pool._dsn)).netloc

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
DriverMetrics(
max=p.maxsize or 0,
min=p.minsize,
idle=p.freesize,
used=p.size - p.freesize,
host=Dsn.parse(str(p._dsn)).netloc,
)
for p in self.pools
if p
]


__all__ = ("PoolManager",)
22 changes: 21 additions & 1 deletion hasql/aiopg_sa.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
from typing import Sequence

import aiopg.sa
from psycopg2.extensions import parse_dsn

from hasql.aiopg import PoolManager as AioPgPoolManager
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


class PoolManager(AioPgPoolManager):
pools: Sequence[aiopg.sa.Engine] # type: ignore[assignment]

async def _is_master(self, connection):
read_only = await connection.scalar("SHOW transaction_read_only")
return read_only == "off"

async def _pool_factory(self, dsn: Dsn):
async def _pool_factory(self, dsn: Dsn) -> aiopg.sa.Engine:
return await aiopg.sa.create_engine(
str(dsn),
**self.pool_factory_kwargs,
)

def host(self, pool: aiopg.sa.Engine) -> str: # type: ignore[override]
return parse_dsn(pool.dsn).get("host", "")

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
DriverMetrics(
max=p.maxsize,
min=p.minsize,
idle=p.freesize,
used=p.size - p.freesize,
host=parse_dsn(p.dsn).get("host", ""),
) for p in self.pools
]


__all__ = ("PoolManager",)
19 changes: 19 additions & 0 deletions hasql/asyncpg.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
from typing import Sequence

import asyncpg # type: ignore

from hasql.base import BasePoolManager
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


class PoolManager(BasePoolManager):
pools: Sequence[asyncpg.Pool]

def get_pool_freesize(self, pool):
return pool._queue.qsize()

Expand Down Expand Up @@ -38,5 +42,20 @@ async def _terminate(self, pool):
def is_connection_closed(self, connection):
return connection.is_closed()

def host(self, pool: asyncpg.Pool):
addr, _ = pool._working_addr
return addr

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
DriverMetrics(
max=p._maxsize,
min=p._minsize,
idle=self.get_pool_freesize(p),
used=p._maxsize - self.get_pool_freesize(p),
host=self.host(p),
) for p in self.pools
]


__all__ = ("PoolManager",)
1 change: 0 additions & 1 deletion hasql/asyncpgsa.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncpgsa # type: ignore

from hasql.asyncpg import PoolManager as AsyncPgPoolManager
from hasql.utils import Dsn

Expand Down
21 changes: 18 additions & 3 deletions hasql/asyncsqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
from typing import Sequence

import sqlalchemy as sa # type: ignore
from sqlalchemy.ext.asyncio import ( # type: ignore
AsyncConnection, AsyncEngine, create_async_engine,
)
from sqlalchemy.ext.asyncio import AsyncConnection # type: ignore
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.pool import QueuePool # type: ignore

from hasql.base import BasePoolManager
from hasql.metrics import DriverMetrics
from hasql.utils import Dsn


Expand Down Expand Up @@ -50,5 +51,19 @@ async def _terminate(self, pool: AsyncEngine):
def is_connection_closed(self, connection: AsyncConnection):
return connection.closed

def host(self, pool: AsyncEngine):
return pool.sync_engine.url.host

def _driver_metrics(self) -> Sequence[DriverMetrics]:
return [
DriverMetrics(
max=p.sync_engine.pool.size(),
min=0,
idle=p.sync_engine.pool.checkedin(),
used=p.sync_engine.pool.checkedout(),
host=p.sync_engine.url.host,
) for p in self.pools
]


__all__ = ("PoolManager",)
4 changes: 2 additions & 2 deletions hasql/balancer_policy/greedy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ async def _get_pool(
if (
not read_only or
(
choose_master_as_replica and
self._pool_manager.master_pool_count > 0
choose_master_as_replica and
self._pool_manager.master_pool_count > 0
)
):
candidates.extend(await self._pool_manager.get_master_pools())
Expand Down
4 changes: 2 additions & 2 deletions hasql/balancer_policy/random_weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async def _get_pool(
if (
not read_only or
(
choose_master_as_replica and
self._pool_manager.master_pool_count > 0
choose_master_as_replica and
self._pool_manager.master_pool_count > 0
)
):
candidates.extend(await self._pool_manager.get_master_pools())
Expand Down
Loading

0 comments on commit a69d0cc

Please sign in to comment.