diff --git a/nats/aio/client.py b/nats/aio/client.py index 8df3cfc1..81e65f50 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -26,6 +26,7 @@ from dataclasses import dataclass from email.parser import BytesParser from io import BytesIO +from pathlib import Path from random import shuffle from secrets import token_hex from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union @@ -107,7 +108,7 @@ class RawCredentials(UserString): pass -Credentials = Union[str, Tuple[str, str], RawCredentials] +Credentials = Union[str, Tuple[str, str], RawCredentials, Path] @dataclass @@ -558,7 +559,8 @@ def sig_cb(nonce: str) -> bytes: return sig self._signature_cb = sig_cb - elif isinstance(creds, str) or isinstance(creds, UserString): + elif (isinstance(creds, str) or isinstance(creds, UserString) + or isinstance(creds, Path)): # Define the functions to be able to sign things using nkeys. def user_cb() -> bytearray: return self._read_creds_user_jwt(creds) @@ -579,7 +581,9 @@ def sig_cb(nonce: str) -> bytes: self._signature_cb = sig_cb - def _read_creds_user_nkey(self, creds: str | UserString) -> bytearray: + def _read_creds_user_nkey( + self, creds: str | UserString | Path + ) -> bytearray: def get_user_seed(f): for line in f: @@ -608,7 +612,7 @@ def get_user_seed(f): with open(creds, "rb", buffering=0) as f: return get_user_seed(f) - def _read_creds_user_jwt(self, creds: str | RawCredentials): + def _read_creds_user_jwt(self, creds: str | RawCredentials | Path): def get_user_jwt(f): user_jwt = None