Skip to content

Commit

Permalink
Implemented SentinelBlockingConnectionPool.
Browse files Browse the repository at this point in the history
  • Loading branch information
DABND19 committed Jul 18, 2024
1 parent a745bde commit baf2af2
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Add SentinelBlockingConnectionPool class
* Move doctests (doc code examples) to main branch
* Update `ResponseT` type hint
* Allow to control the minimum SSL version
Expand Down
5 changes: 5 additions & 0 deletions docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ SentinelConnectionPool
.. autoclass:: redis.sentinel.SentinelConnectionPool
:members:

SentinelBlockingConnectionPool
======================
.. autoclass:: redis.sentinel.SentinelBlockingConnectionPool
:members:


Cluster Client
**************
Expand Down
2 changes: 2 additions & 0 deletions redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from redis.sentinel import (
Sentinel,
SentinelBlockingConnectionPool,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
Expand Down Expand Up @@ -77,6 +78,7 @@ def int_or_str(value):
"RedisError",
"ResponseError",
"Sentinel",
"SentinelBlockingConnectionPool",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
Expand Down
2 changes: 2 additions & 0 deletions redis/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)
from redis.asyncio.sentinel import (
Sentinel,
SentinelBlockingConnectionPool,
SentinelConnectionPool,
SentinelManagedConnection,
SentinelManagedSSLConnection,
Expand Down Expand Up @@ -53,6 +54,7 @@
"RedisError",
"ResponseError",
"Sentinel",
"SentinelBlockingConnectionPool",
"SentinelConnectionPool",
"SentinelManagedConnection",
"SentinelManagedSSLConnection",
Expand Down
64 changes: 37 additions & 27 deletions redis/asyncio/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from redis.asyncio.client import Redis
from redis.asyncio.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
EncodableT,
Expand Down Expand Up @@ -203,12 +204,38 @@ async def get_master_address(self):
def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
return self.proxy.rotate_slaves()


class SentinelBlockingConnectionPool(BlockingConnectionPool):
"""
Sentinel blocking connection pool.
If ``check_connection`` flag is set to True, SentinelManagedConnection
sends a PING command right after establishing the connection.
"""

def __init__(self, service_name, sentinel_manager, **kwargs):
kwargs["connection_class"] = kwargs.get(
"connection_class",
(
SentinelManagedSSLConnection
if kwargs.pop("ssl", False)
else SentinelManagedConnection
),
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self.master_address = None
self.slave_rr_counter = None

def __repr__(self):
return (
Expand All @@ -218,8 +245,11 @@ def __repr__(self):

def reset(self):
super().reset()
self.master_address = None
self.slave_rr_counter = None
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection: Connection):
check = not self.is_master or (
Expand All @@ -228,31 +258,11 @@ def owns_connection(self, connection: Connection):
return check and super().owns_connection(connection)

async def get_master_address(self):
master_address = await self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
# disconnect any idle connections so that they reconnect
# to the new master the next time that they are used.
await self.disconnect(inuse_connections=False)
return master_address
return await self.proxy.get_master_address()

async def rotate_slaves(self) -> AsyncIterator:
def rotate_slaves(self) -> AsyncIterator:
"""Round-robin slave balancer"""
slaves = await self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield await self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
return self.proxy.rotate_slaves()


class Sentinel(AsyncSentinelCommands):
Expand Down
68 changes: 67 additions & 1 deletion redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

from redis.client import Redis
from redis.commands import SentinelCommands
from redis.connection import Connection, ConnectionPool, SSLConnection
from redis.connection import (
BlockingConnectionPool,
Connection,
ConnectionPool,
SSLConnection,
)
from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.utils import str_if_bytes

Expand Down Expand Up @@ -195,6 +200,67 @@ def rotate_slaves(self):
return self.proxy.rotate_slaves()


class SentinelBlockingConnectionPool(BlockingConnectionPool):
"""
Sentinel blocking connection pool.
If ``check_connection`` flag is set to True, SentinelManagedConnection
sends a PING command right after establishing the connection.
"""

def __init__(self, service_name, sentinel_manager, **kwargs):
kwargs["connection_class"] = kwargs.get(
"connection_class",
(
SentinelManagedSSLConnection
if kwargs.pop("ssl", False)
else SentinelManagedConnection
),
)
self.is_master = kwargs.pop("is_master", True)
self.check_connection = kwargs.pop("check_connection", False)
self.proxy = SentinelConnectionPoolProxy(
connection_pool=self,
is_master=self.is_master,
check_connection=self.check_connection,
service_name=service_name,
sentinel_manager=sentinel_manager,
)
super().__init__(**kwargs)
self.connection_kwargs["connection_pool"] = self.proxy
self.service_name = service_name
self.sentinel_manager = sentinel_manager

def __repr__(self):
role = "master" if self.is_master else "slave"
return (
f"<{type(self).__module__}.{type(self).__name__}"
f"(service={self.service_name}({role}))>"
)

def reset(self):
super().reset()
self.proxy.reset()

@property
def master_address(self):
return self.proxy.master_address

def owns_connection(self, connection):
check = not self.is_master or (
self.is_master and self.master_address == (connection.host, connection.port)
)
parent = super()
return check and parent.owns_connection(connection)

def get_master_address(self):
return self.proxy.get_master_address()

def rotate_slaves(self):
"Round-robin slave balancer"
return self.proxy.rotate_slaves()


class Sentinel(SentinelCommands):
"""
Redis Sentinel cluster client
Expand Down

0 comments on commit baf2af2

Please sign in to comment.