Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Set timeout for DB Advisory lock #1826

Merged
merged 11 commits into from
Feb 13, 2024
1 change: 1 addition & 0 deletions changes/1826.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Set timeout for Postgres Advisory lock.
5 changes: 5 additions & 0 deletions configs/manager/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ password = "DB_PASSWORD" # env: BACKEND_DB_PASSWORD
# (See https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.pool_size)
# pool-size = 8

# The timeout for the DB connections used by distributed locks.
# Lock waiters wait up to `lock-conn-timeout` seconds, and the locked context should finish within this time.
# Default is 0, which means infinite.
# lock-conn-timeout = 30

# This setting causes the pool to recycle connections after the given number of seconds has passed.
# Default is -1, which means infinite.
# https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.pool_recycle
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@
t.Key("pool-size", default=8): t.ToInt[1:], # type: ignore
t.Key("pool-recycle", default=-1): t.ToFloat[-1:], # -1 is infinite
t.Key("max-overflow", default=64): t.ToInt[-1:], # -1 is infinite # type: ignore
t.Key("lock-conn-timeout", default=0): t.ToFloat[0:], # 0 is infinite
}),
t.Key("manager"): t.Dict({
t.Key("ipc-base-path", default="/tmp/backend.ai/ipc"): tx.Path(
Expand Down
29 changes: 20 additions & 9 deletions src/ai/backend/manager/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class ExtendedAsyncSAEngine(SAEngine):

def __init__(self, *args, **kwargs) -> None:
    # Pop our private kwargs before forwarding the rest to SQLAlchemy's
    # engine constructor, which would reject unknown keyword arguments.
    # NOTE(review): threshold presumably gates warnings on excessive
    # concurrent transactions — usage is not visible in this hunk; confirm.
    self._txn_concurrency_threshold = kwargs.pop("_txn_concurrency_threshold", 0)
    # Timeout (seconds) for advisory-lock connections.  The configured
    # value 0 means "infinite", which asyncio.timeout() expresses as None,
    # so 0 is normalized to None here via `or`.
    self.lock_conn_timeout: float | None = (
        kwargs.pop("_lock_conn_timeout", 0) or None
    )  # Convert 0 to `None`
    super().__init__(*args, **kwargs)
    # Counters for currently open read-only / generic transactions.
    self._readonly_txn_count = 0
    self._generic_txn_count = 0
Expand Down Expand Up @@ -139,36 +142,43 @@ async def advisory_lock(self, lock_id: LockID) -> AsyncIterator[None]:
# but in this case:
# - The lock ID is only given from trusted codes.
# - asyncpg does not support parameter interpolation with raw SQL statements.
await lock_conn.exec_driver_sql(
f"SELECT pg_advisory_lock({lock_id:d});",
)
async with asyncio.timeout(self.lock_conn_timeout):
await lock_conn.exec_driver_sql(
f"SELECT pg_advisory_lock({lock_id:d});",
)
lock_acquired = True
yield
except sa.exc.DBAPIError as e:
if getattr(e.orig, "pgcode", None) == "55P03": # lock not available error
# This may happen upon shutdown after some time.
raise asyncio.CancelledError()
raise
except asyncio.CancelledError:
raise
else:
lock_acquired = True
yield
finally:
if lock_acquired and not lock_conn.closed:
await lock_conn.exec_driver_sql(
f"SELECT pg_advisory_unlock({lock_id:d})",
)
try:
await lock_conn.exec_driver_sql(
f"SELECT pg_advisory_unlock({lock_id:d})",
)
except sa.exc.InterfaceError:
log.warning(
f"DB Connnection for lock(id: {lock_id:d}) has already been closed. Skip unlock"
)


def create_async_engine(
    *args,
    _txn_concurrency_threshold: int = 0,
    _lock_conn_timeout: float = 0,
    **kwargs,
) -> ExtendedAsyncSAEngine:
    """Create an ``ExtendedAsyncSAEngine`` wrapping a synchronous SQLAlchemy engine.

    All positional and keyword arguments except the private underscore-prefixed
    ones are forwarded to SQLAlchemy's ``create_engine``.

    :param _txn_concurrency_threshold: concurrent-transaction threshold passed
        through to ``ExtendedAsyncSAEngine``; 0 disables it.
    :param _lock_conn_timeout: timeout in seconds for advisory-lock
        connections; 0 means infinite.  Annotated ``float`` because the config
        validator (``t.ToFloat``) yields floats; integers remain acceptable.
    """
    # The async engine wrapper requires the 2.0-style ("future") sync engine.
    kwargs["future"] = True
    sync_engine = _create_engine(*args, **kwargs)
    return ExtendedAsyncSAEngine(
        sync_engine,
        _txn_concurrency_threshold=_txn_concurrency_threshold,
        _lock_conn_timeout=_lock_conn_timeout,
    )


Expand Down Expand Up @@ -207,6 +217,7 @@ async def connect_database(
int(local_config["db"]["pool-size"] + max(0, local_config["db"]["max-overflow"]) * 0.5),
2,
),
_lock_conn_timeout=local_config["db"]["lock-conn-timeout"],
)
yield db
await db.dispose()
Expand Down
1 change: 1 addition & 0 deletions tests/manager/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def local_config(
"pool-size": 8,
"pool-recycle": -1,
"max-overflow": 64,
"lock-conn-timeout": 0,
},
"manager": {
"id": f"i-{test_id}",
Expand Down
11 changes: 11 additions & 0 deletions tests/manager/test_advisory_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,14 @@ async def critical_section(db: ExtendedAsyncSAEngine) -> None:
rows = result.fetchall()
print(rows)
assert len(rows) == 0


@pytest.mark.asyncio
async def test_lock_timeout(database_engine: ExtendedAsyncSAEngine) -> None:
    """Holding the advisory lock longer than ``lock_conn_timeout`` must
    raise :class:`asyncio.TimeoutError`."""
    timeout_secs = 0.5
    hold_secs = 1

    database_engine.lock_conn_timeout = timeout_secs
    with pytest.raises(asyncio.TimeoutError):
        async with database_engine.advisory_lock(LockID.LOCKID_TEST):
            # Deliberately outlive the configured timeout.
            await asyncio.sleep(hold_secs)
Loading