Skip to content

Commit

Permalink
singularise the enum on client states
Browse files Browse the repository at this point in the history
  • Loading branch information
Stanley Kudrow committed Oct 17, 2024
1 parent d188bbb commit 88a131d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
52 changes: 26 additions & 26 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async def _default_error_callback(ex: Exception) -> None:
# Client section


class ClientStates(Enum):
class ClientState(Enum):
DISCONNECTED = 0
CONNECTED = 1
CLOSED = 2
Expand Down Expand Up @@ -234,7 +234,7 @@ def __init__(self) -> None:
self._client_id: Optional[int] = None
self._sid: int = 0
self._subs: Dict[int, Subscription] = {}
self._status = ClientStates.DISCONNECTED
self._status = ClientState.DISCONNECTED
self._ps: Parser = Parser(self)

# pending queue of commands that will be flushed to the server.
Expand Down Expand Up @@ -515,7 +515,7 @@ async def subscribe_handler(msg):
if not self.options["allow_reconnect"]:
raise e

await self._close(ClientStates.DISCONNECTED, False)
await self._close(ClientState.DISCONNECTED, False)
if self._current_server is not None:
self._current_server.last_attempt = time.monotonic()
self._current_server.reconnects += 1
Expand Down Expand Up @@ -673,9 +673,9 @@ async def close(self) -> None:
sets the client to be in the CLOSED state.
No further reconnections occur once reaching this point.
"""
await self._close(ClientStates.CLOSED)
await self._close(ClientState.CLOSED)

async def _close(self, status: ClientStates, do_cbs: bool = True) -> None:
async def _close(self, status: ClientState, do_cbs: bool = True) -> None:
if self.is_closed:
self._status = status
return
Expand Down Expand Up @@ -750,7 +750,7 @@ async def _close(self, status: ClientStates, do_cbs: bool = True) -> None:
if self._closed_cb is not None:
await self._closed_cb()

self._status = ClientStates.CLOSED
self._status = ClientState.CLOSED

# Set the client_id and subscription prefix back to None
self._client_id = None
Expand Down Expand Up @@ -784,7 +784,7 @@ async def drain(self) -> None:
# Relinquish CPU to allow drain tasks to start in the background,
# before setting state to draining.
await asyncio.sleep(0)
self._status = ClientStates.DRAINING_SUBS
self._status = ClientState.DRAINING_SUBS

try:
await asyncio.wait_for(
Expand All @@ -797,9 +797,9 @@ async def drain(self) -> None:
except asyncio.CancelledError:
pass
finally:
self._status = ClientStates.DRAINING_PUBS
self._status = ClientState.DRAINING_PUBS
await self.flush()
await self._close(ClientStates.CLOSED)
await self._close(ClientState.CLOSED)

async def publish(
self,
Expand Down Expand Up @@ -1184,30 +1184,30 @@ def pending_data_size(self) -> int:

@property
def is_closed(self) -> bool:
return self._status == ClientStates.CLOSED
return self._status == ClientState.CLOSED

@property
def is_reconnecting(self) -> bool:
return self._status == ClientStates.RECONNECTING
return self._status == ClientState.RECONNECTING

@property
def is_connected(self) -> bool:
return (self._status == ClientStates.CONNECTED) or self.is_draining
return (self._status == ClientState.CONNECTED) or self.is_draining

@property
def is_connecting(self) -> bool:
return self._status == ClientStates.CONNECTING
return self._status == ClientState.CONNECTING

@property
def is_draining(self) -> bool:
return (
self._status == ClientStates.DRAINING_SUBS
or self._status == ClientStates.DRAINING_PUBS
self._status == ClientState.DRAINING_SUBS
or self._status == ClientState.DRAINING_PUBS
)

@property
def is_draining_pubs(self) -> bool:
return self._status == ClientStates.DRAINING_PUBS
return self._status == ClientState.DRAINING_PUBS

@property
def connected_server_version(self) -> ServerVersion:
Expand Down Expand Up @@ -1397,7 +1397,7 @@ async def _process_err(self, err_msg: str) -> None:
# FIXME: Some errors such as 'Invalid Subscription'
# do not cause the server to close the connection.
# For now we handle similar as other clients and close.
asyncio.create_task(self._close(ClientStates.CLOSED, do_cbs))
asyncio.create_task(self._close(ClientState.CLOSED, do_cbs))

async def _process_op_err(self, e: Exception) -> None:
"""
Expand All @@ -1410,7 +1410,7 @@ async def _process_op_err(self, e: Exception) -> None:
return

if self.options["allow_reconnect"] and self.is_connected:
self._status = ClientStates.RECONNECTING
self._status = ClientState.RECONNECTING
self._ps.reset()

if (self._reconnection_task is not None
Expand All @@ -1424,7 +1424,7 @@ async def _process_op_err(self, e: Exception) -> None:
else:
self._process_disconnect()
self._err = e
await self._close(ClientStates.CLOSED, True)
await self._close(ClientState.CLOSED, True)

async def _attempt_reconnect(self) -> None:
assert self._current_server, "Client.connect must be called first"
Expand Down Expand Up @@ -1510,7 +1510,7 @@ async def _attempt_reconnect(self) -> None:
# to bail earlier in case there are errors in the connection.
# await self._flush_pending(force_flush=True)
await self._flush_pending()
self._status = ClientStates.CONNECTED
self._status = ClientState.CONNECTED
await self.flush()
if self._reconnected_cb is not None:
await self._reconnected_cb()
Expand All @@ -1523,7 +1523,7 @@ async def _attempt_reconnect(self) -> None:
except (OSError, errors.Error, asyncio.TimeoutError) as e:
self._err = e
await self._error_cb(e)
self._status = ClientStates.RECONNECTING
self._status = ClientState.RECONNECTING
self._current_server.last_attempt = time.monotonic()
self._current_server.reconnects += 1
except asyncio.CancelledError:
Expand Down Expand Up @@ -1853,7 +1853,7 @@ def _process_disconnect(self) -> None:
Process disconnection from the server and set client status
to DISCONNECTED.
"""
self._status = ClientStates.DISCONNECTED
self._status = ClientState.DISCONNECTED

def _process_info(
self, info: Dict[str, Any], initial_connection: bool = False
Expand Down Expand Up @@ -1917,7 +1917,7 @@ async def _process_connect_init(self) -> None:
"""
assert self._transport, "must be called only from Client.connect"
assert self._current_server, "must be called only from Client.connect"
self._status = ClientStates.CONNECTING
self._status = ClientState.CONNECTING

# Check whether to reuse the original hostname for an implicit route.
hostname = None
Expand Down Expand Up @@ -2018,7 +2018,7 @@ async def _process_connect_init(self) -> None:
)

if PONG_PROTO in next_op:
self._status = ClientStates.CONNECTED
self._status = ClientState.CONNECTED
elif ERR_OP in next_op:
err_line = next_op.decode()
_, err_msg = err_line.split(" ", 1)
Expand All @@ -2029,7 +2029,7 @@ async def _process_connect_init(self) -> None:
raise errors.Error("nats: " + err_msg.rstrip("\r\n"))

if PONG_PROTO in next_op:
self._status = ClientStates.CONNECTED
self._status = ClientState.CONNECTED

self._reading_task = asyncio.get_running_loop().create_task(
self._read_loop()
Expand Down Expand Up @@ -2142,7 +2142,7 @@ async def __aenter__(self) -> "Client":

async def __aexit__(self, *exc_info) -> None:
"""Close connection to NATS when used in a context manager"""
await self._close(ClientStates.CLOSED, do_cbs=True)
await self._close(ClientState.CLOSED, do_cbs=True)

def jetstream(self, **opts) -> nats.js.JetStreamContext:
"""
Expand Down
6 changes: 3 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import nats
import nats.errors
import pytest
from nats.aio.client import Client as NATS, ClientStates, __version__
from nats.aio.client import Client as NATS, ClientState, __version__
from tests.utils import (
ClusteringDiscoveryAuthTestCase,
ClusteringTestCase,
Expand Down Expand Up @@ -2906,8 +2906,8 @@ async def disconnected_cb():
await asyncio.wait_for(disconnected, 2)
await nc.close()

disconnected_states[0] == ClientStates.RECONNECTING
disconnected_states[1] == ClientStates.CLOSED
disconnected_states[0] == ClientState.RECONNECTING
disconnected_states[1] == ClientState.CLOSED


if __name__ == "__main__":
Expand Down

0 comments on commit 88a131d

Please sign in to comment.