Skip to content

Commit

Permalink
feat: improve error handling in authentication policies
Browse files Browse the repository at this point in the history
  • Loading branch information
eray-inuits committed Aug 28, 2024
1 parent 5d70b38 commit 6f1e67b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def authenticate(self, user_context, _):
self._token_schema.get("preferred_username", ""), ""
).lower()
return user_context
except InvalidTokenError as error:
except (InvalidTokenError, ValidatorException) as error:
raise Unauthorized(str(error))
except OAuth2Error as error:
if self._allow_anonymous_users:
Expand Down Expand Up @@ -222,22 +222,18 @@ def authenticate_token(self, token_string):
If authentication is successful, returns a JWTBearerToken object containing
the decoded JWT claims. If authentication fails, None is returned.
"""
try:
issuer = self.__get_issuer_from_token_string(token_string)
if not self.allowed_issuers:
raise Exception(f"No allowed issuers configured")
elif issuer not in self.allowed_issuers:
raise Exception(f"Issuer {issuer} not allowed")
issuer = self.__get_issuer_from_token_string(token_string)
if not self.allowed_issuers:
raise ValidatorException(f"No allowed issuers configured")
elif issuer not in self.allowed_issuers:
raise ValidatorException(f"Issuer {issuer} not allowed")
jwks = self.__get_jwks(issuer)
token = self.__decode_token(token_string, jwks)
if not token:
self.jwks_cache[issuer] = None
jwks = self.__get_jwks(issuer)
token = self.__decode_token(token_string, jwks)
if not token:
self.jwks_cache[issuer] = None
jwks = self.__get_jwks(issuer)
token = self.__decode_token(token_string, jwks)
return token
except Exception as ex:
self.logger.error(f"Could not get decoded & validated token: {ex}")
return None
return token

@staticmethod
def __get_issuer_from_token_string(token_string):
Expand All @@ -250,11 +246,16 @@ def __get_issuer_from_token_string(token_string):
def __get_jwks_from_issuer(issuer):
req = requests.get(f"{issuer}/.well-known/openid-configuration")
if req.status_code != 200:
raise Exception(
raise ValidatorException(
f"Failed to get issuer's OpenID configuration: {req.text.strip()}"
)
jwks_url = req.json()["jwks_uri"]
req = requests.get(jwks_url)
if req.status_code != 200:
raise Exception(f"Failed to get issuer's JWKS: {req.text.strip()}")
raise ValidatorException(f"Failed to get issuer's JWKS: {req.text.strip()}")
return req.json()["keys"]


class ValidatorException(Exception):
def __init__(self, message):
super().__init__(message)
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,24 @@ def authenticate(self, user_context: UserContext, _):

try:
token = user_context.auth_objects["token"]
flattened_token = user_context.flatten_auth_object(token)
user_context.x_tenant.roles = flattened_token.get(
self._token_schema["roles"], []
)
if not self._role_scope_mapping:
return user_context
for role in user_context.x_tenant.roles:
try:
user_context.x_tenant.scopes.extend(self._role_scope_mapping[role])
except KeyError:
continue
return user_context
except Exception:
if self._allow_anonymous_users:
return user_context
else:
raise Unauthorized()
raise Unauthorized("No token")

flattened_token = user_context.flatten_auth_object(token)
user_context.x_tenant.roles = flattened_token.get(
self._token_schema["roles"], []
)
if not self._role_scope_mapping:
return user_context
for role in user_context.x_tenant.roles:
try:
user_context.x_tenant.scopes.extend(self._role_scope_mapping[role])
except KeyError:
continue
return user_context

def __load_role_scope_mapping(self, file):
try:
Expand Down

0 comments on commit 6f1e67b

Please sign in to comment.