From 88a131d294352641b1fd51cacbc1b3da59176511 Mon Sep 17 00:00:00 2001 From: Stanley Kudrow Date: Thu, 17 Oct 2024 09:02:14 +0300 Subject: [PATCH] singularise the enum on client states --- nats/aio/client.py | 52 ++++++++++++++++++++++---------------------- tests/test_client.py | 6 ++--- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index 64228b7f..3a5e567f 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -189,7 +189,7 @@ async def _default_error_callback(ex: Exception) -> None: # Client section -class ClientStates(Enum): +class ClientState(Enum): DISCONNECTED = 0 CONNECTED = 1 CLOSED = 2 @@ -234,7 +234,7 @@ def __init__(self) -> None: self._client_id: Optional[int] = None self._sid: int = 0 self._subs: Dict[int, Subscription] = {} - self._status = ClientStates.DISCONNECTED + self._status = ClientState.DISCONNECTED self._ps: Parser = Parser(self) # pending queue of commands that will be flushed to the server. @@ -515,7 +515,7 @@ async def subscribe_handler(msg): if not self.options["allow_reconnect"]: raise e - await self._close(ClientStates.DISCONNECTED, False) + await self._close(ClientState.DISCONNECTED, False) if self._current_server is not None: self._current_server.last_attempt = time.monotonic() self._current_server.reconnects += 1 @@ -673,9 +673,9 @@ async def close(self) -> None: sets the client to be in the CLOSED state. No further reconnections occur once reaching this point. """ - await self._close(ClientStates.CLOSED) + await self._close(ClientState.CLOSED) - async def _close(self, status: ClientStates, do_cbs: bool = True) -> None: + async def _close(self, status: ClientState, do_cbs: bool = True) -> None: if self.is_closed: self._status = status return @@ -750,7 +750,7 @@ async def _close(self, status: ClientStates, do_cbs: bool = True) -> None: if self._closed_cb is not None: await self._closed_cb() - self._status = ClientStates.CLOSED + self._status = ClientState.CLOSED # Set the client_id and subscription prefix back to None self._client_id = None @@ -784,7 +784,7 @@ async def drain(self) -> None: # Relinquish CPU to allow drain tasks to start in the background, # before setting state to draining. await asyncio.sleep(0) - self._status = ClientStates.DRAINING_SUBS + self._status = ClientState.DRAINING_SUBS try: await asyncio.wait_for( @@ -797,9 +797,9 @@ async def drain(self) -> None: except asyncio.CancelledError: pass finally: - self._status = ClientStates.DRAINING_PUBS + self._status = ClientState.DRAINING_PUBS await self.flush() - await self._close(ClientStates.CLOSED) + await self._close(ClientState.CLOSED) async def publish( self, @@ -1184,30 +1184,30 @@ def pending_data_size(self) -> int: @property def is_closed(self) -> bool: - return self._status == ClientStates.CLOSED + return self._status == ClientState.CLOSED @property def is_reconnecting(self) -> bool: - return self._status == ClientStates.RECONNECTING + return self._status == ClientState.RECONNECTING @property def is_connected(self) -> bool: - return (self._status == ClientStates.CONNECTED) or self.is_draining + return (self._status == ClientState.CONNECTED) or self.is_draining @property def is_connecting(self) -> bool: - return self._status == ClientStates.CONNECTING + return self._status == ClientState.CONNECTING @property def is_draining(self) -> bool: return ( - self._status == ClientStates.DRAINING_SUBS - or self._status == ClientStates.DRAINING_PUBS + self._status == ClientState.DRAINING_SUBS + or self._status == ClientState.DRAINING_PUBS ) @property def is_draining_pubs(self) -> bool: - return self._status == ClientStates.DRAINING_PUBS + return self._status == ClientState.DRAINING_PUBS @property def connected_server_version(self) -> ServerVersion: @@ -1397,7 +1397,7 @@ async def _process_err(self, err_msg: str) -> None: # FIXME: Some errors such as 'Invalid Subscription' # do not cause the server to close the connection. # For now we handle similar as other clients and close. - asyncio.create_task(self._close(ClientStates.CLOSED, do_cbs)) + asyncio.create_task(self._close(ClientState.CLOSED, do_cbs)) async def _process_op_err(self, e: Exception) -> None: """ @@ -1410,7 +1410,7 @@ async def _process_op_err(self, e: Exception) -> None: return if self.options["allow_reconnect"] and self.is_connected: - self._status = ClientStates.RECONNECTING + self._status = ClientState.RECONNECTING self._ps.reset() if (self._reconnection_task is not None @@ -1424,7 +1424,7 @@ async def _process_op_err(self, e: Exception) -> None: else: self._process_disconnect() self._err = e - await self._close(ClientStates.CLOSED, True) + await self._close(ClientState.CLOSED, True) async def _attempt_reconnect(self) -> None: assert self._current_server, "Client.connect must be called first" @@ -1510,7 +1510,7 @@ async def _attempt_reconnect(self) -> None: # to bail earlier in case there are errors in the connection. # await self._flush_pending(force_flush=True) await self._flush_pending() - self._status = ClientStates.CONNECTED + self._status = ClientState.CONNECTED await self.flush() if self._reconnected_cb is not None: await self._reconnected_cb() @@ -1523,7 +1523,7 @@ async def _attempt_reconnect(self) -> None: except (OSError, errors.Error, asyncio.TimeoutError) as e: self._err = e await self._error_cb(e) - self._status = ClientStates.RECONNECTING + self._status = ClientState.RECONNECTING self._current_server.last_attempt = time.monotonic() self._current_server.reconnects += 1 except asyncio.CancelledError: @@ -1853,7 +1853,7 @@ def _process_disconnect(self) -> None: Process disconnection from the server and set client status to DISCONNECTED. """ - self._status = ClientStates.DISCONNECTED + self._status = ClientState.DISCONNECTED def _process_info( self, info: Dict[str, Any], initial_connection: bool = False @@ -1917,7 +1917,7 @@ async def _process_connect_init(self) -> None: """ assert self._transport, "must be called only from Client.connect" assert self._current_server, "must be called only from Client.connect" - self._status = ClientStates.CONNECTING + self._status = ClientState.CONNECTING # Check whether to reuse the original hostname for an implicit route. hostname = None @@ -2018,7 +2018,7 @@ async def _process_connect_init(self) -> None: ) if PONG_PROTO in next_op: - self._status = ClientStates.CONNECTED + self._status = ClientState.CONNECTED elif ERR_OP in next_op: err_line = next_op.decode() _, err_msg = err_line.split(" ", 1) @@ -2029,7 +2029,7 @@ async def _process_connect_init(self) -> None: raise errors.Error("nats: " + err_msg.rstrip("\r\n")) if PONG_PROTO in next_op: - self._status = ClientStates.CONNECTED + self._status = ClientState.CONNECTED self._reading_task = asyncio.get_running_loop().create_task( self._read_loop() @@ -2142,7 +2142,7 @@ async def __aenter__(self) -> "Client": async def __aexit__(self, *exc_info) -> None: """Close connection to NATS when used in a context manager""" - await self._close(ClientStates.CLOSED, do_cbs=True) + await self._close(ClientState.CLOSED, do_cbs=True) def jetstream(self, **opts) -> nats.js.JetStreamContext: """ diff --git a/tests/test_client.py b/tests/test_client.py index cbea2acc..e657d64c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -11,7 +11,7 @@ import nats import nats.errors import pytest -from nats.aio.client import Client as NATS, ClientStates, __version__ +from nats.aio.client import Client as NATS, ClientState, __version__ from tests.utils import ( ClusteringDiscoveryAuthTestCase, ClusteringTestCase, @@ -2906,8 +2906,8 @@ async def disconnected_cb(): await asyncio.wait_for(disconnected, 2) await nc.close() - disconnected_states[0] == ClientStates.RECONNECTING - disconnected_states[1] == ClientStates.CLOSED + disconnected_states[0] == ClientState.RECONNECTING + disconnected_states[1] == ClientState.CLOSED if __name__ == "__main__":