feat: add unit tests for queue repo get_by_filters_paged()
chisholm committed Oct 18, 2024
1 parent d839bda commit dc62493
Showing 1 changed file with 375 additions and 0 deletions.
tests/unit/restapi/test_queue_repository.py: 375 additions & 0 deletions
@@ -15,11 +15,14 @@
# ACCESS THE FULL CC BY 4.0 LICENSE HERE:
# https://creativecommons.org/licenses/by/4.0/legalcode
import datetime
import itertools
import re

import pytest

import dioptra.restapi.db.models as m
from dioptra.restapi.db.models.constants import resource_lock_types
from dioptra.restapi.db.repository.errors import QueueSortError, UnsupportedFilterField
from dioptra.restapi.db.repository.utils import DeletionPolicy
from dioptra.restapi.db.shared_errors import (
ResourceDeletedError,
@@ -54,6 +57,103 @@ def queue_snap_setup(queue_repo, account, db, fake_data):
return queues, latest_snaps


@pytest.fixture
def queue_filter_setup(account, db, fake_data):

# names from https://www.behindthename.com/random/

    # The data is fixed so that the tests can make concrete assertions
    # against it. E.g. you can't write a pattern to match names you don't
    # know, and you can't search for a tag that matches a few queues
    # without knowing which queues have which tags.
queue_info = [
{
"name": "Elfreda",
"description": "Type city nor painting Democrat.",
"tags": ["because", "support", "early"]
},
{
"name": "Lorelai",
"description": "Miss daughter bill information performance.",
"tags": ["want", "use", "quickly"]
},
{
"name": "Zelda",
"description": "System prepare suddenly toward activity.",
"tags": ["continue", "use", "down"]
},
{
"name": "Reanna",
"description": "Upon rich time model.",
"tags": ["early", "down", "quickly"]
},
{
"name": "Adelle",
"description": "What know continue true.",
"tags": ["because", "before", "continue"]
},
{
"name": "Kizzy",
"description": "Great crime reveal chair.",
"tags": ["use", "quickly", "early"]
},
{
"name": "Allannah",
"description": "Institution life chair two them.",
"tags": ["early", "continue", "down"]
},
{
"name": "Jodi",
"description": "Society learn easy along set world down however.",
"tags": ["support", "before", "use"]
},
{
"name": "Kaylani",
"description": "Himself work wait service.",
"tags": ["support", "early", "defense"]
},
{
"name": "Arn",
"description": "Responsibility program middle room interest guy.",
"tags": ["use", "down", "early"]
}
]

# Create queue objects
queue_objects = []
for info in queue_info:
queue_object = fake_data.queue(account.user, account.group)
queue_object.name = info["name"]
queue_object.description = info["description"]
queue_objects.append(queue_object)
db.session.add(queue_object)

db.session.commit()

# Create tag objects
tag_names = set(
itertools.chain.from_iterable(info["tags"] for info in queue_info)
)
tags_by_name = {}
for tag_name in tag_names:
tag_object = fake_data.tag(account.user, account.group)
tag_object.name = tag_name
tags_by_name[tag_name] = tag_object
db.session.add(tag_object)

db.session.commit()

# Link tags to queues
for info, queue_object in zip(queue_info, queue_objects):
for tag_name in info["tags"]:
queue_object.tags.append(tags_by_name[tag_name])

db.session.commit()

return queue_objects


def test_queue_create_queue_not_exists(queue_repo, account, db, fake_data):
queue = fake_data.queue(account.user, account.group)
queue_repo.create(queue)
@@ -642,3 +742,278 @@ def test_queue_delete_not_exist(queue_repo, account, fake_data):

with pytest.raises(ResourceNotFoundError):
queue_repo.delete(queue)


def test_queue_filter_all_unsorted(queue_filter_setup, queue_repo):

results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[], # empty list means get all; no filtering
page_start=0,
page_length=0, # <=0 means get all, no page limit
sort_by=None,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

assert count == 10
assert len(results) == 10

# no order guarantee, so sort both lists
sorted_results = sorted(results, key=lambda q: q.resource_snapshot_id)
sorted_queues = sorted(queue_filter_setup, key=lambda q: q.resource_snapshot_id)

assert sorted_results == sorted_queues


@pytest.mark.parametrize(
"sort_by, sort_key", [
("name", lambda q: q.name),
("createdOn", lambda q: q.created_on),
        # excluded because it takes too long:
# ("lastModifiedOn", lambda q: q.resource.last_modified_on),
("description", lambda q: q.description)
]
)
def test_queue_filter_all_sorted(queue_filter_setup, queue_repo, sort_by, sort_key):

results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[], # empty list means get all; no filtering
page_start=0,
page_length=0, # <=0 means get all, no page limit
sort_by=sort_by,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

assert count == 10
assert len(results) == 10

sorted_queues = sorted(queue_filter_setup, key=sort_key)
assert results == sorted_queues

# Try again, this time sorting the other way
results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[], # empty list means get all; no filtering
page_start=0,
page_length=0, # <=0 means get all, no page limit
sort_by=sort_by,
descending=True,
deletion_policy=DeletionPolicy.NOT_DELETED
)

assert count == 10
assert len(results) == 10

sorted_queues = sorted(queue_filter_setup, key=sort_key, reverse=True)
assert results == sorted_queues


@pytest.mark.parametrize(
"page_start, page_length", [
(0, 4),
(1, 4),
(5, 99),
(99, 999),
]
)
def test_queue_filter_all_paging(
queue_filter_setup, queue_repo, page_start, page_length
):

results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[], # empty list means get all; no filtering
page_start=page_start,
page_length=page_length,
sort_by="name",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

assert count == 10
# page_length if the page is fully contained in the data; 10 - page_start
# if page_length takes you off the end; but no less than zero.
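    # E.g. page_start=5, page_length=99 -> min(10 - 5, 99) = 5 results;
    # page_start=99 -> max(0, 10 - 99) = 0 results.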
assert len(results) == max(0, min(10 - page_start, page_length))

sorted_queues = sorted(queue_filter_setup, key=lambda q: q.name)
assert results == sorted_queues[page_start:page_start + page_length]


def test_queue_filter_name(queue_filter_setup, queue_repo):

results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[{"field": "name", "value": ["Jodi"]}],
page_start=0,
page_length=0,
sort_by=None,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

jodi = next(q for q in queue_filter_setup if q.name == "Jodi")

assert count == 1
assert results == [jodi]

results, count = queue_repo.get_by_filters_paged(
group=None,
# when field is non-null, values are all just concatenated, so this
# produces the same results
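        # e.g. ["Jo", "di"] is treated as the single literal "Jodi"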
filters=[{"field": "name", "value": ["Jo", "di"]}],
page_start=0,
page_length=0,
sort_by=None,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

assert count == 1
assert results == [jodi]


def test_queue_filter_name_wild_1(queue_filter_setup, queue_repo):

results, count = queue_repo.get_by_filters_paged(
group=None,
# names which end with "i"
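        # "*" presumably maps to a multi-character wildcard (e.g. SQL
        # LIKE's "%"), so ["*", "i"] is roughly the regex r".*i$"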
filters=[{"field": "name", "value": ["*", "i"]}],
page_start=0,
page_length=0,
sort_by="name",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

objs_ending_with_i = sorted(
(q for q in queue_filter_setup if q.name.endswith("i")),
key=lambda q: q.name
)

assert count == 3
assert results == objs_ending_with_i


def test_queue_filter_name_wild_2(queue_filter_setup, queue_repo):

results, count = queue_repo.get_by_filters_paged(
group=None,
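        # "?" presumably acts as a single-character wildcard, so this is
        # roughly the regex r"Ze.da" and should match only "Zelda"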
filters=[{"field": "name", "value": ["Ze", "?", "da"]}],
page_start=0,
page_length=0,
sort_by=None,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

zelda = next(q for q in queue_filter_setup if q.name == "Zelda")

assert count == 1
assert results == [zelda]


def test_queue_filter_tag(queue_filter_setup, queue_repo):

results, count = queue_repo.get_by_filters_paged(
group=None,
filters=[{"field": "tag", "value": ["continue"]}],
page_start=0,
page_length=0,
sort_by="name",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

continues = sorted(
(q for q in queue_filter_setup if "continue" in (
tag.name for tag in q.tags
)),
key=lambda q: q.name
)

assert count == len(continues)
assert results == continues


def test_queue_filter_null_field_1(queue_filter_setup, queue_repo):

# "down" just happens to be a word which occurs both as a tag
# and in a description. ("continue" is another one.)
results, count = queue_repo.get_by_filters_paged(
group=None,
# None field name means search all fields in a fuzzy way
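        # (i.e. a term may match the name, the description, or any tag
        # name; the same check is reconstructed in pure Python below)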
filters=[{"field": None, "value": ["down"]}],
page_start=0,
page_length=0,
sort_by="name",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

downs = sorted(
(q for q in queue_filter_setup if "down" in (
tag.name for tag in q.tags
) or "down" in q.description or "down" in q.name),
key=lambda q: q.name
)

assert count == len(downs)
assert results == downs


def test_queue_filter_null_field_2(queue_filter_setup, queue_repo):

# "down" just happens to be a word which occurs both as a tag
# and in a description. ("continue" is another one.)
results, count = queue_repo.get_by_filters_paged(
group=None,
# Find objects with two "o"s in any of their searchable fields
filters=[{"field": None, "value": ["o", "o"]}],
page_start=0,
page_length=0,
sort_by="name",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)

# equivalent to the search above
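    # (re.S lets "." cross newlines, mirroring a substring-style search)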
o_re = re.compile(r"o.*o", re.S)

oos = sorted(
(q for q in queue_filter_setup if any(
o_re.search(t.name) for t in q.tags
) or o_re.search(q.description) or o_re.search(q.name)),
key=lambda q: q.name
)

assert count == len(oos)
assert results == oos


def test_queue_filter_unsupported_sort(queue_repo):
with pytest.raises(QueueSortError):
queue_repo.get_by_filters_paged(
group=None,
filters=[],
page_start=0,
page_length=0,
sort_by="wrong",
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)


def test_queue_filter_unsupported_field(queue_repo):
with pytest.raises(UnsupportedFilterField):
queue_repo.get_by_filters_paged(
group=None,
filters=[{"field": "wrong", "value": ["foo"]}],
page_start=0,
page_length=0,
sort_by=None,
descending=False,
deletion_policy=DeletionPolicy.NOT_DELETED
)
