Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to auth bdd tests to be parametrized tests #4046

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/app/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import timezone, datetime
import logging
from typing import Optional
from aiohttp import ClientSession
from unittest.mock import MagicMock
import requests
import pytest
Expand All @@ -17,6 +18,7 @@
default_aiobotocore_get_session,
default_mock_requests_get,
default_mock_requests_post,
default_mock_client_get,
default_mock_requests_session_get,
default_mock_requests_session_post,
)
Expand Down Expand Up @@ -185,6 +187,14 @@ def mock_requests_session(monkeypatch):
return monkeypatch


@pytest.fixture()
def mock_client_session(monkeypatch):
"""Patch all calls to aiohttp.ClientSession"""
monkeypatch.setattr(ClientSession, "get", default_mock_client_get)
monkeypatch.setattr(ClientSession, "post", default_mock_client_get)
return monkeypatch


@pytest.fixture(autouse=True)
def spy_access_logging(mocker: MockerFixture):
"""Spies on access audting logging for tests"""
Expand Down
27 changes: 0 additions & 27 deletions api/app/tests/test_auth.feature

This file was deleted.

105 changes: 38 additions & 67 deletions api/app/tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,57 @@
""" Functional testing for authentication """

from datetime import datetime, timezone
from aiohttp import ClientSession
from pytest_bdd import scenario, given, then, parsers
from fastapi.testclient import TestClient
from datetime import datetime, timezone
import pytest
import app.auth
import app.main
from app.tests import load_json_file
from app.tests.common import default_mock_client_get


@scenario('test_auth.feature', 'Handling unauthenticated users')
def test_auth_1st_scenario():
""" BDD Scenario #1. """


@given(parsers.parse(
"I am an unauthenticated user {token} when I {verb} a protected {endpoint} with {payload}"),
converters={'token': str, 'verb': str, 'endpoint': str, 'payload': load_json_file(__file__)},
target_fixture='response')
def given_unauthenticated_user(monkeypatch, token: str, endpoint: str, verb: str, payload: dict):
""" Request (post/get) {endpoint} request which is protected """
@pytest.mark.parametrize(
"token, status, endpoint, verb, payload",
[
("Basic token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("Bearer token", 401, "/api/weather_models/GDPS/predictions/summaries/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/weather_models/GDPS/predictions/most_recent/", "post", "test_auth_stations_payload.json"),
("Bearer token", 401, "/api/weather_models/GDPS/predictions/most_recent/", "post", "test_auth_stations_payload.json"),
("just_token", 401, "/api/stations/details/", "get", "test_auth_stations_payload.json"),
],
)
@pytest.mark.usefixtures("mock_client_session")
def test_unauthenticated_requests(token, status, endpoint, verb, payload, spy_access_logging):
client = TestClient(app.main.app)
monkeypatch.setattr(ClientSession, 'get', default_mock_client_get)
if verb == 'post':
response = client.post(endpoint, headers={'Authorization': token}, json=payload)
elif verb == 'get':
response = client.get(endpoint, headers={'Authorization': token})
payload = load_json_file(__file__)(payload)
response = None
if verb == "post":
response = client.post(endpoint, headers={"Authorization": token}, json=payload)
else:
raise NotImplementedError('unexpected verb', verb)
return response
response = client.get(endpoint, headers={"Authorization": token})


@then("Unauthenticated access audit logs are created", converters={'endpoint': str})
def no_access_is_logged(spy_access_logging, endpoint):
"""Access audit logs are created"""
spy_access_logging.assert_called_once_with(None, False, endpoint)


@then(parsers.parse("I will get an error with {status} code"), converters={'status': int})
def status_code(response, status: int):
""" Assert that we receive the expected status code """
assert response.status_code == status
spy_access_logging.assert_called_once_with(None, False, endpoint)


@pytest.mark.parametrize(
"status, endpoint, verb, utc_time",
[
(200, "/api/weather_models/GDPS/predictions/summaries/", "post", 1618870929583),
(200, "/api/weather_models/GDPS/predictions/most_recent/", "post", 1618870929583),
(200, "/api/stations/details/", "get", 1618870929583),
],
)
@pytest.mark.usefixtures("mock_client_session")
@pytest.mark.usefixtures("mock_jwt_decode")
@scenario("test_auth.feature", "Verifying authenticated users")
def test_auth_2nd_scenario():
""" BDD Scenario #2. """


@then("Authenticated access audit logs are created")
def access_is_logged(spy_access_logging, endpoint):
"""Access audit logs are created"""
spy_access_logging.assert_called_once_with("test_username", True, endpoint)


@given(parsers.parse("utc_time: {utc_time}"), converters={'utc_time': int})
def given_utc_time(monkeypatch, utc_time: int):
""" Mock out utc time """
def test_authenticated_requests(status, endpoint, verb, utc_time, spy_access_logging, monkeypatch):
def mock_get_utc_now():
return datetime.fromtimestamp(utc_time / 1000, tz=timezone.utc)
monkeypatch.setattr(app.routers.stations, 'get_utc_now', mock_get_utc_now)


@given(parsers.parse("I am an authenticated user when I {verb} a protected {endpoint}"),
converters={'verb': str, 'endpoint': str},
target_fixture='response_2')
def given_authenticated_user(monkeypatch, endpoint: str, verb: str):
""" Request (post/get) {endpoint} request which is protected """
monkeypatch.setattr(app.routers.stations, "get_utc_now", mock_get_utc_now)
client = TestClient(app.main.app)
monkeypatch.setattr(ClientSession, 'get', default_mock_client_get)
if verb == 'post':
return client.post(endpoint, headers={"Authorization": "Bearer token"}, json={"stations": [838]})
if verb == 'get':
return client.get(
endpoint, headers={'Authorization': 'Bearer token'})
raise NotImplementedError('unexpected verb', verb)

response = None
if verb == "post":
response = client.post(endpoint, headers={"Authorization": "Bearer token"}, json={"stations": [838]})
if verb == "get":
response = client.get(endpoint, headers={"Authorization": "Bearer token"})

@then(parsers.parse("I shouldn't get an unauthorized error {status} code"), converters={'status': int})
def status_code_2(response_2, status: int):
""" Assert that we receive the expected status code """
assert response_2.status_code == status
assert response.status_code == status
spy_access_logging.assert_called_once_with("test_username", True, endpoint)
Loading