Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow client to be configured during initialization #604

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import ssl
import time
import string
from dataclasses import dataclass
from dataclasses import asdict, dataclass, field, replace
from email.parser import BytesParser
from random import shuffle
from secrets import token_hex
Expand Down Expand Up @@ -196,6 +196,45 @@ async def _default_error_callback(ex: Exception) -> None:
_logger.error('nats: encountered error', exc_info=ex)


@dataclass
class ClientConfig:
"""The configuration of a client."""

servers: Union[str, List[str]] = field(default_factory=lambda: ["nats://localhost:4222"]),
error_cb: Optional[ErrorCallback] = None
disconnected_cb: Optional[Callback] = None
closed_cb: Optional[Callback] = None
discovered_server_cb: Optional[Callback] = None
reconnected_cb: Optional[Callback] = None
name: Optional[str] = None
pedantic: bool = False
verbose: bool = False
allow_reconnect: bool = True
connect_timeout: int = DEFAULT_CONNECT_TIMEOUT
reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT
max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS
ping_interval: int = DEFAULT_PING_INTERVAL
max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS
dont_randomize: bool = False
flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE
no_echo: bool = False
tls: Optional[ssl.SSLContext] = None
tls_hostname: Optional[str] = None
tls_handshake_first: bool = False
user: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
drain_timeout: int = DEFAULT_DRAIN_TIMEOUT
signature_cb: Optional[SignatureCallback] = None
user_jwt_cb: Optional[JWTCallback] = None
user_credentials: Optional[Credentials] = None
nkeys_seed: Optional[str] = None
nkeys_seed_str: Optional[str] = None
inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX
pending_size: int = DEFAULT_PENDING_SIZE
flush_timeout: Optional[float] = None


class Client:
"""
Asyncio based client for NATS.
Expand All @@ -215,7 +254,11 @@ class Client:
def __repr__(self) -> str:
return f"<nats client v{__version__}>"

def __init__(self) -> None:
def __init__(self, config: Optional[ClientConfig] = None, **kwargs) -> None:
self._config: ClientConfig = (
ClientConfig(**kwargs)
if config is None else replace(config, **kwargs)
)
self._current_server: Optional[Srv] = None
self._server_info: Dict[str, Any] = {}
self._server_pool: List[Srv] = []
Expand Down Expand Up @@ -2131,6 +2174,9 @@ async def _read_loop(self) -> None:

async def __aenter__(self) -> "Client":
"""For when NATS client is used in a context manager"""
if not (self.is_connecting or self.is_connected or self.is_reconnecting or
self.is_draining or self.is_draining_pubs):
await self.connect(**asdict(self._config))
return self

async def __aexit__(self, *exc_info) -> None:
Expand Down