Skip to content

Commit

Permalink
refactor: reorganize and cleanup the queue restapi endpoint
Browse files Browse the repository at this point in the history
Migrate towards a behavior-driven style of testing for the REST endpoints and away from heavy
mocking. Add docstrings to the refactored queue service methods.
  • Loading branch information
jkglasbrenner committed Oct 17, 2023
1 parent 9cac4e4 commit 4aa7eb6
Show file tree
Hide file tree
Showing 18 changed files with 741 additions and 1,583 deletions.
22 changes: 12 additions & 10 deletions src/dioptra/restapi/job/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import datetime
import uuid
from pathlib import Path
from typing import List, Optional
from typing import List, Optional, cast

import structlog
from injector import inject
Expand All @@ -31,8 +31,7 @@
from dioptra.restapi.app import db
from dioptra.restapi.experiment.errors import ExperimentDoesNotExistError
from dioptra.restapi.experiment.service import ExperimentService
from dioptra.restapi.queue.errors import QueueDoesNotExistError
from dioptra.restapi.queue.service import QueueService
from dioptra.restapi.queue.service import QueueNameService
from dioptra.restapi.shared.rq.service import RQService
from dioptra.restapi.shared.s3.service import S3Service

Expand All @@ -51,13 +50,13 @@ def __init__(
rq_service: RQService,
s3_service: S3Service,
experiment_service: ExperimentService,
queue_service: QueueService,
queue_name_service: QueueNameService,
) -> None:
self._job_form_schema = job_form_schema
self._rq_service = rq_service
self._s3_service = s3_service
self._experiment_service = experiment_service
self._queue_service = queue_service
self._queue_name_service = queue_name_service

@staticmethod
def create(job_form_data: JobFormData, **kwargs) -> Job:
Expand Down Expand Up @@ -101,13 +100,16 @@ def extract_data_from_form(self, job_form: JobForm, **kwargs) -> JobFormData:
if experiment is None:
raise ExperimentDoesNotExistError

queue: Optional[Queue] = self._queue_service.get_unlocked_by_name(
job_form_data["queue"], log=log
queue = cast(
Queue,
self._queue_name_service.get(
job_form_data["queue"],
unlocked_only=True,
error_if_not_found=True,
log=log,
),
)

if queue is None:
raise QueueDoesNotExistError

job_form_data["experiment_id"] = experiment.experiment_id
job_form_data["queue_id"] = queue.queue_id

Expand Down
9 changes: 1 addition & 8 deletions src/dioptra/restapi/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
ExperimentRegistrationFormData,
)
from .job.model import Job, JobForm, JobFormData
from .queue.model import (
Queue,
QueueLock,
QueueRegistrationForm,
QueueRegistrationFormData,
)
from .queue.model import Queue, QueueLock
from .task_plugin.model import (
TaskPlugin,
TaskPluginUploadForm,
Expand All @@ -44,8 +39,6 @@
"JobForm",
"JobFormData",
"Queue",
"QueueRegistrationForm",
"QueueRegistrationFormData",
"QueueLock",
"TaskPlugin",
"TaskPluginUploadForm",
Expand Down
197 changes: 54 additions & 143 deletions src/dioptra/restapi/queue/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,25 @@
from __future__ import annotations

import uuid
from typing import List, Optional
from typing import Any, cast

import structlog
from flask import jsonify, request
from flask.wrappers import Response
from flask import request
from flask_accepts import accepts, responds
from flask_restx import Namespace, Resource
from injector import inject
from structlog.stdlib import BoundLogger

from dioptra.restapi.utils import as_api_parser
from dioptra.restapi.utils import slugify

from .errors import QueueDoesNotExistError, QueueRegistrationError
from .interface import QueueUpdateInterface
from .model import Queue, QueueRegistrationForm, QueueRegistrationFormData
from .schema import QueueNameUpdateSchema, QueueRegistrationSchema, QueueSchema
from .service import QueueService
from .model import Queue
from .schema import (
IdStatusResponseSchema,
NameStatusResponseSchema,
QueueEditableSchema,
QueueSchema,
)
from .service import QueueNameService, QueueService

LOGGER: BoundLogger = structlog.stdlib.get_logger()

Expand All @@ -54,39 +56,25 @@ def __init__(self, *args, queue_service: QueueService, **kwargs) -> None:
super().__init__(*args, **kwargs)

@responds(schema=QueueSchema(many=True), api=api)
def get(self) -> List[Queue]:
"""Gets a list of all registered queues."""
def get(self) -> list[Queue]:
"""Gets a list of all active queues."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queue", request_type="GET"
) # noqa: F841
log.info("Request received")
return self._queue_service.get_all_unlocked(log=log)

@api.expect(as_api_parser(api, QueueRegistrationSchema))
@accepts(QueueRegistrationSchema, api=api)
@accepts(schema=QueueEditableSchema, api=api)
@responds(schema=QueueSchema, api=api)
def post(self) -> Queue:
"""Creates a new queue via a queue registration form."""
"""Registers a new queue."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queue", request_type="POST"
) # noqa: F841
queue_registration_form: QueueRegistrationForm = QueueRegistrationForm()

log.info("Request received")

if not queue_registration_form.validate_on_submit():
log.error("Form validation failed")
raise QueueRegistrationError

log.info("Form validation successful")
queue_registration_form_data: QueueRegistrationFormData = (
self._queue_service.extract_data_from_form(
queue_registration_form=queue_registration_form, log=log
)
)
return self._queue_service.create(
queue_registration_form_data=queue_registration_form_data, log=log
)
parsed_obj = request.parsed_obj # type: ignore
name = slugify(str(parsed_obj["name"]))
return self._queue_service.create(name=name, log=log)


@api.route("/<int:queueId>")
Expand All @@ -106,43 +94,29 @@ def get(self, queueId: int) -> Queue:
request_id=str(uuid.uuid4()), resource="queueId", request_type="GET"
) # noqa: F841
log.info("Request received", queue_id=queueId)
queue: Optional[Queue] = self._queue_service.get_by_id(queueId, log=log)

if queue is None:
log.error("Queue not found", queue_id=queueId)
raise QueueDoesNotExistError

return queue
return cast(
Queue, self._queue_service.get(queueId, error_if_not_found=True, log=log)
)

def delete(self, queueId: int) -> Response:
@responds(schema=IdStatusResponseSchema, api=api)
def delete(self, queueId: int) -> dict[str, Any]:
"""Deletes a queue by its unique identifier."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queueId", request_type="DELETE"
) # noqa: F841
log.info("Request received", queue_id=queueId)
id: List[int] = self._queue_service.delete_queue(queueId, log=log)
return self._queue_service.delete(queueId, log=log)

return jsonify(dict(status="Success", id=id))

@accepts(schema=QueueNameUpdateSchema, api=api)
@accepts(schema=QueueEditableSchema, api=api)
@responds(schema=QueueSchema, api=api)
def put(self, queueId: int) -> Queue:
"""Modifies a queue by its unique identifier."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queueId", request_type="PUT"
) # noqa: F841
changes: QueueUpdateInterface = request.parsed_obj # type: ignore
queue: Optional[Queue] = self._queue_service.get_by_id(queueId, log=log)

if queue is None:
log.error("Queue not found", queue_id=queueId)
raise QueueDoesNotExistError

queue = self._queue_service.rename_queue(
queue=queue, new_name=changes["name"], log=log
)

return queue
parsed_obj = request.parsed_obj # type: ignore
new_name = slugify(str(parsed_obj["name"]))
return self._queue_service.rename(queueId, new_name=new_name, log=log)


@api.route("/<int:queueId>/lock")
Expand All @@ -155,37 +129,23 @@ def __init__(self, *args, queue_service: QueueService, **kwargs) -> None:
self._queue_service = queue_service
super().__init__(*args, **kwargs)

def delete(self, queueId: int) -> Response:
@responds(schema=IdStatusResponseSchema, api=api)
def delete(self, queueId: int) -> dict[str, Any]:
"""Removes the lock from the queue (id reference) if it exists."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="QueueIdLock", request_type="DELETE"
) # noqa: F841
log.info("Request received", queue_id=queueId)
queue: Optional[Queue] = self._queue_service.get_by_id(queueId, log=log)

if queue is None:
log.error("Queue not found", queue_id=queueId)
raise QueueDoesNotExistError
return self._queue_service.unlock(queueId, log=log)

id: List[int] = self._queue_service.unlock_queue(queue, log=log)

return jsonify(dict(status="Success", id=id))

def put(self, queueId: int) -> Queue:
@responds(schema=IdStatusResponseSchema, api=api)
def put(self, queueId: int) -> dict[str, Any]:
"""Locks the queue (id reference) if it is unlocked."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="QueueIdLock", request_type="PUT"
) # noqa: F841
log.info("Request received", queue_id=queueId)
queue: Optional[Queue] = self._queue_service.get_by_id(queueId, log=log)

if queue is None:
log.error("Queue not found", queue_id=queueId)
raise QueueDoesNotExistError

id: List[int] = self._queue_service.lock_queue(queue, log=log)

return jsonify(dict(status="Success", id=id)) # type: ignore
return self._queue_service.lock(queueId, log=log)


@api.route("/name/<string:queueName>")
Expand All @@ -194,8 +154,8 @@ class QueueNameResource(Resource):
"""Shows a single queue (name reference) and lets you modify and delete it."""

@inject
def __init__(self, *args, queue_service: QueueService, **kwargs) -> None:
self._queue_service = queue_service
def __init__(self, *args, queue_name_service: QueueNameService, **kwargs) -> None:
self._queue_name_service = queue_name_service
super().__init__(*args, **kwargs)

@responds(schema=QueueSchema, api=api)
Expand All @@ -204,58 +164,25 @@ def get(self, queueName: str) -> Queue:
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queueName", request_type="GET"
) # noqa: F841
log.info("Request received", queue_name=queueName)
queue: Optional[Queue] = self._queue_service.get_by_name(
queue_name=queueName, log=log
log.info("Request received", queue_name=slugify(queueName))
return cast(
Queue,
self._queue_name_service.get(
slugify(queueName), error_if_not_found=True, log=log
),
)

if queue is None:
log.error("Queue not found", queue_name=queueName)
raise QueueDoesNotExistError

return queue

def delete(self, queueName: str) -> Response:
@responds(schema=NameStatusResponseSchema, api=api)
def delete(self, queueName: str) -> dict[str, Any]:
"""Deletes a queue by its unique name."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()),
resource="queueName",
queue_name=queueName,
request_type="DELETE",
) # noqa: F841
log.info("Request received")
queue: Optional[Queue] = self._queue_service.get_by_name(
queue_name=queueName, log=log
)

if queue is None:
return jsonify(dict(status="Success", id=[]))

id: List[int] = self._queue_service.delete_queue(queue_id=queue.queue_id)

return jsonify(dict(status="Success", id=id))

@accepts(schema=QueueNameUpdateSchema, api=api)
@responds(schema=QueueSchema, api=api)
def put(self, queueName: str) -> Queue:
"""Modifies a queue by its unique name."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="queueName", request_type="PUT"
) # noqa: F841
changes: QueueUpdateInterface = request.parsed_obj # type: ignore
queue: Optional[Queue] = self._queue_service.get_by_name(
queue_name=queueName, log=log
)

if queue is None:
log.error("Queue not found", queue_name=queueName)
raise QueueDoesNotExistError

queue = self._queue_service.rename_queue(
queue=queue, new_name=changes["name"], log=log
)

return queue
log.info("Request received", queue_name=slugify(queueName))
return self._queue_name_service.delete(slugify(queueName), log=log)


@api.route("/name/<string:queueName>/lock")
Expand All @@ -264,42 +191,26 @@ class QueueNameLockResource(Resource):
"""Lets you put a lock on a queue (name reference) and lets you delete it."""

@inject
def __init__(self, *args, queue_service: QueueService, **kwargs) -> None:
self._queue_service = queue_service
def __init__(self, *args, queue_name_service: QueueNameService, **kwargs) -> None:
self._queue_name_service = queue_name_service
super().__init__(*args, **kwargs)

def delete(self, queueName: str) -> Response:
@responds(schema=NameStatusResponseSchema, api=api)
def delete(self, queueName: str) -> dict[str, Any]:
"""Removes the lock from the queue (name reference) if it exists."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()),
resource="QueueNameLock",
request_type="DELETE",
) # noqa: F841
log.info("Request received", queue_name=queueName)
queue: Optional[Queue] = self._queue_service.get_by_name(queueName, log=log)
return self._queue_name_service.unlock(queueName, log=log)

if queue is None:
log.error("Queue not found", queue_name=queueName)
raise QueueDoesNotExistError

id: List[int] = self._queue_service.unlock_queue(queue, log=log)
name: List[str] = [queueName] if id else []

return jsonify(dict(status="Success", name=name))

def put(self, queueName: str) -> Queue:
@responds(schema=NameStatusResponseSchema, api=api)
def put(self, queueName: str) -> dict[str, Any]:
"""Locks the queue (name reference) if it is unlocked."""
log: BoundLogger = LOGGER.new(
request_id=str(uuid.uuid4()), resource="QueueNameLock", request_type="PUT"
) # noqa: F841
log.info("Request received", queue_name=queueName)
queue: Optional[Queue] = self._queue_service.get_by_name(queueName, log=log)

if queue is None:
log.error("Queue not found", queue_name=queueName)
raise QueueDoesNotExistError

id: List[int] = self._queue_service.lock_queue(queue, log=log)
name: List[str] = [queueName] if id else []

return jsonify(dict(status="Success", name=name)) # type: ignore
return self._queue_name_service.lock(queueName, log=log)
Loading

0 comments on commit 4aa7eb6

Please sign in to comment.