Skip to content

Commit

Permalink
Add config flag for testing (#181)
Browse files Browse the repository at this point in the history
* Add flag for testing

Add a flag that can be enabled
by the integration tests to bypass
checking that messages are coming
from google pub/sub

* Fix linting errors
  • Loading branch information
samanehsan authored Sep 4, 2019
1 parent b705b49 commit 97c6b57
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
5 changes: 4 additions & 1 deletion lira/api/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ def post(body):

def receive_messages(body):
"""Receive messages from Google pub/sub topic."""
if not lira_utils._is_authenticated_pubsub(connexion.request):
lira_config = current_app.config
if not lira_utils._is_authenticated_pubsub(
connexion.request, test_mode=lira_config.test_mode
):
raise connexion.ProblemException(
status=401,
title='Unauthorized',
Expand Down
4 changes: 4 additions & 0 deletions lira/lira_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ def __init__(self, config_object, *args, **kwargs):
if not config_object.get('google_pubsub_topic'):
self.google_pubsub_topic = f'hca-notifications-{env}'

# Only enable for integration testing
if not config_object.get('test_mode'):
config_object['test_mode'] = False

# Check cromwell credentials
use_caas = config_object.get('use_caas', None)
if not use_caas:
Expand Down
44 changes: 27 additions & 17 deletions lira/lira_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,35 @@
LIRA_SERVER_HEADER = {'Server': 'Lira Service', 'Content-type': 'application/json'}


def _is_authenticated_pubsub(request):
def _is_authenticated_pubsub(request, test_mode=False):
""" Check if the message is sent by Google:
https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.id_token.html """
try:
# Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
bearer_token = request.headers.get('Authorization')
token = bearer_token.split(' ')[1]

# Verify and decode the JWT. `verify_oauth2_token` verifies
# the JWT signature, the `aud` claim, and the `exp` claim.
claim = id_token.verify_oauth2_token(token, google_transport_requests.Request())
# Must also verify the `iss` claim.
if claim['iss'] not in ['accounts.google.com', 'https://accounts.google.com']:
logger.error('Wrong issuer.')
return False
https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.id_token.html
If the test_mode flag is set to True, skip authentication (only enable for tests).
"""
if test_mode:
return True
except Exception as e:
logger.error(f'Invalid token: {e}\n')
return False
else:
try:
# Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
bearer_token = request.headers.get('Authorization')
token = bearer_token.split(' ')[1]

# Verify and decode the JWT. `verify_oauth2_token` verifies
# the JWT signature, the `aud` claim, and the `exp` claim.
claim = id_token.verify_oauth2_token(
token, google_transport_requests.Request()
)
# Must also verify the `iss` claim.
if claim['iss'] not in [
'accounts.google.com',
'https://accounts.google.com',
]:
logger.error('Wrong issuer.')
return False
return True
except Exception as e:
logger.error(f'Invalid token: {e}\n')
return False


def response_with_server_header(body, status):
Expand Down

0 comments on commit 97c6b57

Please sign in to comment.