From 5c3e05efab00e68d401fb9a28c25eb886317afb6 Mon Sep 17 00:00:00 2001 From: Niklas Neugebauer <68709968+NiklasNeugebauer@users.noreply.github.com> Date: Thu, 27 Jun 2024 18:34:11 +0200 Subject: [PATCH] Upload mode rest endpoint (#19) * add rest endpoint for starting and stopping the outbox continuous upload * use lazy formatting in logging functions * upload images in batches of 8 * clear shutdown event when starting * match returned responses of other endpoints * include outbox_mode router * review 1: don;t use StrEnum, imrove logging * add checks if process is already started or stopped * improve rest response make sure no further upload thread is started * fix merge redundancy * increase batch-size and delay * move and adjust debug prints * document EPs in readme * change debug log texts * increase upload call timeout (uploading 20 Images may take longer than 30 seconds) * reduce outbox upload timeout * typing / readme * add test_set_outbox_mode and test_rest_outbox_mode * raise and communicate a timeout exception instead of using a success return value * fix test_outbox fixture --------- Co-authored-by: Dr. Dennis Wittich --- README.md | 22 +++ learning_loop_node/detector/detector_node.py | 6 +- learning_loop_node/detector/outbox.py | 138 +++++++++++++----- .../detector/rest/outbox_mode.py | 35 +++++ learning_loop_node/detector/rest/upload.py | 8 +- .../tests/test_client_communication.py | 16 ++ .../detector/tests/test_outbox.py | 28 +++- 7 files changed, 209 insertions(+), 44 deletions(-) create mode 100644 learning_loop_node/detector/rest/outbox_mode.py diff --git a/README.md b/README.md index dfb0dde1..817e3e7a 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,8 @@ from learning_loop_node/learning_loop_node Detector Nodes are normally deployed on edge devices like robots or machinery but can also run in the cloud to provide backend services for an app or similar. These nodes register themself at the Learning Loop. They provide REST and Socket.io APIs to run inference on images. The processed images can automatically be used for active learning: e.g. uncertain predictions will be send to the Learning Loop. +### Running Inference + Images can be send to the detector node via socketio or rest. The later approach can be used via curl, @@ -62,6 +64,26 @@ The detector also has a sio **upload endpoint** that can be used to upload image The endpoint returns None if the upload was successful and an error message otherwise. +### Changing the outbox mode + +If the autoupload is set to `all` or `filtered` (selected) images and the corresponding detections are saved on HDD (the outbox). A background thread will upload the images and detections to the Learning Loop. The outbox is located in the `outbox` folder in the root directory of the node. The outbox can be cleared by deleting the files in the folder. + +The continuous upload can be stopped/started via a REST enpoint: + +Example Usage: + +- Enable upload: `curl -X PUT -d "continuous_upload" http://localhost/outbox_mode` +- Disable upload: `curl -X PUT -d "stopped" http://localhost/outbox_mode` + +The current state can be queried via a GET request: +`curl http://localhost/outbox_mode` + +### Explicit upload + +The detector has a REST endpoint to upload images (and detections) to the Learning Loop. The endpoint takes a POST request with the image and optionally the detections. The image is expected to be in jpg format. The detections are expected to be a json dictionary. Example: + +`curl -X POST -F 'files=@test.jpg' "http://localhost:/upload"` + ## Trainer Node Trainers fetch the images and anntoations from the Learning Loop to train new models. diff --git a/learning_loop_node/detector/detector_node.py b/learning_loop_node/detector/detector_node.py index 3fa33e12..c7b6cee1 100644 --- a/learning_loop_node/detector/detector_node.py +++ b/learning_loop_node/detector/detector_node.py @@ -27,6 +27,7 @@ from .rest import backdoor_controls from .rest import detect as rest_detect from .rest import operation_mode as rest_mode +from .rest import outbox_mode as rest_outbox_mode from .rest import upload as rest_upload from .rest.operation_mode import OperationMode @@ -57,6 +58,7 @@ def __init__(self, name: str, detector: DetectorLogic, uuid: Optional[str] = Non self.include_router(rest_upload.router, prefix="") self.include_router(rest_mode.router, tags=["operation_mode"]) self.include_router(rest_about.router, tags=["about"]) + self.include_router(rest_outbox_mode.router, tags=["outbox_mode"]) if use_backdoor_controls: self.include_router(backdoor_controls.router) @@ -89,7 +91,7 @@ async def soft_reload(self) -> None: async def on_startup(self) -> None: try: - self.outbox.start_continuous_upload() + self.outbox.ensure_continuous_upload() self.detector_logic.load_model() except Exception: self.log.exception("error during 'startup'") @@ -97,7 +99,7 @@ async def on_startup(self) -> None: async def on_shutdown(self) -> None: try: - self.outbox.stop_continuous_upload() + self.outbox.ensure_continuous_upload_stopped() for sid in self.connected_clients: # pylint: disable=no-member await self.sio.disconnect(sid) # type:ignore diff --git a/learning_loop_node/detector/outbox.py b/learning_loop_node/detector/outbox.py index ca1a200d..ac69a673 100644 --- a/learning_loop_node/detector/outbox.py +++ b/learning_loop_node/detector/outbox.py @@ -5,7 +5,9 @@ import time from dataclasses import asdict from datetime import datetime +from enum import Enum from glob import glob +from io import BufferedReader, TextIOWrapper from multiprocessing import Event from multiprocessing.synchronize import Event as SyncEvent from threading import Thread @@ -19,13 +21,18 @@ from ..helpers import environment_reader -class Outbox(): +class OutboxMode(Enum): + CONTINUOUS_UPLOAD = 'continuous_upload' + STOPPED = 'stopped' + +class Outbox(): def __init__(self) -> None: self.log = logging.getLogger() self.path = f'{GLOBALS.data_folder}/outbox' os.makedirs(self.path, exist_ok=True) + self.log = logging.getLogger() host = environment_reader.host() o = environment_reader.organization() p = environment_reader.project() @@ -34,9 +41,12 @@ def __init__(self) -> None: base_url = f'http{"s" if "learning-loop.ai" in host else ""}://{host}/api' base: str = base_url self.target_uri = f'{base}/{o}/projects/{p}/images' - self.log.info(f'Outbox initialized with target_uri: {self.target_uri}') + self.log.info('Outbox initialized with target_uri: %s', self.target_uri) + + self.BATCH_SIZE = 20 + self.UPLOAD_TIMEOUT_S = 30 - self.shutdown_event: Optional[SyncEvent] = None + self.shutdown_event: SyncEvent = Event() self.upload_process: Optional[Thread] = None def save(self, image: bytes, detections: Optional[Detections] = None, tags: Optional[List[str]] = None) -> None: @@ -59,59 +69,117 @@ def save(self, image: bytes, detections: Optional[Detections] = None, tags: Opti if os.path.exists(tmp): os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel else: - self.log.error(f'Could not rename {tmp} to {self.path}/{identifier}') + self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier) def get_data_files(self): return glob(f'{self.path}/*') - def start_continuous_upload(self): - self.shutdown_event = Event() - self.upload_process = Thread(target=self._continuous_upload) + def ensure_continuous_upload(self): + self.log.debug('start_continuous_upload') + if self._upload_process_alive(): + self.log.debug('Upload thread already running') + return + + self.shutdown_event.clear() + self.upload_process = Thread(target=self._continuous_upload, name='OutboxUpload') self.upload_process.start() def _continuous_upload(self): - self.log.info('start continuous upload') + self.log.info('continuous upload started') assert self.shutdown_event is not None while not self.shutdown_event.is_set(): self.upload() - time.sleep(1) - self.log.info('stop continuous upload') + time.sleep(5) + self.log.info('continuous upload ended') def upload(self): items = self.get_data_files() if items: - self.log.info(f'Found {len(items)} images to upload') - for item in items: - if self.shutdown_event and self.shutdown_event.is_set(): - break - try: - data = [('files', open(f'{item}/image.json', 'r')), - ('files', open(f'{item}/image.jpg', 'rb'))] - - response = requests.post(self.target_uri, files=data, timeout=30) - if response.status_code == 200: - shutil.rmtree(item) - self.log.info(f'uploaded {item} successfully') - elif response.status_code == 422: - self.log.error(f'Broken content in {item}: dropping this data') - shutil.rmtree(item) - else: - self.log.error(f'Could not upload {item}: {response.status_code}') - except Exception: - self.log.exception('could not upload files') - - def stop_continuous_upload(self, timeout=5): + self.log.info('Found %s images to upload', len(items)) + for i in range(0, len(items), self.BATCH_SIZE): + batch_items = items[i:i+self.BATCH_SIZE] + if self.shutdown_event.is_set(): + break + try: + self._upload_batch(batch_items) + except Exception: + self.log.exception('Could not upload files') + else: + self.log.info('No images found to upload') + + def _upload_batch(self, items: List[str]): + data: List[tuple[str, TextIOWrapper | BufferedReader]] = [] + data = [('files', open(f'{item}/image.json', 'r')) for item in items] + data += [('files', open(f'{item}/image.jpg', 'rb')) for item in items] + + response = requests.post(self.target_uri, files=data, timeout=self.UPLOAD_TIMEOUT_S) + if response.status_code == 200: + for item in items: + shutil.rmtree(item, ignore_errors=True) + self.log.info('Uploaded %s images successfully', len(items)) + elif response.status_code == 422: + if len(items) == 1: + self.log.error('Broken content in image: %s\n Skipping.', items[0]) + shutil.rmtree(items[0], ignore_errors=True) + return + + self.log.exception('Broken content in batch. Splitting and retrying') + self._upload_batch(items[:len(items)//2]) + self._upload_batch(items[len(items)//2:]) + else: + self.log.error('Could not upload images: %s', response.content) + + def ensure_continuous_upload_stopped(self) -> bool: + self.log.debug('Outbox: Ensuring continuous upload') + if not self._upload_process_alive(): + self.log.debug('Upload thread already stopped') + return True proc = self.upload_process if not proc: - return + return True try: assert self.shutdown_event is not None self.shutdown_event.set() assert proc is not None - proc.join(timeout) + proc.join(self.UPLOAD_TIMEOUT_S + 1) except Exception: - logging.exception('error while shutting down upload thread') + self.log.exception('Error while shutting down upload thread: ') if proc.is_alive(): - self.log.error('upload thread did not terminate') + self.log.error('Upload thread did not terminate') + return False + + self.log.info('Upload thread terminated') + return True + + def _upload_process_alive(self) -> bool: + return bool(self.upload_process and self.upload_process.is_alive()) + + def get_mode(self) -> OutboxMode: + ''':return: current mode ('continuous_upload' or 'stopped')''' + if self.upload_process and self.upload_process.is_alive(): + current_mode = OutboxMode.CONTINUOUS_UPLOAD + else: + current_mode = OutboxMode.STOPPED + + self.log.debug('Outbox: Current mode is %s', current_mode) + return current_mode + + def set_mode(self, mode: OutboxMode | str): + ''':param mode: 'continuous_upload' or 'stopped' + :raises ValueError: if mode is not a valid OutboxMode + :raises TimeoutError: if the upload thread does not terminate within 31 seconds with mode='stopped' + ''' + if isinstance(mode, str): + mode = OutboxMode(mode) + + if mode == OutboxMode.CONTINUOUS_UPLOAD: + self.ensure_continuous_upload() + elif mode == OutboxMode.STOPPED: + try: + self.ensure_continuous_upload_stopped() + except TimeoutError as e: + raise TimeoutError(f'Upload thread did not terminate within {self.UPLOAD_TIMEOUT_S} seconds.') from e + + self.log.debug('set outbox mode to %s', mode) diff --git a/learning_loop_node/detector/rest/outbox_mode.py b/learning_loop_node/detector/rest/outbox_mode.py new file mode 100644 index 00000000..71c2ad93 --- /dev/null +++ b/learning_loop_node/detector/rest/outbox_mode.py @@ -0,0 +1,35 @@ +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import PlainTextResponse + +from ..outbox import Outbox + +router = APIRouter() + + +@router.get("/outbox_mode") +async def get_outbox_mode(request: Request): + ''' + Example Usage + curl http://localhost/outbox_mode + ''' + outbox: Outbox = request.app.outbox + return PlainTextResponse(outbox.get_mode().value) + + +@router.put("/outbox_mode") +async def put_outbox_mode(request: Request): + ''' + Example Usage + curl -X PUT -d "continuous_upload" http://localhost/outbox_mode + curl -X PUT -d "stopped" http://localhost/outbox_mode + ''' + outbox: Outbox = request.app.outbox + content = str(await request.body(), 'utf-8') + try: + outbox.set_mode(content) + except TimeoutError as e: + raise HTTPException(202, 'Setting has not completed, yet: ' + str(e)) from e + except ValueError as e: + raise HTTPException(422, 'Could not set outbox mode: ' + str(e)) from e + + return "OK" diff --git a/learning_loop_node/detector/rest/upload.py b/learning_loop_node/detector/rest/upload.py index df05e2d9..6776e50c 100644 --- a/learning_loop_node/detector/rest/upload.py +++ b/learning_loop_node/detector/rest/upload.py @@ -1,7 +1,10 @@ -from typing import List +from typing import TYPE_CHECKING, List from fastapi import APIRouter, File, Request, UploadFile +if TYPE_CHECKING: + from ..detector_node import DetectorNode + router = APIRouter() @@ -13,5 +16,6 @@ async def upload_image(request: Request, files: List[UploadFile] = File(...)): curl -X POST -F 'files=@test.jpg' "http://localhost:/upload" """ raw_files = [await file.read() for file in files] - await request.app.upload_images(raw_files) + node: DetectorNode = request.app + await node.upload_images(raw_files) return 200, "OK" diff --git a/learning_loop_node/detector/tests/test_client_communication.py b/learning_loop_node/detector/tests/test_client_communication.py index 24fbd095..ee87a0d6 100644 --- a/learning_loop_node/detector/tests/test_client_communication.py +++ b/learning_loop_node/detector/tests/test_client_communication.py @@ -102,3 +102,19 @@ async def test_about_endpoint(test_detector_node: DetectorNode): assert response_dict['state'] == 'online' assert response_dict['target_model'] == '1.1' assert any(c.name == 'purple point' for c in model_information.categories) + + +async def test_rest_outbox_mode(test_detector_node: DetectorNode): + await asyncio.sleep(3) + + def check_switch_to_mode(mode: str): + response = requests.put(f'http://localhost:{GLOBALS.detector_port}/outbox_mode', + data=mode, timeout=30) + assert response.status_code == 200 + response = requests.get(f'http://localhost:{GLOBALS.detector_port}/outbox_mode', timeout=30) + assert response.status_code == 200 + assert response.content == mode.encode() + + check_switch_to_mode('stopped') + check_switch_to_mode('continuous_upload') + check_switch_to_mode('stopped') diff --git a/learning_loop_node/detector/tests/test_outbox.py b/learning_loop_node/detector/tests/test_outbox.py index adf56744..31f6475a 100644 --- a/learning_loop_node/detector/tests/test_outbox.py +++ b/learning_loop_node/detector/tests/test_outbox.py @@ -1,5 +1,6 @@ import os import shutil +from time import sleep import numpy as np import pytest @@ -21,6 +22,7 @@ def test_outbox(): os.mkdir(test_outbox.path) yield test_outbox + test_outbox.set_mode('stopped') shutil.rmtree(test_outbox.path, ignore_errors=True) @@ -52,11 +54,7 @@ def test_saving_opencv_image(test_outbox: Outbox): def test_saving_binary(test_outbox: Outbox): assert len(test_outbox.get_data_files()) == 0 - img = Image.new('RGB', (60, 30), color=(73, 109, 137)) - img.save('/tmp/image.jpg') - with open('/tmp/image.jpg', 'rb') as f: - data = f.read() - test_outbox.save(data) + save_test_image_to_outbox(test_outbox) assert len(test_outbox.get_data_files()) == 1 @@ -66,3 +64,23 @@ async def test_files_are_automatically_uploaded(test_detector_node: DetectorNode assert len(test_detector_node.outbox.get_data_files()) == 1 assert len(test_detector_node.outbox.get_data_files()) == 1 + + +def test_set_outbox_mode(test_outbox: Outbox): + test_outbox.set_mode('stopped') + save_test_image_to_outbox(outbox=test_outbox) + sleep(6) + assert len(test_outbox.get_data_files()) == 1, 'File was cleared even though outbox should be stopped' + test_outbox.set_mode('continuous_upload') + sleep(6) + assert len(test_outbox.get_data_files()) == 0, 'File was not cleared even though outbox should be in continuous_upload' + +### Helper functions ### + + +def save_test_image_to_outbox(outbox: Outbox): + img = Image.new('RGB', (60, 30), color=(73, 109, 137)) + img.save('/tmp/image.jpg') + with open('/tmp/image.jpg', 'rb') as f: + data = f.read() + outbox.save(data)