From 268d31bea40caa307c1c38f02b1063bb1a32edd6 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:15:40 +0300 Subject: [PATCH 01/10] Add TOTP Authorization --- moonraker/components/authorization.py | 44 +++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 204c675f6..14059cd0c 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -11,6 +11,7 @@ import hashlib import secrets import os +import pyotp import time import datetime import ipaddress @@ -78,6 +79,7 @@ def __init__(self, config: ConfigHelper) -> None: self.force_logins = config.getboolean('force_logins', False) self.default_source = config.get('default_source', "moonraker").lower() self.enable_api_key = config.getboolean('enable_api_key', True) + self.enable_totp = config.getboolean('enable_totp', False) self.max_logins = config.getint("max_login_attempts", None, above=0) self.failed_logins: Dict[IPAddr, int] = {} self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {} @@ -110,6 +112,10 @@ def __init__(self, config: ConfigHelper) -> None: } else: self.api_key = api_user['api_key'] + if (self.enable_totp): + database.register_local_namespace('user_totp_secret_storage', forbidden=True) + self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') + self.totp_secrets: Dict[str, Tuple[str, bool]] = self.totp_secret_db.as_dict() hi = self.server.get_host_info() self.issuer = f"http://{hi['hostname']}:{hi['port']}" self.public_jwks: Dict[str, Dict[str, Any]] = {} @@ -267,6 +273,13 @@ def __init__(self, config: ConfigHelper) -> None: transports=TransportType.HTTP | TransportType.WEBSOCKET, auth_required=False ) + # Generate TOTP record + if (self.enable_totp): + self.server.register_endpoint( + "/access/get_totp_uri", RequestType.GET, self._handle_getTOTP_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET, + auth_required=False + ) wsm: WebsocketManager = self.server.lookup_component("websockets") wsm.register_notification("authorization:user_created") wsm.register_notification( @@ -354,6 +367,17 @@ async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: "login_required": login_req, "trusted": request_trusted } + + async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: + username: str = web_request.get_str('username') + (secret, is_activated) = self.totp_secrets.get(username) + if not secret: + raise ValueError("User does not have a TOTP key set up.") + uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") + + return { + "TOTP_URI": uri, + } async def _handle_refresh_jwt(self, web_request: WebRequest @@ -442,6 +466,9 @@ async def _handle_password_reset(self, 'sha256', new_pass.encode(), salt, HASH_ITER).hex() self.users[username]['password'] = new_hashed_pass self._sync_user(username) + if (self.enable_totp): + self.totp_secrets[username] = (pyotp.random_base32(), False) + self.totp_secret_db.sync() return { 'username': username, 'action': "user_password_reset" @@ -471,6 +498,8 @@ async def _login_jwt_user( await self.ldap.authenticate_ldap_user(username, password) if username not in self.users: create = True + if (self.enable_totp): + totp_code: str = web_request.get_str('totp_code') if create: if username in self.users: raise self.server.error(f"User {username} already exists") @@ -491,6 +520,9 @@ async def _login_jwt_user( # Dont notify user created action = "user_logged_in" create = False + if (self.enable_totp): + self.totp_secrets[username] = (pyotp.random_base32(), False) + self.totp_secret_db.sync() else: if username not in self.users: raise self.server.error(f"Unregistered User: {username}") @@ -507,6 +539,15 @@ async def _login_jwt_user( action = "user_logged_in" if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") + if (self.enable_totp): + (secret, is_activated) = self.totp_secrets.get(username) + if not secret: + raise self.server.error("User does not have a secret key set up.") + if (pyotp.TOTP(secret).verify(totp_code) == False): + raise self.server.error("Invalid TOTP code") + if (is_activated == False): + self.totp_secrets[username] = (secret, True) + self.totp_secret_db.sync() jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) if jwt_secret_hex is None: private_key = Signer() @@ -563,6 +604,9 @@ def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: .005, self.server.send_event, "authorization:user_deleted", {'username': username}) + if (self.enable_totp): + del self.totp_secrets[username] + self.totp_secret_db.sync() return { "username": username, "action": "user_deleted" From 4fc726b9adac47a39d8b11194b693564e1e7d57f Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:16:37 +0300 Subject: [PATCH 02/10] Add TOTP support --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index c5ad05855..e4ce50bf6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "dbus-next==0.2.3", "apprise==1.7.0", "ldap3==2.9.1", + "pyotp==2.9.0", "python-periphery==2.4.1", "smart_open<=6.4.0" ] From 2db9ab36c426c4b1448c0bdfb1ee7d31ea5fd5a0 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:27:39 +0300 Subject: [PATCH 03/10] Fix --- moonraker/components/authorization.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 14059cd0c..2224662f6 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -370,7 +370,7 @@ async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: username: str = web_request.get_str('username') - (secret, is_activated) = self.totp_secrets.get(username) + (secret, is_activated) = self.totp_secrets.get(username, (None, None)) if not secret: raise ValueError("User does not have a TOTP key set up.") uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") @@ -468,7 +468,7 @@ async def _handle_password_reset(self, self._sync_user(username) if (self.enable_totp): self.totp_secrets[username] = (pyotp.random_base32(), False) - self.totp_secret_db.sync() + self.totp_secret_db.sync(self.totp_secrets) return { 'username': username, 'action': "user_password_reset" @@ -522,7 +522,7 @@ async def _login_jwt_user( create = False if (self.enable_totp): self.totp_secrets[username] = (pyotp.random_base32(), False) - self.totp_secret_db.sync() + self.totp_secret_db.sync(self.totp_secrets) else: if username not in self.users: raise self.server.error(f"Unregistered User: {username}") @@ -540,14 +540,14 @@ async def _login_jwt_user( if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") if (self.enable_totp): - (secret, is_activated) = self.totp_secrets.get(username) + (secret, is_activated) = self.totp_secrets.get(username, (None, None)) if not secret: raise self.server.error("User does not have a secret key set up.") if (pyotp.TOTP(secret).verify(totp_code) == False): raise self.server.error("Invalid TOTP code") if (is_activated == False): self.totp_secrets[username] = (secret, True) - self.totp_secret_db.sync() + self.totp_secret_db.sync(self.totp_secrets) jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) if jwt_secret_hex is None: private_key = Signer() From b96e52a7729fcab358f42a1220161a23e588faf7 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:41:46 +0300 Subject: [PATCH 04/10] Fix --- moonraker/components/authorization.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 2224662f6..543851011 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -115,7 +115,7 @@ def __init__(self, config: ConfigHelper) -> None: if (self.enable_totp): database.register_local_namespace('user_totp_secret_storage', forbidden=True) self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') - self.totp_secrets: Dict[str, Tuple[str, bool]] = self.totp_secret_db.as_dict() + self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = self.totp_secret_db.as_dict() hi = self.server.get_host_info() self.issuer = f"http://{hi['hostname']}:{hi['port']}" self.public_jwks: Dict[str, Dict[str, Any]] = {} @@ -467,8 +467,8 @@ async def _handle_password_reset(self, self.users[username]['password'] = new_hashed_pass self._sync_user(username) if (self.enable_totp): - self.totp_secrets[username] = (pyotp.random_base32(), False) - self.totp_secret_db.sync(self.totp_secrets) + self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secret_db.sync(self.totp_secrets) return { 'username': username, 'action': "user_password_reset" @@ -521,8 +521,8 @@ async def _login_jwt_user( action = "user_logged_in" create = False if (self.enable_totp): - self.totp_secrets[username] = (pyotp.random_base32(), False) - self.totp_secret_db.sync(self.totp_secrets) + self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secret_db.sync(self.totp_secrets) else: if username not in self.users: raise self.server.error(f"Unregistered User: {username}") @@ -540,14 +540,16 @@ async def _login_jwt_user( if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") if (self.enable_totp): - (secret, is_activated) = self.totp_secrets.get(username, (None, None)) + user_data_totp = self.totp_secrets.get(username, {'secret': None, 'is_activated': None}) + secret = user_data_totp['secret'] + is_activated = user_data_totp['is_activated'] if not secret: - raise self.server.error("User does not have a secret key set up.") + raise self.server.error("User does not have a secret key set up.") if (pyotp.TOTP(secret).verify(totp_code) == False): - raise self.server.error("Invalid TOTP code") + raise self.server.error("Invalid TOTP code") if (is_activated == False): - self.totp_secrets[username] = (secret, True) - self.totp_secret_db.sync(self.totp_secrets) + self.totp_secrets[username] = {'secret': secret, 'is_activated': True} + self.totp_secret_db.sync(self.totp_secrets) jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) if jwt_secret_hex is None: private_key = Signer() @@ -606,7 +608,7 @@ def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: {'username': username}) if (self.enable_totp): del self.totp_secrets[username] - self.totp_secret_db.sync() + self.totp_secret_db.sync(self.totp_secrets) return { "username": username, "action": "user_deleted" From 4ad9d72fa2516fed7ccfe8beba8d87259c12af4e Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:52:03 +0300 Subject: [PATCH 05/10] Fix --- moonraker/components/authorization.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 543851011..44d5b188c 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -370,8 +370,8 @@ async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: username: str = web_request.get_str('username') - (secret, is_activated) = self.totp_secrets.get(username, (None, None)) - if not secret: + (secret, is_activated) = self.totp_secrets.get(username, ('', True)) + if secret == '': raise ValueError("User does not have a TOTP key set up.") uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") @@ -540,10 +540,10 @@ async def _login_jwt_user( if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") if (self.enable_totp): - user_data_totp = self.totp_secrets.get(username, {'secret': None, 'is_activated': None}) + user_data_totp = self.totp_secrets.get(username, {'secret': '', 'is_activated': True}) secret = user_data_totp['secret'] is_activated = user_data_totp['is_activated'] - if not secret: + if secret == '': raise self.server.error("User does not have a secret key set up.") if (pyotp.TOTP(secret).verify(totp_code) == False): raise self.server.error("Invalid TOTP code") From 8fdbf84d66af1ac8399199d383f6850398c8f435 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 01:58:20 +0300 Subject: [PATCH 06/10] Syntax clean --- moonraker/authorization.py | 932 +++++++++++++++++++++++++++++++++++++ 1 file changed, 932 insertions(+) create mode 100644 moonraker/authorization.py diff --git a/moonraker/authorization.py b/moonraker/authorization.py new file mode 100644 index 000000000..c8cd65ffd --- /dev/null +++ b/moonraker/authorization.py @@ -0,0 +1,932 @@ +# API Key Based Authorization +# +# Copyright (C) 2020 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license + +from __future__ import annotations +import asyncio +import base64 +import uuid +import hashlib +import secrets +import os +import pyotp +import time +import datetime +import ipaddress +import re +import socket +import logging +from tornado.web import HTTPError +from libnacl.sign import Signer, Verifier +from ..utils import json_wrapper as jsonw +from ..common import RequestType, TransportType + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Tuple, + Optional, + Union, + Dict, + List, +) + +if TYPE_CHECKING: + from ..confighelper import ConfigHelper + from ..common import WebRequest + from .websockets import WebsocketManager + from tornado.httputil import HTTPServerRequest + from .database import MoonrakerDatabase as DBComp + from .ldap import MoonrakerLDAP + IPAddr = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] + IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] + OneshotToken = Tuple[IPAddr, Optional[Dict[str, Any]], asyncio.Handle] + +# Helpers for base64url encoding and decoding +def base64url_encode(data: bytes) -> bytes: + return base64.urlsafe_b64encode(data).rstrip(b"=") + +def base64url_decode(data: str) -> bytes: + pad_cnt = len(data) % 4 + if pad_cnt: + data += "=" * (4 - pad_cnt) + return base64.urlsafe_b64decode(data) + + +ONESHOT_TIMEOUT = 5 +TRUSTED_CONNECTION_TIMEOUT = 3600 +FQDN_CACHE_TIMEOUT = 84000 +PRUNE_CHECK_TIME = 300. + +AUTH_SOURCES = ["moonraker", "ldap"] +HASH_ITER = 100000 +API_USER = "_API_KEY_USER_" +TRUSTED_USER = "_TRUSTED_USER_" +RESERVED_USERS = [API_USER, TRUSTED_USER] +JWT_EXP_TIME = datetime.timedelta(hours=1) +JWT_HEADER = { + 'alg': "EdDSA", + 'typ': "JWT" +} + +class Authorization: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.login_timeout = config.getint('login_timeout', 90) + self.force_logins = config.getboolean('force_logins', False) + self.default_source = config.get('default_source', "moonraker").lower() + self.enable_api_key = config.getboolean('enable_api_key', True) + self.enable_totp = config.getboolean('enable_totp', False) + self.max_logins = config.getint("max_login_attempts", None, above=0) + self.failed_logins: Dict[IPAddr, int] = {} + self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {} + if self.default_source not in AUTH_SOURCES: + self.server.add_warning( + "[authorization]: option 'default_source' - Invalid " + f"value '{self.default_source}', falling back to " + "'moonraker'." + ) + self.default_source = "moonraker" + self.ldap: Optional[MoonrakerLDAP] = None + if config.has_section("ldap"): + self.ldap = self.server.load_component(config, "ldap", None) + if self.default_source == "ldap" and self.ldap is None: + self.server.add_warning( + "[authorization]: Option 'default_source' set to 'ldap'," + " however [ldap] section failed to load or not configured" + ) + database: DBComp = self.server.lookup_component('database') + database.register_local_namespace('authorized_users', forbidden=True) + self.user_db = database.wrap_namespace('authorized_users') + self.users: Dict[str, Dict[str, Any]] = self.user_db.as_dict() + api_user: Optional[Dict[str, Any]] = self.users.get(API_USER, None) + if api_user is None: + self.api_key = uuid.uuid4().hex + self.users[API_USER] = { + 'username': API_USER, + 'api_key': self.api_key, + 'created_on': time.time() + } + else: + self.api_key = api_user['api_key'] + if (self.enable_totp): + database.register_local_namespace( + 'user_totp_secret_storage', + forbidden=True) + self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') + self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = ( + self.totp_secret_db.as_dict() + ) + hi = self.server.get_host_info() + self.issuer = f"http://{hi['hostname']}:{hi['port']}" + self.public_jwks: Dict[str, Dict[str, Any]] = {} + for username, user_info in list(self.users.items()): + if username == API_USER: + # Validate the API User + for item in ["username", "api_key", "created_on"]: + if item not in user_info: + self.users[API_USER] = { + 'username': API_USER, + 'api_key': self.api_key, + 'created_on': time.time() + } + break + continue + else: + # validate created users + valid = True + for item in ["username", "password", "salt", "created_on"]: + if item not in user_info: + logging.info( + f"Authorization: User {username} does not " + f"contain field {item}, removing") + del self.users[username] + valid = False + break + if not valid: + continue + # generate jwks for valid users + if 'jwt_secret' in user_info: + try: + priv_key = self._load_private_key(user_info['jwt_secret']) + jwk_id = user_info['jwk_id'] + except (self.server.error, KeyError): + logging.info("Invalid key found for user, removing") + user_info.pop('jwt_secret', None) + user_info.pop('jwk_id', None) + self.users[username] = user_info + continue + self.public_jwks[jwk_id] = self._generate_public_jwk(priv_key) + # sync user changes to the database + self.user_db.sync(self.users) + self.trusted_users: Dict[IPAddr, Any] = {} + self.oneshot_tokens: Dict[str, OneshotToken] = {} + + # Get allowed cors domains + self.cors_domains: List[str] = [] + for domain in config.getlist('cors_domains', []): + bad_match = re.search(r"^.+\.[^:]*\*", domain) + if bad_match is not None: + self.server.add_warning( + f"[authorization]: Unsafe domain '{domain}' in option " + f"'cors_domains'. Wildcards are not permitted in the" + " top level domain." + ) + continue + if domain.endswith("/"): + self.server.add_warning( + f"[authorization]: Invalid domain '{domain}' in option " + "'cors_domains'. Domain's cannot contain a trailing " + "slash." + ) + else: + self.cors_domains.append( + domain.replace(".", "\\.").replace("*", ".*")) + + # Get Trusted Clients + self.trusted_ips: List[IPAddr] = [] + self.trusted_ranges: List[IPNetwork] = [] + self.trusted_domains: List[str] = [] + for val in config.getlist('trusted_clients', []): + # Check IP address + try: + tc = ipaddress.ip_address(val) + except ValueError: + pass + else: + self.trusted_ips.append(tc) + continue + # Check ip network + try: + tn = ipaddress.ip_network(val) + except ValueError as e: + if "has host bits set" in str(e): + self.server.add_warning( + f"[authorization]: Invalid CIDR expression '{val}' " + "in option 'trusted_clients'") + continue + pass + else: + self.trusted_ranges.append(tn) + continue + # Check hostname + match = re.match(r"([a-z0-9]+(-[a-z0-9]+)*\.?)+[a-z]{2,}$", val) + if match is not None: + self.trusted_domains.append(val.lower()) + else: + self.server.add_warning( + f"[authorization]: Invalid domain name '{val}' " + "in option 'trusted_clients'") + + t_clients = "\n".join( + [str(ip) for ip in self.trusted_ips] + + [str(rng) for rng in self.trusted_ranges] + + self.trusted_domains) + c_domains = "\n".join(self.cors_domains) + + logging.info( + f"Authorization Configuration Loaded\n" + f"Trusted Clients:\n{t_clients}\n" + f"CORS Domains:\n{c_domains}") + + eventloop = self.server.get_event_loop() + self.prune_timer = eventloop.register_timer( + self._prune_conn_handler) + + # Register Authorization Endpoints + self.server.register_endpoint( + "/access/login", RequestType.POST, self._handle_login, + transports=TransportType.HTTP | TransportType.WEBSOCKET, + auth_required=False + ) + self.server.register_endpoint( + "/access/logout", RequestType.POST, self._handle_logout, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt, + transports=TransportType.HTTP | TransportType.WEBSOCKET, + auth_required=False + ) + self.server.register_endpoint( + "/access/user", RequestType.all(), self._handle_user_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/users/list", RequestType.GET, self._handle_list_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/user/password", RequestType.POST, self._handle_password_reset, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/api_key", RequestType.GET | RequestType.POST, + self._handle_apikey_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/oneshot_token", RequestType.GET, self._handle_oneshot_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET + ) + self.server.register_endpoint( + "/access/info", RequestType.GET, self._handle_info_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET, + auth_required=False + ) + # Generate TOTP record + if (self.enable_totp): + self.server.register_endpoint( + "/access/get_totp_uri", RequestType.GET, self._handle_getTOTP_request, + transports=TransportType.HTTP | TransportType.WEBSOCKET, + auth_required=False + ) + wsm: WebsocketManager = self.server.lookup_component("websockets") + wsm.register_notification("authorization:user_created") + wsm.register_notification( + "authorization:user_deleted", event_type="logout" + ) + wsm.register_notification( + "authorization:user_logged_out", event_type="logout" + ) + + def _sync_user(self, username: str) -> None: + self.user_db[username] = self.users[username] + + async def component_init(self) -> None: + self.prune_timer.start(delay=PRUNE_CHECK_TIME) + + async def _handle_apikey_request(self, web_request: WebRequest) -> str: + if web_request.get_request_type() == RequestType.POST: + self.api_key = uuid.uuid4().hex + self.users[API_USER]['api_key'] = self.api_key + self._sync_user(API_USER) + return self.api_key + + async def _handle_oneshot_request(self, web_request: WebRequest) -> str: + ip = web_request.get_ip_address() + assert ip is not None + user_info = web_request.get_current_user() + return self.get_oneshot_token(ip, user_info) + + async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]: + ip = web_request.get_ip_address() + if ip is not None and self.check_logins_maxed(ip): + raise HTTPError( + 401, "Unauthorized, Maximum Login Attempts Reached" + ) + try: + ret = await self._login_jwt_user(web_request) + except asyncio.CancelledError: + raise + except Exception: + if ip is not None: + failed = self.failed_logins.get(ip, 0) + self.failed_logins[ip] = failed + 1 + raise + if ip is not None: + self.failed_logins.pop(ip, None) + return ret + + async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]: + user_info = web_request.get_current_user() + if user_info is None: + raise self.server.error("No user logged in") + username: str = user_info['username'] + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid log out request for user {username}") + self.users[username].pop("jwt_secret", None) + jwk_id: str = self.users[username].pop("jwk_id", None) + self._sync_user(username) + self.public_jwks.pop(jwk_id, None) + eventloop = self.server.get_event_loop() + eventloop.delay_callback( + .005, self.server.send_event, "authorization:user_logged_out", + {'username': username} + ) + return { + "username": username, + "action": "user_logged_out" + } + + async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: + sources = ["moonraker"] + if self.ldap is not None: + sources.append("ldap") + login_req = self.force_logins and len(self.users) > 1 + request_trusted: Optional[bool] = None + user = web_request.current_user + req_ip = web_request.ip_addr + if user is not None and user.get("username") == TRUSTED_USER: + request_trusted = True + elif req_ip is not None: + request_trusted = await self._check_authorized_ip(req_ip) + return { + "default_source": self.default_source, + "available_sources": sources, + "login_required": login_req, + "trusted": request_trusted + } + + async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: + username: str = web_request.get_str('username') + (secret, is_activated) = self.totp_secrets.get(username, ('', True)) + if secret == '': + raise ValueError("User does not have a TOTP key set up.") + uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") + return { + "TOTP_URI": uri, + } + + async def _handle_refresh_jwt(self, + web_request: WebRequest + ) -> Dict[str, str]: + refresh_token: str = web_request.get_str('refresh_token') + try: + user_info = self.decode_jwt(refresh_token, token_type="refresh") + except Exception: + raise self.server.error("Invalid Refresh Token", 401) + username: str = user_info['username'] + if 'jwt_secret' not in user_info or "jwk_id" not in user_info: + raise self.server.error("User not logged in", 401) + private_key = self._load_private_key(user_info['jwt_secret']) + jwk_id: str = user_info['jwk_id'] + token = self._generate_jwt(username, jwk_id, private_key) + return { + 'username': username, + 'token': token, + 'source': user_info.get("source", "moonraker"), + 'action': 'user_jwt_refresh' + } + + async def _handle_user_request( + self, web_request: WebRequest + ) -> Dict[str, Any]: + req_type = web_request.get_request_type() + if req_type == RequestType.GET: + user = web_request.get_current_user() + if user is None: + return { + 'username': None, + 'source': None, + 'created_on': None, + } + else: + return { + 'username': user['username'], + 'source': user.get("source", "moonraker"), + 'created_on': user.get('created_on') + } + elif req_type == RequestType.POST: + # Create User + return await self._login_jwt_user(web_request, create=True) + elif req_type == RequestType.DELETE: + # Delete User + return self._delete_jwt_user(web_request) + raise self.server.error("Invalid Request Method") + + async def _handle_list_request(self, + web_request: WebRequest + ) -> Dict[str, List[Dict[str, Any]]]: + user_list = [] + for user in self.users.values(): + if user['username'] == API_USER: + continue + user_list.append({ + 'username': user['username'], + 'source': user.get("source", "moonraker"), + 'created_on': user['created_on'] + }) + return { + 'users': user_list + } + + async def _handle_password_reset(self, + web_request: WebRequest + ) -> Dict[str, str]: + password: str = web_request.get_str('password') + new_pass: str = web_request.get_str('new_password') + user_info = web_request.get_current_user() + if user_info is None: + raise self.server.error("No Current User") + username = user_info['username'] + if user_info.get("source", "moonraker") == "ldap": + raise self.server.error( + f"Can´t Reset password for ldap user {username}") + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid Reset Request for user {username}") + salt = bytes.fromhex(user_info['salt']) + hashed_pass = hashlib.pbkdf2_hmac( + 'sha256', password.encode(), salt, HASH_ITER).hex() + if hashed_pass != user_info['password']: + raise self.server.error("Invalid Password") + new_hashed_pass = hashlib.pbkdf2_hmac( + 'sha256', new_pass.encode(), salt, HASH_ITER).hex() + self.users[username]['password'] = new_hashed_pass + self._sync_user(username) + if (self.enable_totp): + self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secret_db.sync(self.totp_secrets) + return { + 'username': username, + 'action': "user_password_reset" + } + + async def _login_jwt_user( + self, web_request: WebRequest, create: bool = False + ) -> Dict[str, Any]: + username: str = web_request.get_str('username') + password: str = web_request.get_str('password') + source: str = web_request.get_str( + 'source', self.default_source + ).lower() + if source not in AUTH_SOURCES: + raise self.server.error(f"Invalid 'source': {source}") + user_info: Dict[str, Any] + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid Request for user {username}") + if source == "ldap": + if create: + raise self.server.error("Cannot Create LDAP User") + if self.ldap is None: + raise self.server.error( + "LDAP authentication not available", 401 + ) + await self.ldap.authenticate_ldap_user(username, password) + if username not in self.users: + create = True + if (self.enable_totp): + totp_code: str = web_request.get_str('totp_code') + if create: + if username in self.users: + raise self.server.error(f"User {username} already exists") + salt = secrets.token_bytes(32) + hashed_pass = hashlib.pbkdf2_hmac( + 'sha256', password.encode(), salt, HASH_ITER).hex() + user_info = { + 'username': username, + 'password': hashed_pass, + 'salt': salt.hex(), + 'source': source, + 'created_on': time.time() + } + self.users[username] = user_info + self._sync_user(username) + action = "user_created" + if source == "ldap": + # Dont notify user created + action = "user_logged_in" + create = False + if (self.enable_totp): + self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secret_db.sync(self.totp_secrets) + else: + if username not in self.users: + raise self.server.error(f"Unregistered User: {username}") + user_info = self.users[username] + auth_src = user_info.get("source", "moonraker") + if auth_src != source: + raise self.server.error( + f"Moonraker cannot authenticate user '{username}', must " + f"specify source '{auth_src}'", 401 + ) + salt = bytes.fromhex(user_info['salt']) + hashed_pass = hashlib.pbkdf2_hmac( + 'sha256', password.encode(), salt, HASH_ITER).hex() + action = "user_logged_in" + if hashed_pass != user_info['password']: + raise self.server.error("Invalid Password") + if (self.enable_totp): + user_data_totp = self.totp_secrets.get(username, {'secret': '', 'is_activated': True}) + secret = user_data_totp['secret'] + is_activated = user_data_totp['is_activated'] + if secret == '': + raise self.server.error("User does not have a secret key set up.") + if pyotp.TOTP(secret).verify(totp_code) is False: + raise self.server.error("Invalid TOTP code") + if is_activated is False: + self.totp_secrets[username] = {'secret': secret, 'is_activated': True} + self.totp_secret_db.sync(self.totp_secrets) + jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) + if jwt_secret_hex is None: + private_key = Signer() + jwk_id = base64url_encode(secrets.token_bytes()).decode() + user_info['jwt_secret'] = private_key.hex_seed().decode() + user_info['jwk_id'] = jwk_id + self.users[username] = user_info + self._sync_user(username) + self.public_jwks[jwk_id] = self._generate_public_jwk(private_key) + else: + private_key = self._load_private_key(jwt_secret_hex) + jwk_id = user_info['jwk_id'] + token = self._generate_jwt(username, jwk_id, private_key) + refresh_token = self._generate_jwt( + username, jwk_id, private_key, token_type="refresh", + exp_time=datetime.timedelta(days=self.login_timeout)) + conn = web_request.get_client_connection() + if create: + event_loop = self.server.get_event_loop() + event_loop.delay_callback( + .005, self.server.send_event, + "authorization:user_created", + {'username': username}) + elif conn is not None: + conn.user_info = user_info + return { + 'username': username, + 'token': token, + 'source': user_info.get("source", "moonraker"), + 'refresh_token': refresh_token, + 'action': action + } + + def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: + username: str = web_request.get_str('username') + current_user = web_request.get_current_user() + if current_user is not None: + curname = current_user.get('username', None) + if curname is not None and curname == username: + raise self.server.error( + f"Cannot delete logged in user {curname}") + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid Request for reserved user {username}") + user_info: Optional[Dict[str, Any]] = self.users.get(username) + if user_info is None: + raise self.server.error(f"No registered user: {username}") + if 'jwk_id' in user_info: + self.public_jwks.pop(user_info['jwk_id'], None) + del self.users[username] + del self.user_db[username] + event_loop = self.server.get_event_loop() + event_loop.delay_callback( + .005, self.server.send_event, + "authorization:user_deleted", + {'username': username}) + if (self.enable_totp): + del self.totp_secrets[username] + self.totp_secret_db.sync(self.totp_secrets) + return { + "username": username, + "action": "user_deleted" + } + + def _generate_jwt(self, + username: str, + jwk_id: str, + private_key: Signer, + token_type: str = "access", + exp_time: datetime.timedelta = JWT_EXP_TIME + ) -> str: + curtime = int(time.time()) + payload = { + 'iss': self.issuer, + 'aud': "Moonraker", + 'iat': curtime, + 'exp': curtime + int(exp_time.total_seconds()), + 'username': username, + 'token_type': token_type + } + header = {'kid': jwk_id} + header.update(JWT_HEADER) + jwt_header = base64url_encode(jsonw.dumps(header)) + jwt_payload = base64url_encode(jsonw.dumps(payload)) + jwt_msg = b".".join([jwt_header, jwt_payload]) + sig = private_key.signature(jwt_msg) + jwt_sig = base64url_encode(sig) + return b".".join([jwt_msg, jwt_sig]).decode() + + def decode_jwt( + self, token: str, token_type: str = "access", check_exp: bool = True + ) -> Dict[str, Any]: + message, sig = token.rsplit('.', maxsplit=1) + enc_header, enc_payload = message.split('.') + header: Dict[str, Any] = jsonw.loads(base64url_decode(enc_header)) + sig_bytes = base64url_decode(sig) + + # verify header + if header.get('typ') != "JWT" or header.get('alg') != "EdDSA": + raise self.server.error("Invalid JWT header") + jwk_id = header.get('kid') + if jwk_id not in self.public_jwks: + raise self.server.error("Invalid key ID") + + # validate signature + public_key = self._public_key_from_jwk(self.public_jwks[jwk_id]) + public_key.verify(sig_bytes + message.encode()) + + # validate claims + payload: Dict[str, Any] = jsonw.loads(base64url_decode(enc_payload)) + if payload['token_type'] != token_type: + raise self.server.error( + f"JWT Token type mismatch: Expected {token_type}, " + f"Recd: {payload['token_type']}", 401) + if payload['iss'] != self.issuer: + raise self.server.error("Invalid JWT Issuer", 401) + if payload['aud'] != "Moonraker": + raise self.server.error("Invalid JWT Audience", 401) + if check_exp and payload['exp'] < int(time.time()): + raise self.server.error("JWT Expired", 401) + + # get user + user_info: Optional[Dict[str, Any]] = self.users.get( + payload.get('username', ""), None) + if user_info is None: + raise self.server.error("Unknown user", 401) + return user_info + + def validate_jwt(self, token: str) -> Dict[str, Any]: + try: + user_info = self.decode_jwt(token) + except Exception as e: + if isinstance(e, self.server.error): + raise + raise self.server.error( + f"Failed to decode JWT: {e}", 401 + ) from e + return user_info + + def validate_api_key(self, api_key: str) -> Dict[str, Any]: + if not self.enable_api_key: + raise self.server.error("API Key authentication is disabled", 401) + if api_key and api_key == self.api_key: + return self.users[API_USER] + raise self.server.error("Invalid API Key", 401) + + def _load_private_key(self, secret: str) -> Signer: + try: + key = Signer(bytes.fromhex(secret)) + except Exception: + raise self.server.error( + "Error decoding private key, user data may" + " be corrupt", 500) from None + return key + + def _generate_public_jwk(self, private_key: Signer) -> Dict[str, Any]: + public_key = private_key.vk + return { + 'x': base64url_encode(public_key).decode(), + 'kty': "OKP", + 'crv': "Ed25519", + 'use': "sig" + } + + def _public_key_from_jwk(self, jwk: Dict[str, Any]) -> Verifier: + if jwk.get('kty') != "OKP": + raise self.server.error("Not an Octet Key Pair") + if jwk.get('crv') != "Ed25519": + raise self.server.error("Invalid Curve") + if 'x' not in jwk: + raise self.server.error("No 'x' argument in jwk") + key = base64url_decode(jwk['x']) + return Verifier(key.hex().encode()) + + def _prune_conn_handler(self, eventtime: float) -> float: + cur_time = time.time() + for ip, user_info in list(self.trusted_users.items()): + exp_time: float = user_info['expires_at'] + if cur_time >= exp_time: + self.trusted_users.pop(ip, None) + logging.info(f"Trusted Connection Expired, IP: {ip}") + for ip, fqdn_info in list(self.fqdn_cache.items()): + exp_time = fqdn_info["expires_at"] + if cur_time >= exp_time: + domain: str = fqdn_info["domain"] + self.fqdn_cache.pop(ip, None) + logging.info(f"Cached FQDN Expired, IP: {ip}, domain: {domain}") + return eventtime + PRUNE_CHECK_TIME + + def _oneshot_token_expire_handler(self, token): + self.oneshot_tokens.pop(token, None) + + def get_oneshot_token(self, + ip_addr: IPAddr, + user: Optional[Dict[str, Any]] + ) -> str: + token = base64.b32encode(os.urandom(20)).decode() + event_loop = self.server.get_event_loop() + hdl = event_loop.delay_callback( + ONESHOT_TIMEOUT, self._oneshot_token_expire_handler, token) + self.oneshot_tokens[token] = (ip_addr, user, hdl) + return token + + def _check_json_web_token( + self, request: HTTPServerRequest, required: bool = True + ) -> Optional[Dict[str, Any]]: + auth_token: Optional[str] = request.headers.get("Authorization") + if auth_token is None: + auth_token = request.headers.get("X-Access-Token") + if auth_token is None: + qtoken = request.query_arguments.get('access_token', None) + if qtoken is not None: + auth_token = qtoken[-1].decode(errors="ignore") + elif auth_token.startswith("Bearer "): + auth_token = auth_token[7:] + else: + return None + if auth_token: + try: + return self.decode_jwt(auth_token, check_exp=required) + except Exception: + logging.exception(f"JWT Decode Error {auth_token}") + raise HTTPError(401, "JWT Decode Error") + return None + + async def _check_authorized_ip(self, ip: IPAddr) -> bool: + if ip in self.trusted_ips: + return True + for rng in self.trusted_ranges: + if ip in rng: + return True + if self.trusted_domains: + if ip in self.fqdn_cache: + fqdn: str = self.fqdn_cache[ip]["domain"] + else: + eventloop = self.server.get_event_loop() + try: + fut = eventloop.run_in_thread(socket.getfqdn, str(ip)) + fqdn = await asyncio.wait_for(fut, 5.0) + except asyncio.TimeoutError: + logging.info("Call to socket.getfqdn() timed out") + return False + else: + fqdn = fqdn.lower() + self.fqdn_cache[ip] = { + "expires_at": time.time() + FQDN_CACHE_TIMEOUT, + "domain": fqdn + } + return fqdn in self.trusted_domains + return False + + async def _check_trusted_connection( + self, ip: Optional[IPAddr] + ) -> Optional[Dict[str, Any]]: + if ip is not None: + curtime = time.time() + exp_time = curtime + TRUSTED_CONNECTION_TIMEOUT + if ip in self.trusted_users: + self.trusted_users[ip]['expires_at'] = exp_time + return self.trusted_users[ip] + elif await self._check_authorized_ip(ip): + logging.info( + f"Trusted Connection Detected, IP: {ip}") + self.trusted_users[ip] = { + 'username': TRUSTED_USER, + 'password': None, + 'created_on': curtime, + 'expires_at': exp_time + } + return self.trusted_users[ip] + return None + + def _check_oneshot_token(self, + token: str, + cur_ip: Optional[IPAddr] + ) -> Optional[Dict[str, Any]]: + if token in self.oneshot_tokens: + ip_addr, user, hdl = self.oneshot_tokens.pop(token) + hdl.cancel() + if cur_ip != ip_addr: + logging.info(f"Oneshot Token IP Mismatch: expected{ip_addr}" + f", Recd: {cur_ip}") + return None + return user + else: + return None + + def check_logins_maxed(self, ip_addr: IPAddr) -> bool: + if self.max_logins is None: + return False + return self.failed_logins.get(ip_addr, 0) >= self.max_logins + + async def authenticate_request( + self, request: HTTPServerRequest, auth_required: bool = True + ) -> Optional[Dict[str, Any]]: + if request.method == "OPTIONS": + return None + + # Check JSON Web Token + jwt_user = self._check_json_web_token(request, auth_required) + if jwt_user is not None: + return jwt_user + + try: + ip = ipaddress.ip_address(request.remote_ip) # type: ignore + except ValueError: + logging.exception( + f"Unable to Create IP Address {request.remote_ip}") + ip = None + + # Check oneshot access token + ost: Optional[List[bytes]] = request.arguments.get('token', None) + if ost is not None: + ost_user = self._check_oneshot_token(ost[-1].decode(), ip) + if ost_user is not None: + return ost_user + + # Check API Key Header + if self.enable_api_key: + key: Optional[str] = request.headers.get("X-Api-Key") + if key and key == self.api_key: + return self.users[API_USER] + + # If the force_logins option is enabled and at least one user is created + # then trusted user authentication is disabled + if self.force_logins and len(self.users) > 1: + if not auth_required: + return None + raise HTTPError(401, "Unauthorized, Force Logins Enabled") + + # Check if IP is trusted. If this endpoint doesn't require authentication + # then it is acceptable to return None + trusted_user = await self._check_trusted_connection(ip) + if trusted_user is not None or not auth_required: + return trusted_user + + raise HTTPError(401, "Unauthorized") + + async def check_cors(self, origin: Optional[str]) -> bool: + if origin is None or not self.cors_domains: + return False + for regex in self.cors_domains: + match = re.match(regex, origin) + if match is not None: + if match.group() == origin: + logging.debug(f"CORS Pattern Matched, origin: {origin} " + f" | pattern: {regex}") + return True + else: + logging.debug(f"Partial Cors Match: {match.group()}") + else: + # Check to see if the origin contains an IP that matches a + # current trusted connection + match = re.search(r"^https?://([^/:]+)", origin) + if match is not None: + ip = match.group(1) + try: + ipaddr = ipaddress.ip_address(ip) + except ValueError: + pass + else: + if await self._check_authorized_ip(ipaddr): + logging.debug(f"Cors request matched trusted IP: {ip}") + return True + logging.debug(f"No CORS match for origin: {origin}\n" + f"Patterns: {self.cors_domains}") + return False + + def cors_enabled(self) -> bool: + return self.cors_domains is not None + + def close(self) -> None: + self.prune_timer.stop() + + +def load_component(config: ConfigHelper) -> Authorization: + return Authorization(config) From ede68cbde77b1a92d112163673fd6dc4b355d810 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 02:00:00 +0300 Subject: [PATCH 07/10] Delete moonraker/authorization.py --- moonraker/authorization.py | 932 ------------------------------------- 1 file changed, 932 deletions(-) delete mode 100644 moonraker/authorization.py diff --git a/moonraker/authorization.py b/moonraker/authorization.py deleted file mode 100644 index c8cd65ffd..000000000 --- a/moonraker/authorization.py +++ /dev/null @@ -1,932 +0,0 @@ -# API Key Based Authorization -# -# Copyright (C) 2020 Eric Callahan -# -# This file may be distributed under the terms of the GNU GPLv3 license - -from __future__ import annotations -import asyncio -import base64 -import uuid -import hashlib -import secrets -import os -import pyotp -import time -import datetime -import ipaddress -import re -import socket -import logging -from tornado.web import HTTPError -from libnacl.sign import Signer, Verifier -from ..utils import json_wrapper as jsonw -from ..common import RequestType, TransportType - -# Annotation imports -from typing import ( - TYPE_CHECKING, - Any, - Tuple, - Optional, - Union, - Dict, - List, -) - -if TYPE_CHECKING: - from ..confighelper import ConfigHelper - from ..common import WebRequest - from .websockets import WebsocketManager - from tornado.httputil import HTTPServerRequest - from .database import MoonrakerDatabase as DBComp - from .ldap import MoonrakerLDAP - IPAddr = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] - IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] - OneshotToken = Tuple[IPAddr, Optional[Dict[str, Any]], asyncio.Handle] - -# Helpers for base64url encoding and decoding -def base64url_encode(data: bytes) -> bytes: - return base64.urlsafe_b64encode(data).rstrip(b"=") - -def base64url_decode(data: str) -> bytes: - pad_cnt = len(data) % 4 - if pad_cnt: - data += "=" * (4 - pad_cnt) - return base64.urlsafe_b64decode(data) - - -ONESHOT_TIMEOUT = 5 -TRUSTED_CONNECTION_TIMEOUT = 3600 -FQDN_CACHE_TIMEOUT = 84000 -PRUNE_CHECK_TIME = 300. - -AUTH_SOURCES = ["moonraker", "ldap"] -HASH_ITER = 100000 -API_USER = "_API_KEY_USER_" -TRUSTED_USER = "_TRUSTED_USER_" -RESERVED_USERS = [API_USER, TRUSTED_USER] -JWT_EXP_TIME = datetime.timedelta(hours=1) -JWT_HEADER = { - 'alg': "EdDSA", - 'typ': "JWT" -} - -class Authorization: - def __init__(self, config: ConfigHelper) -> None: - self.server = config.get_server() - self.login_timeout = config.getint('login_timeout', 90) - self.force_logins = config.getboolean('force_logins', False) - self.default_source = config.get('default_source', "moonraker").lower() - self.enable_api_key = config.getboolean('enable_api_key', True) - self.enable_totp = config.getboolean('enable_totp', False) - self.max_logins = config.getint("max_login_attempts", None, above=0) - self.failed_logins: Dict[IPAddr, int] = {} - self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {} - if self.default_source not in AUTH_SOURCES: - self.server.add_warning( - "[authorization]: option 'default_source' - Invalid " - f"value '{self.default_source}', falling back to " - "'moonraker'." - ) - self.default_source = "moonraker" - self.ldap: Optional[MoonrakerLDAP] = None - if config.has_section("ldap"): - self.ldap = self.server.load_component(config, "ldap", None) - if self.default_source == "ldap" and self.ldap is None: - self.server.add_warning( - "[authorization]: Option 'default_source' set to 'ldap'," - " however [ldap] section failed to load or not configured" - ) - database: DBComp = self.server.lookup_component('database') - database.register_local_namespace('authorized_users', forbidden=True) - self.user_db = database.wrap_namespace('authorized_users') - self.users: Dict[str, Dict[str, Any]] = self.user_db.as_dict() - api_user: Optional[Dict[str, Any]] = self.users.get(API_USER, None) - if api_user is None: - self.api_key = uuid.uuid4().hex - self.users[API_USER] = { - 'username': API_USER, - 'api_key': self.api_key, - 'created_on': time.time() - } - else: - self.api_key = api_user['api_key'] - if (self.enable_totp): - database.register_local_namespace( - 'user_totp_secret_storage', - forbidden=True) - self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') - self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = ( - self.totp_secret_db.as_dict() - ) - hi = self.server.get_host_info() - self.issuer = f"http://{hi['hostname']}:{hi['port']}" - self.public_jwks: Dict[str, Dict[str, Any]] = {} - for username, user_info in list(self.users.items()): - if username == API_USER: - # Validate the API User - for item in ["username", "api_key", "created_on"]: - if item not in user_info: - self.users[API_USER] = { - 'username': API_USER, - 'api_key': self.api_key, - 'created_on': time.time() - } - break - continue - else: - # validate created users - valid = True - for item in ["username", "password", "salt", "created_on"]: - if item not in user_info: - logging.info( - f"Authorization: User {username} does not " - f"contain field {item}, removing") - del self.users[username] - valid = False - break - if not valid: - continue - # generate jwks for valid users - if 'jwt_secret' in user_info: - try: - priv_key = self._load_private_key(user_info['jwt_secret']) - jwk_id = user_info['jwk_id'] - except (self.server.error, KeyError): - logging.info("Invalid key found for user, removing") - user_info.pop('jwt_secret', None) - user_info.pop('jwk_id', None) - self.users[username] = user_info - continue - self.public_jwks[jwk_id] = self._generate_public_jwk(priv_key) - # sync user changes to the database - self.user_db.sync(self.users) - self.trusted_users: Dict[IPAddr, Any] = {} - self.oneshot_tokens: Dict[str, OneshotToken] = {} - - # Get allowed cors domains - self.cors_domains: List[str] = [] - for domain in config.getlist('cors_domains', []): - bad_match = re.search(r"^.+\.[^:]*\*", domain) - if bad_match is not None: - self.server.add_warning( - f"[authorization]: Unsafe domain '{domain}' in option " - f"'cors_domains'. Wildcards are not permitted in the" - " top level domain." - ) - continue - if domain.endswith("/"): - self.server.add_warning( - f"[authorization]: Invalid domain '{domain}' in option " - "'cors_domains'. Domain's cannot contain a trailing " - "slash." - ) - else: - self.cors_domains.append( - domain.replace(".", "\\.").replace("*", ".*")) - - # Get Trusted Clients - self.trusted_ips: List[IPAddr] = [] - self.trusted_ranges: List[IPNetwork] = [] - self.trusted_domains: List[str] = [] - for val in config.getlist('trusted_clients', []): - # Check IP address - try: - tc = ipaddress.ip_address(val) - except ValueError: - pass - else: - self.trusted_ips.append(tc) - continue - # Check ip network - try: - tn = ipaddress.ip_network(val) - except ValueError as e: - if "has host bits set" in str(e): - self.server.add_warning( - f"[authorization]: Invalid CIDR expression '{val}' " - "in option 'trusted_clients'") - continue - pass - else: - self.trusted_ranges.append(tn) - continue - # Check hostname - match = re.match(r"([a-z0-9]+(-[a-z0-9]+)*\.?)+[a-z]{2,}$", val) - if match is not None: - self.trusted_domains.append(val.lower()) - else: - self.server.add_warning( - f"[authorization]: Invalid domain name '{val}' " - "in option 'trusted_clients'") - - t_clients = "\n".join( - [str(ip) for ip in self.trusted_ips] + - [str(rng) for rng in self.trusted_ranges] + - self.trusted_domains) - c_domains = "\n".join(self.cors_domains) - - logging.info( - f"Authorization Configuration Loaded\n" - f"Trusted Clients:\n{t_clients}\n" - f"CORS Domains:\n{c_domains}") - - eventloop = self.server.get_event_loop() - self.prune_timer = eventloop.register_timer( - self._prune_conn_handler) - - # Register Authorization Endpoints - self.server.register_endpoint( - "/access/login", RequestType.POST, self._handle_login, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - self.server.register_endpoint( - "/access/logout", RequestType.POST, self._handle_logout, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - self.server.register_endpoint( - "/access/user", RequestType.all(), self._handle_user_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/users/list", RequestType.GET, self._handle_list_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/user/password", RequestType.POST, self._handle_password_reset, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/api_key", RequestType.GET | RequestType.POST, - self._handle_apikey_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/oneshot_token", RequestType.GET, self._handle_oneshot_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET - ) - self.server.register_endpoint( - "/access/info", RequestType.GET, self._handle_info_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - # Generate TOTP record - if (self.enable_totp): - self.server.register_endpoint( - "/access/get_totp_uri", RequestType.GET, self._handle_getTOTP_request, - transports=TransportType.HTTP | TransportType.WEBSOCKET, - auth_required=False - ) - wsm: WebsocketManager = self.server.lookup_component("websockets") - wsm.register_notification("authorization:user_created") - wsm.register_notification( - "authorization:user_deleted", event_type="logout" - ) - wsm.register_notification( - "authorization:user_logged_out", event_type="logout" - ) - - def _sync_user(self, username: str) -> None: - self.user_db[username] = self.users[username] - - async def component_init(self) -> None: - self.prune_timer.start(delay=PRUNE_CHECK_TIME) - - async def _handle_apikey_request(self, web_request: WebRequest) -> str: - if web_request.get_request_type() == RequestType.POST: - self.api_key = uuid.uuid4().hex - self.users[API_USER]['api_key'] = self.api_key - self._sync_user(API_USER) - return self.api_key - - async def _handle_oneshot_request(self, web_request: WebRequest) -> str: - ip = web_request.get_ip_address() - assert ip is not None - user_info = web_request.get_current_user() - return self.get_oneshot_token(ip, user_info) - - async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]: - ip = web_request.get_ip_address() - if ip is not None and self.check_logins_maxed(ip): - raise HTTPError( - 401, "Unauthorized, Maximum Login Attempts Reached" - ) - try: - ret = await self._login_jwt_user(web_request) - except asyncio.CancelledError: - raise - except Exception: - if ip is not None: - failed = self.failed_logins.get(ip, 0) - self.failed_logins[ip] = failed + 1 - raise - if ip is not None: - self.failed_logins.pop(ip, None) - return ret - - async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]: - user_info = web_request.get_current_user() - if user_info is None: - raise self.server.error("No user logged in") - username: str = user_info['username'] - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid log out request for user {username}") - self.users[username].pop("jwt_secret", None) - jwk_id: str = self.users[username].pop("jwk_id", None) - self._sync_user(username) - self.public_jwks.pop(jwk_id, None) - eventloop = self.server.get_event_loop() - eventloop.delay_callback( - .005, self.server.send_event, "authorization:user_logged_out", - {'username': username} - ) - return { - "username": username, - "action": "user_logged_out" - } - - async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: - sources = ["moonraker"] - if self.ldap is not None: - sources.append("ldap") - login_req = self.force_logins and len(self.users) > 1 - request_trusted: Optional[bool] = None - user = web_request.current_user - req_ip = web_request.ip_addr - if user is not None and user.get("username") == TRUSTED_USER: - request_trusted = True - elif req_ip is not None: - request_trusted = await self._check_authorized_ip(req_ip) - return { - "default_source": self.default_source, - "available_sources": sources, - "login_required": login_req, - "trusted": request_trusted - } - - async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: - username: str = web_request.get_str('username') - (secret, is_activated) = self.totp_secrets.get(username, ('', True)) - if secret == '': - raise ValueError("User does not have a TOTP key set up.") - uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") - return { - "TOTP_URI": uri, - } - - async def _handle_refresh_jwt(self, - web_request: WebRequest - ) -> Dict[str, str]: - refresh_token: str = web_request.get_str('refresh_token') - try: - user_info = self.decode_jwt(refresh_token, token_type="refresh") - except Exception: - raise self.server.error("Invalid Refresh Token", 401) - username: str = user_info['username'] - if 'jwt_secret' not in user_info or "jwk_id" not in user_info: - raise self.server.error("User not logged in", 401) - private_key = self._load_private_key(user_info['jwt_secret']) - jwk_id: str = user_info['jwk_id'] - token = self._generate_jwt(username, jwk_id, private_key) - return { - 'username': username, - 'token': token, - 'source': user_info.get("source", "moonraker"), - 'action': 'user_jwt_refresh' - } - - async def _handle_user_request( - self, web_request: WebRequest - ) -> Dict[str, Any]: - req_type = web_request.get_request_type() - if req_type == RequestType.GET: - user = web_request.get_current_user() - if user is None: - return { - 'username': None, - 'source': None, - 'created_on': None, - } - else: - return { - 'username': user['username'], - 'source': user.get("source", "moonraker"), - 'created_on': user.get('created_on') - } - elif req_type == RequestType.POST: - # Create User - return await self._login_jwt_user(web_request, create=True) - elif req_type == RequestType.DELETE: - # Delete User - return self._delete_jwt_user(web_request) - raise self.server.error("Invalid Request Method") - - async def _handle_list_request(self, - web_request: WebRequest - ) -> Dict[str, List[Dict[str, Any]]]: - user_list = [] - for user in self.users.values(): - if user['username'] == API_USER: - continue - user_list.append({ - 'username': user['username'], - 'source': user.get("source", "moonraker"), - 'created_on': user['created_on'] - }) - return { - 'users': user_list - } - - async def _handle_password_reset(self, - web_request: WebRequest - ) -> Dict[str, str]: - password: str = web_request.get_str('password') - new_pass: str = web_request.get_str('new_password') - user_info = web_request.get_current_user() - if user_info is None: - raise self.server.error("No Current User") - username = user_info['username'] - if user_info.get("source", "moonraker") == "ldap": - raise self.server.error( - f"Can´t Reset password for ldap user {username}") - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Reset Request for user {username}") - salt = bytes.fromhex(user_info['salt']) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - if hashed_pass != user_info['password']: - raise self.server.error("Invalid Password") - new_hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', new_pass.encode(), salt, HASH_ITER).hex() - self.users[username]['password'] = new_hashed_pass - self._sync_user(username) - if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} - self.totp_secret_db.sync(self.totp_secrets) - return { - 'username': username, - 'action': "user_password_reset" - } - - async def _login_jwt_user( - self, web_request: WebRequest, create: bool = False - ) -> Dict[str, Any]: - username: str = web_request.get_str('username') - password: str = web_request.get_str('password') - source: str = web_request.get_str( - 'source', self.default_source - ).lower() - if source not in AUTH_SOURCES: - raise self.server.error(f"Invalid 'source': {source}") - user_info: Dict[str, Any] - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Request for user {username}") - if source == "ldap": - if create: - raise self.server.error("Cannot Create LDAP User") - if self.ldap is None: - raise self.server.error( - "LDAP authentication not available", 401 - ) - await self.ldap.authenticate_ldap_user(username, password) - if username not in self.users: - create = True - if (self.enable_totp): - totp_code: str = web_request.get_str('totp_code') - if create: - if username in self.users: - raise self.server.error(f"User {username} already exists") - salt = secrets.token_bytes(32) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - user_info = { - 'username': username, - 'password': hashed_pass, - 'salt': salt.hex(), - 'source': source, - 'created_on': time.time() - } - self.users[username] = user_info - self._sync_user(username) - action = "user_created" - if source == "ldap": - # Dont notify user created - action = "user_logged_in" - create = False - if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} - self.totp_secret_db.sync(self.totp_secrets) - else: - if username not in self.users: - raise self.server.error(f"Unregistered User: {username}") - user_info = self.users[username] - auth_src = user_info.get("source", "moonraker") - if auth_src != source: - raise self.server.error( - f"Moonraker cannot authenticate user '{username}', must " - f"specify source '{auth_src}'", 401 - ) - salt = bytes.fromhex(user_info['salt']) - hashed_pass = hashlib.pbkdf2_hmac( - 'sha256', password.encode(), salt, HASH_ITER).hex() - action = "user_logged_in" - if hashed_pass != user_info['password']: - raise self.server.error("Invalid Password") - if (self.enable_totp): - user_data_totp = self.totp_secrets.get(username, {'secret': '', 'is_activated': True}) - secret = user_data_totp['secret'] - is_activated = user_data_totp['is_activated'] - if secret == '': - raise self.server.error("User does not have a secret key set up.") - if pyotp.TOTP(secret).verify(totp_code) is False: - raise self.server.error("Invalid TOTP code") - if is_activated is False: - self.totp_secrets[username] = {'secret': secret, 'is_activated': True} - self.totp_secret_db.sync(self.totp_secrets) - jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) - if jwt_secret_hex is None: - private_key = Signer() - jwk_id = base64url_encode(secrets.token_bytes()).decode() - user_info['jwt_secret'] = private_key.hex_seed().decode() - user_info['jwk_id'] = jwk_id - self.users[username] = user_info - self._sync_user(username) - self.public_jwks[jwk_id] = self._generate_public_jwk(private_key) - else: - private_key = self._load_private_key(jwt_secret_hex) - jwk_id = user_info['jwk_id'] - token = self._generate_jwt(username, jwk_id, private_key) - refresh_token = self._generate_jwt( - username, jwk_id, private_key, token_type="refresh", - exp_time=datetime.timedelta(days=self.login_timeout)) - conn = web_request.get_client_connection() - if create: - event_loop = self.server.get_event_loop() - event_loop.delay_callback( - .005, self.server.send_event, - "authorization:user_created", - {'username': username}) - elif conn is not None: - conn.user_info = user_info - return { - 'username': username, - 'token': token, - 'source': user_info.get("source", "moonraker"), - 'refresh_token': refresh_token, - 'action': action - } - - def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: - username: str = web_request.get_str('username') - current_user = web_request.get_current_user() - if current_user is not None: - curname = current_user.get('username', None) - if curname is not None and curname == username: - raise self.server.error( - f"Cannot delete logged in user {curname}") - if username in RESERVED_USERS: - raise self.server.error( - f"Invalid Request for reserved user {username}") - user_info: Optional[Dict[str, Any]] = self.users.get(username) - if user_info is None: - raise self.server.error(f"No registered user: {username}") - if 'jwk_id' in user_info: - self.public_jwks.pop(user_info['jwk_id'], None) - del self.users[username] - del self.user_db[username] - event_loop = self.server.get_event_loop() - event_loop.delay_callback( - .005, self.server.send_event, - "authorization:user_deleted", - {'username': username}) - if (self.enable_totp): - del self.totp_secrets[username] - self.totp_secret_db.sync(self.totp_secrets) - return { - "username": username, - "action": "user_deleted" - } - - def _generate_jwt(self, - username: str, - jwk_id: str, - private_key: Signer, - token_type: str = "access", - exp_time: datetime.timedelta = JWT_EXP_TIME - ) -> str: - curtime = int(time.time()) - payload = { - 'iss': self.issuer, - 'aud': "Moonraker", - 'iat': curtime, - 'exp': curtime + int(exp_time.total_seconds()), - 'username': username, - 'token_type': token_type - } - header = {'kid': jwk_id} - header.update(JWT_HEADER) - jwt_header = base64url_encode(jsonw.dumps(header)) - jwt_payload = base64url_encode(jsonw.dumps(payload)) - jwt_msg = b".".join([jwt_header, jwt_payload]) - sig = private_key.signature(jwt_msg) - jwt_sig = base64url_encode(sig) - return b".".join([jwt_msg, jwt_sig]).decode() - - def decode_jwt( - self, token: str, token_type: str = "access", check_exp: bool = True - ) -> Dict[str, Any]: - message, sig = token.rsplit('.', maxsplit=1) - enc_header, enc_payload = message.split('.') - header: Dict[str, Any] = jsonw.loads(base64url_decode(enc_header)) - sig_bytes = base64url_decode(sig) - - # verify header - if header.get('typ') != "JWT" or header.get('alg') != "EdDSA": - raise self.server.error("Invalid JWT header") - jwk_id = header.get('kid') - if jwk_id not in self.public_jwks: - raise self.server.error("Invalid key ID") - - # validate signature - public_key = self._public_key_from_jwk(self.public_jwks[jwk_id]) - public_key.verify(sig_bytes + message.encode()) - - # validate claims - payload: Dict[str, Any] = jsonw.loads(base64url_decode(enc_payload)) - if payload['token_type'] != token_type: - raise self.server.error( - f"JWT Token type mismatch: Expected {token_type}, " - f"Recd: {payload['token_type']}", 401) - if payload['iss'] != self.issuer: - raise self.server.error("Invalid JWT Issuer", 401) - if payload['aud'] != "Moonraker": - raise self.server.error("Invalid JWT Audience", 401) - if check_exp and payload['exp'] < int(time.time()): - raise self.server.error("JWT Expired", 401) - - # get user - user_info: Optional[Dict[str, Any]] = self.users.get( - payload.get('username', ""), None) - if user_info is None: - raise self.server.error("Unknown user", 401) - return user_info - - def validate_jwt(self, token: str) -> Dict[str, Any]: - try: - user_info = self.decode_jwt(token) - except Exception as e: - if isinstance(e, self.server.error): - raise - raise self.server.error( - f"Failed to decode JWT: {e}", 401 - ) from e - return user_info - - def validate_api_key(self, api_key: str) -> Dict[str, Any]: - if not self.enable_api_key: - raise self.server.error("API Key authentication is disabled", 401) - if api_key and api_key == self.api_key: - return self.users[API_USER] - raise self.server.error("Invalid API Key", 401) - - def _load_private_key(self, secret: str) -> Signer: - try: - key = Signer(bytes.fromhex(secret)) - except Exception: - raise self.server.error( - "Error decoding private key, user data may" - " be corrupt", 500) from None - return key - - def _generate_public_jwk(self, private_key: Signer) -> Dict[str, Any]: - public_key = private_key.vk - return { - 'x': base64url_encode(public_key).decode(), - 'kty': "OKP", - 'crv': "Ed25519", - 'use': "sig" - } - - def _public_key_from_jwk(self, jwk: Dict[str, Any]) -> Verifier: - if jwk.get('kty') != "OKP": - raise self.server.error("Not an Octet Key Pair") - if jwk.get('crv') != "Ed25519": - raise self.server.error("Invalid Curve") - if 'x' not in jwk: - raise self.server.error("No 'x' argument in jwk") - key = base64url_decode(jwk['x']) - return Verifier(key.hex().encode()) - - def _prune_conn_handler(self, eventtime: float) -> float: - cur_time = time.time() - for ip, user_info in list(self.trusted_users.items()): - exp_time: float = user_info['expires_at'] - if cur_time >= exp_time: - self.trusted_users.pop(ip, None) - logging.info(f"Trusted Connection Expired, IP: {ip}") - for ip, fqdn_info in list(self.fqdn_cache.items()): - exp_time = fqdn_info["expires_at"] - if cur_time >= exp_time: - domain: str = fqdn_info["domain"] - self.fqdn_cache.pop(ip, None) - logging.info(f"Cached FQDN Expired, IP: {ip}, domain: {domain}") - return eventtime + PRUNE_CHECK_TIME - - def _oneshot_token_expire_handler(self, token): - self.oneshot_tokens.pop(token, None) - - def get_oneshot_token(self, - ip_addr: IPAddr, - user: Optional[Dict[str, Any]] - ) -> str: - token = base64.b32encode(os.urandom(20)).decode() - event_loop = self.server.get_event_loop() - hdl = event_loop.delay_callback( - ONESHOT_TIMEOUT, self._oneshot_token_expire_handler, token) - self.oneshot_tokens[token] = (ip_addr, user, hdl) - return token - - def _check_json_web_token( - self, request: HTTPServerRequest, required: bool = True - ) -> Optional[Dict[str, Any]]: - auth_token: Optional[str] = request.headers.get("Authorization") - if auth_token is None: - auth_token = request.headers.get("X-Access-Token") - if auth_token is None: - qtoken = request.query_arguments.get('access_token', None) - if qtoken is not None: - auth_token = qtoken[-1].decode(errors="ignore") - elif auth_token.startswith("Bearer "): - auth_token = auth_token[7:] - else: - return None - if auth_token: - try: - return self.decode_jwt(auth_token, check_exp=required) - except Exception: - logging.exception(f"JWT Decode Error {auth_token}") - raise HTTPError(401, "JWT Decode Error") - return None - - async def _check_authorized_ip(self, ip: IPAddr) -> bool: - if ip in self.trusted_ips: - return True - for rng in self.trusted_ranges: - if ip in rng: - return True - if self.trusted_domains: - if ip in self.fqdn_cache: - fqdn: str = self.fqdn_cache[ip]["domain"] - else: - eventloop = self.server.get_event_loop() - try: - fut = eventloop.run_in_thread(socket.getfqdn, str(ip)) - fqdn = await asyncio.wait_for(fut, 5.0) - except asyncio.TimeoutError: - logging.info("Call to socket.getfqdn() timed out") - return False - else: - fqdn = fqdn.lower() - self.fqdn_cache[ip] = { - "expires_at": time.time() + FQDN_CACHE_TIMEOUT, - "domain": fqdn - } - return fqdn in self.trusted_domains - return False - - async def _check_trusted_connection( - self, ip: Optional[IPAddr] - ) -> Optional[Dict[str, Any]]: - if ip is not None: - curtime = time.time() - exp_time = curtime + TRUSTED_CONNECTION_TIMEOUT - if ip in self.trusted_users: - self.trusted_users[ip]['expires_at'] = exp_time - return self.trusted_users[ip] - elif await self._check_authorized_ip(ip): - logging.info( - f"Trusted Connection Detected, IP: {ip}") - self.trusted_users[ip] = { - 'username': TRUSTED_USER, - 'password': None, - 'created_on': curtime, - 'expires_at': exp_time - } - return self.trusted_users[ip] - return None - - def _check_oneshot_token(self, - token: str, - cur_ip: Optional[IPAddr] - ) -> Optional[Dict[str, Any]]: - if token in self.oneshot_tokens: - ip_addr, user, hdl = self.oneshot_tokens.pop(token) - hdl.cancel() - if cur_ip != ip_addr: - logging.info(f"Oneshot Token IP Mismatch: expected{ip_addr}" - f", Recd: {cur_ip}") - return None - return user - else: - return None - - def check_logins_maxed(self, ip_addr: IPAddr) -> bool: - if self.max_logins is None: - return False - return self.failed_logins.get(ip_addr, 0) >= self.max_logins - - async def authenticate_request( - self, request: HTTPServerRequest, auth_required: bool = True - ) -> Optional[Dict[str, Any]]: - if request.method == "OPTIONS": - return None - - # Check JSON Web Token - jwt_user = self._check_json_web_token(request, auth_required) - if jwt_user is not None: - return jwt_user - - try: - ip = ipaddress.ip_address(request.remote_ip) # type: ignore - except ValueError: - logging.exception( - f"Unable to Create IP Address {request.remote_ip}") - ip = None - - # Check oneshot access token - ost: Optional[List[bytes]] = request.arguments.get('token', None) - if ost is not None: - ost_user = self._check_oneshot_token(ost[-1].decode(), ip) - if ost_user is not None: - return ost_user - - # Check API Key Header - if self.enable_api_key: - key: Optional[str] = request.headers.get("X-Api-Key") - if key and key == self.api_key: - return self.users[API_USER] - - # If the force_logins option is enabled and at least one user is created - # then trusted user authentication is disabled - if self.force_logins and len(self.users) > 1: - if not auth_required: - return None - raise HTTPError(401, "Unauthorized, Force Logins Enabled") - - # Check if IP is trusted. If this endpoint doesn't require authentication - # then it is acceptable to return None - trusted_user = await self._check_trusted_connection(ip) - if trusted_user is not None or not auth_required: - return trusted_user - - raise HTTPError(401, "Unauthorized") - - async def check_cors(self, origin: Optional[str]) -> bool: - if origin is None or not self.cors_domains: - return False - for regex in self.cors_domains: - match = re.match(regex, origin) - if match is not None: - if match.group() == origin: - logging.debug(f"CORS Pattern Matched, origin: {origin} " - f" | pattern: {regex}") - return True - else: - logging.debug(f"Partial Cors Match: {match.group()}") - else: - # Check to see if the origin contains an IP that matches a - # current trusted connection - match = re.search(r"^https?://([^/:]+)", origin) - if match is not None: - ip = match.group(1) - try: - ipaddr = ipaddress.ip_address(ip) - except ValueError: - pass - else: - if await self._check_authorized_ip(ipaddr): - logging.debug(f"Cors request matched trusted IP: {ip}") - return True - logging.debug(f"No CORS match for origin: {origin}\n" - f"Patterns: {self.cors_domains}") - return False - - def cors_enabled(self) -> bool: - return self.cors_domains is not None - - def close(self) -> None: - self.prune_timer.stop() - - -def load_component(config: ConfigHelper) -> Authorization: - return Authorization(config) From 2747da5b94764cd124423058d89d78fae941b93b Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 02:00:22 +0300 Subject: [PATCH 08/10] Syntax cleaning --- moonraker/components/authorization.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 44d5b188c..e0e43b6a8 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -113,9 +113,13 @@ def __init__(self, config: ConfigHelper) -> None: else: self.api_key = api_user['api_key'] if (self.enable_totp): - database.register_local_namespace('user_totp_secret_storage', forbidden=True) + database.register_local_namespace( + 'user_totp_secret_storage', + forbidden=True) self.totp_secret_db = database.wrap_namespace('user_totp_secret_storage') - self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = self.totp_secret_db.as_dict() + self.totp_secrets: Dict[str, Dict[str, Union[str, bool]]] = ( + self.totp_secret_db.as_dict() + ) hi = self.server.get_host_info() self.issuer = f"http://{hi['hostname']}:{hi['port']}" self.public_jwks: Dict[str, Dict[str, Any]] = {} @@ -367,14 +371,13 @@ async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]: "login_required": login_req, "trusted": request_trusted } - + async def _handle_getTOTP_request(self, web_request: WebRequest) -> Dict[str, Any]: username: str = web_request.get_str('username') (secret, is_activated) = self.totp_secrets.get(username, ('', True)) if secret == '': raise ValueError("User does not have a TOTP key set up.") uri = pyotp.TOTP(secret).provisioning_uri(username, issuer_name="Moonraker") - return { "TOTP_URI": uri, } @@ -499,7 +502,7 @@ async def _login_jwt_user( if username not in self.users: create = True if (self.enable_totp): - totp_code: str = web_request.get_str('totp_code') + totp_code: str = web_request.get_str('totp_code') if create: if username in self.users: raise self.server.error(f"User {username} already exists") @@ -545,9 +548,9 @@ async def _login_jwt_user( is_activated = user_data_totp['is_activated'] if secret == '': raise self.server.error("User does not have a secret key set up.") - if (pyotp.TOTP(secret).verify(totp_code) == False): + if pyotp.TOTP(secret).verify(totp_code) is False: raise self.server.error("Invalid TOTP code") - if (is_activated == False): + if is_activated is False: self.totp_secrets[username] = {'secret': secret, 'is_activated': True} self.totp_secret_db.sync(self.totp_secrets) jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) From 09d0337193112ae0c90c12d94bdd7d8f2d8505e5 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 02:04:24 +0300 Subject: [PATCH 09/10] Syntax cleaning --- moonraker/components/authorization.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index e0e43b6a8..82edc3f0f 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -470,7 +470,10 @@ async def _handle_password_reset(self, self.users[username]['password'] = new_hashed_pass self._sync_user(username) if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secrets[username] = { + 'secret': pyotp.random_base32(), + 'is_activated': False + } self.totp_secret_db.sync(self.totp_secrets) return { 'username': username, @@ -524,7 +527,10 @@ async def _login_jwt_user( action = "user_logged_in" create = False if (self.enable_totp): - self.totp_secrets[username] = {'secret': pyotp.random_base32(), 'is_activated': False} + self.totp_secrets[username] = { + 'secret': pyotp.random_base32(), + 'is_activated': False + } self.totp_secret_db.sync(self.totp_secrets) else: if username not in self.users: @@ -543,7 +549,10 @@ async def _login_jwt_user( if hashed_pass != user_info['password']: raise self.server.error("Invalid Password") if (self.enable_totp): - user_data_totp = self.totp_secrets.get(username, {'secret': '', 'is_activated': True}) + user_data_totp = self.totp_secrets.get(username, { + 'secret': '', + 'is_activated': True + }) secret = user_data_totp['secret'] is_activated = user_data_totp['is_activated'] if secret == '': @@ -551,7 +560,10 @@ async def _login_jwt_user( if pyotp.TOTP(secret).verify(totp_code) is False: raise self.server.error("Invalid TOTP code") if is_activated is False: - self.totp_secrets[username] = {'secret': secret, 'is_activated': True} + self.totp_secrets[username] = { + 'secret': secret, + 'is_activated': True + } self.totp_secret_db.sync(self.totp_secrets) jwt_secret_hex: Optional[str] = user_info.get('jwt_secret', None) if jwt_secret_hex is None: From 10e4f062d4f22486bf0c854578ddb6318fc2bbc4 Mon Sep 17 00:00:00 2001 From: Alrott SlimRG <39348033+SlimRG@users.noreply.github.com> Date: Wed, 24 Apr 2024 02:06:42 +0300 Subject: [PATCH 10/10] Add files via upload --- moonraker/components/authorization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moonraker/components/authorization.py b/moonraker/components/authorization.py index 82edc3f0f..9bbff69c8 100644 --- a/moonraker/components/authorization.py +++ b/moonraker/components/authorization.py @@ -471,7 +471,7 @@ async def _handle_password_reset(self, self._sync_user(username) if (self.enable_totp): self.totp_secrets[username] = { - 'secret': pyotp.random_base32(), + 'secret': pyotp.random_base32(), 'is_activated': False } self.totp_secret_db.sync(self.totp_secrets)