diff --git a/tiled/authenticators.py b/tiled/authenticators.py index e8fff3134..74d6427a3 100644 --- a/tiled/authenticators.py +++ b/tiled/authenticators.py @@ -5,10 +5,13 @@ import re import secrets from collections.abc import Iterable +from typing import Any, cast import httpx from fastapi import APIRouter, Request -from jose import JWTError, jwk, jwt +from jose import JWTError, jwk, jwt, jws +from pydantic import Secret +import requests from starlette.responses import RedirectResponse from .server.authentication import Mode @@ -107,6 +110,87 @@ async def authenticate(self, username: str, password: str) -> UserSessionState: else: return UserSessionState(username, {}) +class MinimalOIDCAuthenticator: + mode = Mode.external + configuration_schema = """ +$schema": http://json-schema.org/draft-07/schema# +type: object +additionalProperties: false +properties: + client_id: + type: string + client_secret: + type: string + well_known_uri: + type: string +""" + + def __init__( + self, + client_id: str, + client_secret: str, + well_known_uri: str + ): + self._client_id = client_id + self._client_secret = Secret(client_secret) + self.well_known_url = well_known_uri + + @functools.cached_property + def _config_from_oidc_url(self) -> dict[str, Any]: + response: requests.Response = requests.get(self.well_known_url) + response.raise_for_status() + return response.json() + + @functools.cached_property + def token_endpoint(self) -> str: + return cast(str, self._config_from_oidc_url.get("token_endpoint")) + + @functools.cached_property + def jwks_uri(self) -> str: + return cast(str, self._config_from_oidc_url.get("jwks_uri")) + + @functools.cached_property + def id_token_signing_alg_values_supported(self) -> list[str]: + return cast( + list[str], + self._config_from_oidc_url.get("id_token_signing_alg_values_supported"), + ) + + async def authenticate(self, request: Request) -> UserSessionState: + code = request.query_params["code"] + # A proxy in the middle may make the request into something like + # 'http://localhost:8000/...' so we fix the first part but keep + # the original URI path. + redirect_uri = f"{get_root_url(request)}{request.url.path}" + response = await exchange_code( + self.token_endpoint, + code, + self._client_id, + self._client_secret.get_secret_value(), + redirect_uri + ) + response_body = response.json() + if response.is_error: + logger.error("Authentication error: %r", response_body) + return None + response_body = response.json() + id_token = response_body["id_token"] + access_token = response_body["access_token"] + keys = request.get(self.jwks_uri) + try: + verified_body = jwt.decode( + access_token, + keys, + algorithms=self.id_token_signing_alg_values_supported, + ) + except JWTError: + logger.exception( + "Authentication error. Unverified token: %r", + jwt.get_unverified_claims(id_token), + ) + return None + return UserSessionState(verified_body["sub"], {}) + class OIDCAuthenticator: mode = Mode.external