From 5de659180844a58a7c798538152acee8d553c72b Mon Sep 17 00:00:00 2001 From: Scott Palmer Date: Wed, 18 Sep 2024 06:17:50 -0700 Subject: [PATCH] Allow client to be configured during initialization --- nats/aio/client.py | 50 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/nats/aio/client.py b/nats/aio/client.py index 76fa0480..0f69abd9 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -22,7 +22,7 @@ import ssl import time import string -from dataclasses import dataclass +from dataclasses import asdict, dataclass, field, replace from email.parser import BytesParser from random import shuffle from secrets import token_hex @@ -196,6 +196,45 @@ async def _default_error_callback(ex: Exception) -> None: _logger.error('nats: encountered error', exc_info=ex) +@dataclass +class ClientConfig: + """The configuration of a client.""" + + servers: Union[str, List[str]] = field(default_factory=lambda: ["nats://localhost:4222"]), + error_cb: Optional[ErrorCallback] = None + disconnected_cb: Optional[Callback] = None + closed_cb: Optional[Callback] = None + discovered_server_cb: Optional[Callback] = None + reconnected_cb: Optional[Callback] = None + name: Optional[str] = None + pedantic: bool = False + verbose: bool = False + allow_reconnect: bool = True + connect_timeout: int = DEFAULT_CONNECT_TIMEOUT + reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT + max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS + ping_interval: int = DEFAULT_PING_INTERVAL + max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS + dont_randomize: bool = False + flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE + no_echo: bool = False + tls: Optional[ssl.SSLContext] = None + tls_hostname: Optional[str] = None + tls_handshake_first: bool = False + user: Optional[str] = None + password: Optional[str] = None + token: Optional[str] = None + drain_timeout: int = DEFAULT_DRAIN_TIMEOUT + signature_cb: Optional[SignatureCallback] = None + user_jwt_cb: Optional[JWTCallback] = None + user_credentials: Optional[Credentials] = None + nkeys_seed: Optional[str] = None + nkeys_seed_str: Optional[str] = None + inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX + pending_size: int = DEFAULT_PENDING_SIZE + flush_timeout: Optional[float] = None + + class Client: """ Asyncio based client for NATS. @@ -215,7 +254,11 @@ class Client: def __repr__(self) -> str: return f"" - def __init__(self) -> None: + def __init__(self, config: Optional[ClientConfig] = None, **kwargs) -> None: + self._config: ClientConfig = ( + ClientConfig(**kwargs) + if config is None else replace(config, **kwargs) + ) self._current_server: Optional[Srv] = None self._server_info: Dict[str, Any] = {} self._server_pool: List[Srv] = [] @@ -2131,6 +2174,9 @@ async def _read_loop(self) -> None: async def __aenter__(self) -> "Client": """For when NATS client is used in a context manager""" + if not (self.is_connecting or self.is_connected or self.is_reconnecting or + self.is_draining or self.is_draining_pubs): + await self.connect(**asdict(self._config)) return self async def __aexit__(self, *exc_info) -> None: