Skip to content

Commit

Permalink
Merge pull request #124 from stankudrow/allow-pool-maxsize-exceeding
Browse files Browse the repository at this point in the history
Allow exceeding the pool.maxsize boundary
  • Loading branch information
long2ice authored Oct 29, 2024
2 parents 09a744f + 69d0453 commit a632230
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 80 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.2

### 0.2.6

- Allow requesting more connections from a `Pool` object without raising AsynchPoolError("no free connections"). The issue #121 by @itssimon. By @stankudrow in #124

### 0.2.5

- Add more validation rules in the `parse_dsn` function. By @stankudrow in #113
Expand Down
63 changes: 41 additions & 22 deletions asynch/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def __init__(
self._used: set[Connection] = set()
self._cond = asyncio.Condition()
self._closing = False
self._sem = asyncio.Semaphore(maxsize)
self._lock = asyncio.Lock()
self._acquired_connections: deque[Connection] = deque(maxlen=maxsize)
self._free_connections: deque[Connection] = deque(maxlen=maxsize)
Expand Down Expand Up @@ -261,30 +262,36 @@ async def _create_connection(self) -> None:
if pool_size == maxsize:
raise AsynchPoolError(f"{self} is already full")
if pool_size > maxsize:
raise RuntimeError(f"{self} is overburden")
raise AsynchPoolError(f"{self} is overburden")

conn = await connect(**self._connection_kwargs)
self._free_connections.append(conn)

async def _acquire_connection(self) -> Connection:
if not self._free_connections:
raise AsynchPoolError(f"no free connection in the {self}")
raise AsynchPoolError(f"no free connection in {self}")

conn = self._free_connections.popleft()
self._acquired_connections.append(conn)
return conn

async def _release_connection(self, conn: Connection) -> None:
if conn not in self._acquired_connections:
raise AsynchPoolError(f"the connection {conn} does not belong to the {self}")
raise AsynchPoolError(f"the connection {conn} does not belong to {self}")

self._acquired_connections.remove(conn)
self._free_connections.append(conn)

async def _fill_with_connections(self, n: Optional[int] = None) -> None:
to_create = n if n else self.minsize
async def _init_connections(self, n: Optional[int] = None) -> None:
to_create = n if n is not None else self.minsize
if to_create < 0:
raise ValueError(f"cannot create negative connections ({to_create}) for {self}")
msg = f"cannot create ({to_create}) negative connections for {self}"
raise ValueError(msg)
if to_create == 0:
return
if (self.connections + to_create) > self.maxsize:
msg = f"cannot create {to_create} connections to exceed the size of {self}"
raise AsynchPoolError(msg)
tasks: list[asyncio.Task] = [
asyncio.create_task(self._create_connection()) for _ in range(to_create)
]
Expand All @@ -294,35 +301,47 @@ async def _fill_with_connections(self, n: Optional[int] = None) -> None:
async def connection(self) -> AsyncIterator[Connection]:
"""Get a connection from the pool.
:raises AsynchPoolError: if a connection cannot be acquired
:raises AsynchPoolError: if a connection cannot be released
If requested more connections than the pool can provide,
the pool gets blocked until a connection comes back.
:raises AsynchPoolError: if a connection cannot be acquired or released
:return: a free connection from the pool
:rtype: Connection
"""

async with self._lock:
if not self._free_connections:
to_create = min(self.minsize, self.maxsize - self.connections)
await self._fill_with_connections(to_create)
conn = await self._acquire_connection()
try:
yield conn
finally:
async with self._sem:
async with self._lock:
await self._release_connection(conn)
if not self._free_connections:
conns, maxsize = self.connections, self.maxsize
avail = maxsize - conns
if (maxsize - conns) < 0:
msg = (
f"the number of pool connections ({conns}) "
f"exceeds the pool maxsize ({maxsize}) for {self}"
)
raise AsynchPoolError(msg)
to_create = min(self.minsize, avail)
await self._init_connections(to_create)
conn = await self._acquire_connection()
try:
yield conn
finally:
async with self._lock:
await self._release_connection(conn)

async def startup(self) -> "Pool":
"""Initialise the pool.
When entering the context, the pool get filled with connections
When entering the context,
the pool get filled with connections
up to the pool `minsize` value.
"""

if self._opened:
return self
async with self._lock:
await self._fill_with_connections(n=self.minsize)
if self._opened:
return self
await self._init_connections(self.minsize)
self._opened = True
if self._closed:
self._closed = False
Expand All @@ -332,7 +351,7 @@ async def shutdown(self) -> None:
"""Close the pool.
This method closes consequently free connections first.
Then it does the same for the acquired/active connections.
Then it does the same for the acquired connections.
Then the pool is marked closed.
"""

Expand Down
95 changes: 37 additions & 58 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

from asynch.connection import Connection
from asynch.pool import AsynchPoolError, Pool
from asynch.pool import Pool
from asynch.proto import constants
from asynch.proto.models.enums import PoolStatus

Expand Down Expand Up @@ -86,26 +86,32 @@ async def test_pool_connection_management(get_tcp_connections):
No dangling/unclosed connections must leave behind.
"""

conn = Connection()
init_tcps: int = await get_tcp_connections(conn)
async def _get_pool_connection(pool: Pool):
async with pool.connection():
pass

min_size, max_size = 1, 2
async with Pool(minsize=min_size, maxsize=max_size) as pool:
async with Connection() as conn:
init_tcps: int = await get_tcp_connections(conn)

async with Pool(minsize=1, maxsize=2) as pool:
async with pool.connection():
assert pool.free_connections == 0
assert pool.acquired_connections == min_size
assert pool.free_connections == min_size
assert pool.acquired_connections == 1
assert pool.free_connections == 1
assert pool.acquired_connections == 0

async with pool.connection() as cn1:
assert pool.free_connections == 0
assert pool.acquired_connections == 1

async with pool.connection() as cn2:
assert pool.free_connections == 0
assert pool.acquired_connections == max_size
assert pool.acquired_connections == 2

# cannot acquire more than pool.maxsize
with pytest.raises(AsynchPoolError):
async with pool.connection():
pass
# It is possible to acquire more than pool.maxsize property.
# But the caller gets stuck while waiting for a free connection
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(_get_pool_connection(pool), timeout=1.0)

# the returned connections are functional
async with cn1.cursor() as cur:
Expand All @@ -119,56 +125,28 @@ async def test_pool_connection_management(get_tcp_connections):

# the status quo has remained
assert pool.free_connections == 0
assert pool.acquired_connections == max_size
assert pool.acquired_connections == 2

assert pool.free_connections == 1
assert pool.acquired_connections == 1

assert pool.free_connections == max_size
assert pool.acquired_connections == 0

assert init_tcps == await get_tcp_connections(conn)
await conn.close()


@pytest.mark.asyncio
async def test_pool_reuse(get_tcp_connections):
"""Tests connection pool reusability."""

async def _test_pool(pool: Pool):
async with pool.connection() as cn1_ctx:
assert pool.acquired_connections == pool.minsize
assert pool.free_connections == 0
assert pool.connections == pool.minsize

async with pool.connection() as cn2_ctx:
async with cn1_ctx.cursor() as cur:
await cur.execute("SELECT 21")
ret = await cur.fetchone()
assert ret == (21,)
async with cn2_ctx.cursor() as cur:
await cur.execute("SELECT 42")
ret = await cur.fetchone()
assert ret == (42,)
assert pool.acquired_connections == pool.maxsize
async with pool.connection() as cn3:
assert pool.free_connections == 0
assert pool.connections == pool.maxsize
assert pool.acquired_connections == 2

assert pool.acquired_connections == pool.minsize
assert pool.free_connections == pool.minsize
assert pool.connections == pool.maxsize
async with cn3.cursor() as cur:
await cur.execute("SELECT 84")
ret = await cur.fetchone()
assert ret == (84,)

conn = Connection()
init_tcps: int = await get_tcp_connections(conn)
assert pool.free_connections == 1
assert pool.acquired_connections == 1

min_size, max_size = 1, 2
pool = Pool(minsize=min_size, maxsize=max_size)
assert pool.free_connections == 2
assert pool.acquired_connections == 0

for _ in range(2):
async with pool:
await _test_pool(pool)
assert await get_tcp_connections(conn) <= init_tcps
await conn.close()
async with Connection() as conn:
assert init_tcps == await get_tcp_connections(conn)


@pytest.mark.asyncio
Expand All @@ -188,11 +166,11 @@ async def _test_pool_connection(pool: Pool, *, selectee: Any = 42):
assert ret == (selectee,)
return selectee

conn = Connection()
init_tcps: int = await get_tcp_connections(conn)
async with Connection() as conn:
init_tcps: int = await get_tcp_connections(conn)

min_size, max_size = 10, 21
selectees = list(range(min_size, max_size))
selectees = list(range(min_size, max_size + 1)) # exceeding the maxsize
answers: list[int] = []
async with Pool(minsize=min_size, maxsize=max_size) as pool:
tasks: list[asyncio.Task] = [
Expand All @@ -201,7 +179,8 @@ async def _test_pool_connection(pool: Pool, *, selectee: Any = 42):
]
answers = await asyncio.gather(*tasks)

assert await get_tcp_connections(conn) <= init_tcps
await conn.close()
async with Connection() as conn:
noc = await get_tcp_connections(conn)
assert noc == init_tcps

assert selectees == answers

0 comments on commit a632230

Please sign in to comment.