Skip to content

Commit

Permalink
Add modified OIDC authenticator
Browse files Browse the repository at this point in the history
  • Loading branch information
DiamondJoseph committed Jan 10, 2025
1 parent cc21998 commit 6cbe666
Showing 1 changed file with 85 additions and 1 deletion.
86 changes: 85 additions & 1 deletion tiled/authenticators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import re
import secrets
from collections.abc import Iterable
from typing import Any, cast

import httpx
from fastapi import APIRouter, Request
from jose import JWTError, jwk, jwt
from jose import JWTError, jwk, jwt, jws
from pydantic import Secret
import requests
from starlette.responses import RedirectResponse

from .server.authentication import Mode
Expand Down Expand Up @@ -107,6 +110,87 @@ async def authenticate(self, username: str, password: str) -> UserSessionState:
else:
return UserSessionState(username, {})

class MinimalOIDCAuthenticator:
mode = Mode.external
configuration_schema = """
$schema": http://json-schema.org/draft-07/schema#
type: object
additionalProperties: false
properties:
client_id:
type: string
client_secret:
type: string
well_known_uri:
type: string
"""

def __init__(
self,
client_id: str,
client_secret: str,
well_known_uri: str
):
self._client_id = client_id
self._client_secret = Secret(client_secret)
self.well_known_url = well_known_uri

@functools.cached_property
def _config_from_oidc_url(self) -> dict[str, Any]:
response: requests.Response = requests.get(self.well_known_url)
response.raise_for_status()
return response.json()

@functools.cached_property
def token_endpoint(self) -> str:
return cast(str, self._config_from_oidc_url.get("token_endpoint"))

@functools.cached_property
def jwks_uri(self) -> str:
return cast(str, self._config_from_oidc_url.get("jwks_uri"))

@functools.cached_property
def id_token_signing_alg_values_supported(self) -> list[str]:
return cast(
list[str],
self._config_from_oidc_url.get("id_token_signing_alg_values_supported"),
)

async def authenticate(self, request: Request) -> UserSessionState:
code = request.query_params["code"]
# A proxy in the middle may make the request into something like
# 'http://localhost:8000/...' so we fix the first part but keep
# the original URI path.
redirect_uri = f"{get_root_url(request)}{request.url.path}"
response = await exchange_code(
self.token_endpoint,
code,
self._client_id,
self._client_secret.get_secret_value(),
redirect_uri
)
response_body = response.json()
if response.is_error:
logger.error("Authentication error: %r", response_body)
return None
response_body = response.json()
id_token = response_body["id_token"]
access_token = response_body["access_token"]
keys = request.get(self.jwks_uri)
try:
verified_body = jwt.decode(
access_token,
keys,
algorithms=self.id_token_signing_alg_values_supported,
)
except JWTError:
logger.exception(
"Authentication error. Unverified token: %r",
jwt.get_unverified_claims(id_token),
)
return None
return UserSessionState(verified_body["sub"], {})


class OIDCAuthenticator:
mode = Mode.external
Expand Down

0 comments on commit 6cbe666

Please sign in to comment.