Upload mode rest endpoint (#19)
* add rest endpoint for starting and stopping the outbox continuous upload

* use lazy formatting in logging functions

* upload images in batches of 8

* clear shutdown event when starting

* match returned responses of other endpoints

* include outbox_mode router

* review 1: don't use StrEnum, improve logging

* add checks if process is already started or stopped

* improve rest response
make sure no further upload thread is started

* fix merge redundancy

* increase batch-size and delay

* move and adjust debug prints

* document EPs in readme

* change debug log texts

* increase upload call timeout (uploading 20 Images may take longer than 30 seconds)

* reduce outbox upload timeout

* typing / readme

* add test_set_outbox_mode and test_rest_outbox_mode

* raise and communicate a timeout exception instead of using a success return value

* fix test_outbox fixture

---------

Co-authored-by: Dr. Dennis Wittich <[email protected]>
NiklasNeugebauer and denniswittich authored Jun 27, 2024
1 parent dbb900f commit 5c3e05e
Showing 7 changed files with 209 additions and 44 deletions.
22 changes: 22 additions & 0 deletions README.md
@@ -41,6 +41,8 @@ from learning_loop_node/learning_loop_node
 
 Detector Nodes are normally deployed on edge devices like robots or machinery but can also run in the cloud to provide backend services for an app or similar. These nodes register themselves at the Learning Loop. They provide REST and Socket.io APIs to run inference on images. The processed images can automatically be used for active learning: e.g. uncertain predictions will be sent to the Learning Loop.
 
+### Running Inference
+
 Images can be sent to the detector node via socketio or rest.
 The latter approach can be used via curl,
 
@@ -62,6 +64,26 @@ The detector also has a sio **upload endpoint** that can be used to upload image
 
 The endpoint returns None if the upload was successful and an error message otherwise.
 
+### Changing the outbox mode
+
+If the autoupload is set to `all` or `filtered`, the (selected) images and the corresponding detections are saved on HDD (the outbox). A background thread uploads the images and detections to the Learning Loop. The outbox is located in the `outbox` folder in the root directory of the node. The outbox can be cleared by deleting the files in the folder.
+
+The continuous upload can be stopped/started via a REST endpoint:
+
+Example Usage:
+
+- Enable upload: `curl -X PUT -d "continuous_upload" http://localhost/outbox_mode`
+- Disable upload: `curl -X PUT -d "stopped" http://localhost/outbox_mode`
+
+The current state can be queried via a GET request:
+`curl http://localhost/outbox_mode`
+
+### Explicit upload
+
+The detector has a REST endpoint to upload images (and detections) to the Learning Loop. The endpoint takes a POST request with the image and optionally the detections. The image is expected to be in jpg format. The detections are expected to be a json dictionary. Example:
+
+`curl -X POST -F '[email protected]' "http://localhost:/upload"`
+
 ## Trainer Node
 
 Trainers fetch the images and annotations from the Learning Loop to train new models.
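The curl calls documented in the README hunk translate directly into a small Python client. The following is a minimal sketch using only the standard library; the helper names (`build_set_mode_request`, `build_get_mode_request`, `send`) and the base URL `http://localhost` are illustrative assumptions and not part of the node's API.

```python
from urllib.request import Request, urlopen

# Mode strings as documented in the README section on the outbox mode.
VALID_MODES = ('continuous_upload', 'stopped')


def build_set_mode_request(base_url: str, mode: str) -> Request:
    """Build (but do not send) the PUT request that switches the outbox mode."""
    if mode not in VALID_MODES:
        raise ValueError(f'unknown outbox mode: {mode!r}')
    # The endpoint reads the raw request body, so the mode is sent as plain bytes.
    return Request(f'{base_url}/outbox_mode', data=mode.encode('utf-8'), method='PUT')


def build_get_mode_request(base_url: str) -> Request:
    """Build the GET request that queries the current outbox mode."""
    return Request(f'{base_url}/outbox_mode', method='GET')


def send(request: Request, timeout: float = 30.0) -> str:
    """Send a prepared request and return the response body as text."""
    with urlopen(request, timeout=timeout) as response:
        return response.read().decode('utf-8')
```

With a detector node running, `send(build_set_mode_request('http://localhost', 'stopped'))` would correspond to the "Disable upload" curl call, and `send(build_get_mode_request('http://localhost'))` to the GET query.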
6 changes: 4 additions & 2 deletions learning_loop_node/detector/detector_node.py
@@ -27,6 +27,7 @@
 from .rest import backdoor_controls
 from .rest import detect as rest_detect
 from .rest import operation_mode as rest_mode
+from .rest import outbox_mode as rest_outbox_mode
 from .rest import upload as rest_upload
 from .rest.operation_mode import OperationMode
 
@@ -57,6 +58,7 @@ def __init__(self, name: str, detector: DetectorLogic, uuid: Optional[str] = Non
         self.include_router(rest_upload.router, prefix="")
         self.include_router(rest_mode.router, tags=["operation_mode"])
         self.include_router(rest_about.router, tags=["about"])
+        self.include_router(rest_outbox_mode.router, tags=["outbox_mode"])
 
         if use_backdoor_controls:
             self.include_router(backdoor_controls.router)
@@ -89,15 +91,15 @@ async def soft_reload(self) -> None:
 
     async def on_startup(self) -> None:
         try:
-            self.outbox.start_continuous_upload()
+            self.outbox.ensure_continuous_upload()
             self.detector_logic.load_model()
         except Exception:
             self.log.exception("error during 'startup'")
         self.operation_mode = OperationMode.Idle
 
     async def on_shutdown(self) -> None:
         try:
-            self.outbox.stop_continuous_upload()
+            self.outbox.ensure_continuous_upload_stopped()
             for sid in self.connected_clients:
                 # pylint: disable=no-member
                 await self.sio.disconnect(sid) # type:ignore
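The rename from `start_`/`stop_` to `ensure_` signals that both calls are meant to be safe to repeat. A minimal sketch of such an idempotent start/stop pair follows. `BackgroundWorker` and its members are hypothetical names, the sketch uses `threading.Event` where the diff uses `multiprocessing.Event`, and it waits on the event instead of calling `time.sleep` so that a stop request is noticed immediately; none of this is the node's actual implementation.

```python
import threading
from typing import Optional


class BackgroundWorker:
    """Hypothetical stand-in for the outbox upload thread handling."""

    def __init__(self, interval_s: float = 0.05) -> None:
        self.interval_s = interval_s
        self.shutdown_event = threading.Event()
        self.thread: Optional[threading.Thread] = None
        self.iterations = 0

    def _alive(self) -> bool:
        return bool(self.thread and self.thread.is_alive())

    def ensure_started(self) -> None:
        if self._alive():
            return  # already running: never start a second thread
        self.shutdown_event.clear()  # the event is reused after an earlier stop
        self.thread = threading.Thread(target=self._run, name='Worker', daemon=True)
        self.thread.start()

    def _run(self) -> None:
        while not self.shutdown_event.is_set():
            self.iterations += 1
            # wait() returns as soon as the event is set, unlike a fixed sleep
            self.shutdown_event.wait(self.interval_s)

    def ensure_stopped(self, timeout_s: float = 1.0) -> bool:
        """Return True if no worker thread is running after the call."""
        thread = self.thread
        if thread is None or not thread.is_alive():
            return True
        self.shutdown_event.set()
        thread.join(timeout_s)
        return not thread.is_alive()
```

Calling `ensure_started()` twice keeps the same thread, and calling `ensure_stopped()` on an already stopped worker simply reports success.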
138 changes: 103 additions & 35 deletions learning_loop_node/detector/outbox.py
@@ -5,7 +5,9 @@
 import time
 from dataclasses import asdict
 from datetime import datetime
+from enum import Enum
 from glob import glob
+from io import BufferedReader, TextIOWrapper
 from multiprocessing import Event
 from multiprocessing.synchronize import Event as SyncEvent
 from threading import Thread
@@ -19,13 +21,18 @@
 from ..helpers import environment_reader
 
 
-class Outbox():
+class OutboxMode(Enum):
+    CONTINUOUS_UPLOAD = 'continuous_upload'
+    STOPPED = 'stopped'
+
 
+class Outbox():
     def __init__(self) -> None:
+        self.log = logging.getLogger()
         self.path = f'{GLOBALS.data_folder}/outbox'
         os.makedirs(self.path, exist_ok=True)
 
         self.log = logging.getLogger()
         host = environment_reader.host()
         o = environment_reader.organization()
         p = environment_reader.project()
@@ -34,9 +41,12 @@ def __init__(self) -> None:
         base_url = f'http{"s" if "learning-loop.ai" in host else ""}://{host}/api'
         base: str = base_url
         self.target_uri = f'{base}/{o}/projects/{p}/images'
-        self.log.info(f'Outbox initialized with target_uri: {self.target_uri}')
+        self.log.info('Outbox initialized with target_uri: %s', self.target_uri)
 
+        self.BATCH_SIZE = 20
+        self.UPLOAD_TIMEOUT_S = 30
+
-        self.shutdown_event: Optional[SyncEvent] = None
+        self.shutdown_event: SyncEvent = Event()
         self.upload_process: Optional[Thread] = None
 
     def save(self, image: bytes, detections: Optional[Detections] = None, tags: Optional[List[str]] = None) -> None:
@@ -59,59 +69,117 @@ def save(self, image: bytes, detections: Optional[Detections] = None, tags: Opti
         if os.path.exists(tmp):
             os.rename(tmp, self.path + '/' + identifier) # NOTE rename is atomic so upload can run in parallel
         else:
-            self.log.error(f'Could not rename {tmp} to {self.path}/{identifier}')
+            self.log.error('Could not rename %s to %s', tmp, self.path + '/' + identifier)
 
     def get_data_files(self):
         return glob(f'{self.path}/*')
 
-    def start_continuous_upload(self):
-        self.shutdown_event = Event()
-        self.upload_process = Thread(target=self._continuous_upload)
+    def ensure_continuous_upload(self):
+        self.log.debug('start_continuous_upload')
+        if self._upload_process_alive():
+            self.log.debug('Upload thread already running')
+            return
+
+        self.shutdown_event.clear()
+        self.upload_process = Thread(target=self._continuous_upload, name='OutboxUpload')
         self.upload_process.start()
 
     def _continuous_upload(self):
-        self.log.info('start continuous upload')
+        self.log.info('continuous upload started')
         assert self.shutdown_event is not None
         while not self.shutdown_event.is_set():
             self.upload()
-            time.sleep(1)
-        self.log.info('stop continuous upload')
+            time.sleep(5)
+        self.log.info('continuous upload ended')
 
     def upload(self):
         items = self.get_data_files()
         if items:
-            self.log.info(f'Found {len(items)} images to upload')
-        for item in items:
-            if self.shutdown_event and self.shutdown_event.is_set():
-                break
-            try:
-                data = [('files', open(f'{item}/image.json', 'r')),
-                        ('files', open(f'{item}/image.jpg', 'rb'))]
-
-                response = requests.post(self.target_uri, files=data, timeout=30)
-                if response.status_code == 200:
-                    shutil.rmtree(item)
-                    self.log.info(f'uploaded {item} successfully')
-                elif response.status_code == 422:
-                    self.log.error(f'Broken content in {item}: dropping this data')
-                    shutil.rmtree(item)
-                else:
-                    self.log.error(f'Could not upload {item}: {response.status_code}')
-            except Exception:
-                self.log.exception('could not upload files')
-
-    def stop_continuous_upload(self, timeout=5):
+            self.log.info('Found %s images to upload', len(items))
+            for i in range(0, len(items), self.BATCH_SIZE):
+                batch_items = items[i:i+self.BATCH_SIZE]
+                if self.shutdown_event.is_set():
+                    break
+                try:
+                    self._upload_batch(batch_items)
+                except Exception:
+                    self.log.exception('Could not upload files')
+        else:
+            self.log.info('No images found to upload')
+
+    def _upload_batch(self, items: List[str]):
+        data: List[tuple[str, TextIOWrapper | BufferedReader]] = []
+        data = [('files', open(f'{item}/image.json', 'r')) for item in items]
+        data += [('files', open(f'{item}/image.jpg', 'rb')) for item in items]
+
+        response = requests.post(self.target_uri, files=data, timeout=self.UPLOAD_TIMEOUT_S)
+        if response.status_code == 200:
+            for item in items:
+                shutil.rmtree(item, ignore_errors=True)
+            self.log.info('Uploaded %s images successfully', len(items))
+        elif response.status_code == 422:
+            if len(items) == 1:
+                self.log.error('Broken content in image: %s\n Skipping.', items[0])
+                shutil.rmtree(items[0], ignore_errors=True)
+                return
+
+            self.log.exception('Broken content in batch. Splitting and retrying')
+            self._upload_batch(items[:len(items)//2])
+            self._upload_batch(items[len(items)//2:])
+        else:
+            self.log.error('Could not upload images: %s', response.content)
+
+    def ensure_continuous_upload_stopped(self) -> bool:
+        self.log.debug('Outbox: Ensuring continuous upload')
+        if not self._upload_process_alive():
+            self.log.debug('Upload thread already stopped')
+            return True
         proc = self.upload_process
         if not proc:
-            return
+            return True
 
         try:
             assert self.shutdown_event is not None
             self.shutdown_event.set()
             assert proc is not None
-            proc.join(timeout)
+            proc.join(self.UPLOAD_TIMEOUT_S + 1)
         except Exception:
-            logging.exception('error while shutting down upload thread')
+            self.log.exception('Error while shutting down upload thread: ')
 
         if proc.is_alive():
-            self.log.error('upload thread did not terminate')
+            self.log.error('Upload thread did not terminate')
+            return False
+
+        self.log.info('Upload thread terminated')
+        return True
+
+    def _upload_process_alive(self) -> bool:
+        return bool(self.upload_process and self.upload_process.is_alive())
+
+    def get_mode(self) -> OutboxMode:
+        ''':return: current mode ('continuous_upload' or 'stopped')'''
+        if self.upload_process and self.upload_process.is_alive():
+            current_mode = OutboxMode.CONTINUOUS_UPLOAD
+        else:
+            current_mode = OutboxMode.STOPPED
+
+        self.log.debug('Outbox: Current mode is %s', current_mode)
+        return current_mode
+
+    def set_mode(self, mode: OutboxMode | str):
+        ''':param mode: 'continuous_upload' or 'stopped'
+        :raises ValueError: if mode is not a valid OutboxMode
+        :raises TimeoutError: if the upload thread does not terminate within 31 seconds with mode='stopped'
+        '''
+        if isinstance(mode, str):
+            mode = OutboxMode(mode)
+
+        if mode == OutboxMode.CONTINUOUS_UPLOAD:
+            self.ensure_continuous_upload()
+        elif mode == OutboxMode.STOPPED:
+            try:
+                self.ensure_continuous_upload_stopped()
+            except TimeoutError as e:
+                raise TimeoutError(f'Upload thread did not terminate within {self.UPLOAD_TIMEOUT_S} seconds.') from e
+
+        self.log.debug('set outbox mode to %s', mode)
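The new `_upload_batch` handles a 422 response by splitting the batch in half and retrying each half until a single broken item is isolated. The following sketch shows that bisecting idea on its own, without file handling or HTTP. `upload_with_bisect` and the `post` callable are hypothetical names introduced for illustration, and unlike the diff, which logs unexpected status codes, the sketch raises on them.

```python
from typing import Callable, List


def upload_with_bisect(items: List[str], post: Callable[[List[str]], int]) -> List[str]:
    """Upload `items` as one batch and bisect on HTTP 422 to find broken items.

    `post` is a hypothetical callable that sends one batch and returns the
    HTTP status code. The function returns the items rejected as broken.
    """
    if not items:
        return []
    status = post(items)
    if status == 200:
        return []
    if status == 422:
        if len(items) == 1:
            return list(items)  # a single rejected item is the broken one
        mid = len(items) // 2
        # Retry both halves; only the half containing broken data fails again.
        return upload_with_bisect(items[:mid], post) + upload_with_bisect(items[mid:], post)
    raise RuntimeError(f'upload failed with status {status}')


def make_fake_post(broken: set):
    """Test double: rejects any batch that contains a broken item."""
    calls: List[List[str]] = []

    def post(batch: List[str]) -> int:
        calls.append(list(batch))
        return 422 if any(item in broken for item in batch) else 200

    return post, calls
```

With one broken item in a batch of 20, this needs only a handful of extra requests, whereas dropping the whole batch would discard 19 valid images.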
35 changes: 35 additions & 0 deletions learning_loop_node/detector/rest/outbox_mode.py
@@ -0,0 +1,35 @@
+from fastapi import APIRouter, HTTPException, Request
+from fastapi.responses import PlainTextResponse
+
+from ..outbox import Outbox
+
+router = APIRouter()
+
+
+@router.get("/outbox_mode")
+async def get_outbox_mode(request: Request):
+    '''
+    Example Usage
+        curl http://localhost/outbox_mode
+    '''
+    outbox: Outbox = request.app.outbox
+    return PlainTextResponse(outbox.get_mode().value)
+
+
+@router.put("/outbox_mode")
+async def put_outbox_mode(request: Request):
+    '''
+    Example Usage
+        curl -X PUT -d "continuous_upload" http://localhost/outbox_mode
+        curl -X PUT -d "stopped" http://localhost/outbox_mode
+    '''
+    outbox: Outbox = request.app.outbox
+    content = str(await request.body(), 'utf-8')
+    try:
+        outbox.set_mode(content)
+    except TimeoutError as e:
+        raise HTTPException(202, 'Setting has not completed, yet: ' + str(e)) from e
+    except ValueError as e:
+        raise HTTPException(422, 'Could not set outbox mode: ' + str(e)) from e
+
+    return "OK"
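The PUT handler maps `TimeoutError` to HTTP 202 and `ValueError` to HTTP 422. In the `outbox.py` diff, `ensure_continuous_upload_stopped` reports failure through its boolean return value, so the 202 branch appears reachable only if something turns that `False` into a `TimeoutError`. The sketch below shows one way this could look. `SketchOutbox` and `status_for` are hypothetical stand-ins that model only the mode switch, not the real upload thread or the FastAPI route.

```python
from enum import Enum


class OutboxMode(Enum):
    CONTINUOUS_UPLOAD = 'continuous_upload'
    STOPPED = 'stopped'


class SketchOutbox:
    """Hypothetical stand-in; only the mode-switching logic is modelled."""
    UPLOAD_TIMEOUT_S = 30

    def __init__(self, stop_succeeds: bool = True) -> None:
        self.stop_succeeds = stop_succeeds
        self.running = False

    def ensure_continuous_upload(self) -> None:
        self.running = True

    def ensure_continuous_upload_stopped(self) -> bool:
        if self.stop_succeeds:
            self.running = False
        return self.stop_succeeds

    def set_mode(self, mode) -> None:
        mode = OutboxMode(mode)  # raises ValueError for unknown values
        if mode is OutboxMode.CONTINUOUS_UPLOAD:
            self.ensure_continuous_upload()
        elif not self.ensure_continuous_upload_stopped():
            # Convert the boolean failure into the exception the endpoint expects.
            raise TimeoutError(
                f'Upload thread did not terminate within {self.UPLOAD_TIMEOUT_S} seconds.')


def status_for(outbox: SketchOutbox, content: str) -> int:
    """Mirror the endpoint's mapping of exceptions to HTTP status codes."""
    try:
        outbox.set_mode(content)
    except TimeoutError:
        return 202
    except ValueError:
        return 422
    return 200
```

`TimeoutError` is a subclass of `OSError`, not of `ValueError`, so the order of the two `except` clauses does not affect which status is returned.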
8 changes: 6 additions & 2 deletions learning_loop_node/detector/rest/upload.py
@@ -1,7 +1,10 @@
-from typing import List
+from typing import TYPE_CHECKING, List
 
 from fastapi import APIRouter, File, Request, UploadFile
 
+if TYPE_CHECKING:
+    from ..detector_node import DetectorNode
+
 router = APIRouter()
 
 
@@ -13,5 +16,6 @@ async def upload_image(request: Request, files: List[UploadFile] = File(...)):
         curl -X POST -F '[email protected]' "http://localhost:/upload"
     """
     raw_files = [await file.read() for file in files]
-    await request.app.upload_images(raw_files)
+    node: DetectorNode = request.app
+    await node.upload_images(raw_files)
     return 200, "OK"
16 changes: 16 additions & 0 deletions learning_loop_node/detector/tests/test_client_communication.py
@@ -102,3 +102,19 @@ async def test_about_endpoint(test_detector_node: DetectorNode):
     assert response_dict['state'] == 'online'
     assert response_dict['target_model'] == '1.1'
     assert any(c.name == 'purple point' for c in model_information.categories)
+
+
+async def test_rest_outbox_mode(test_detector_node: DetectorNode):
+    await asyncio.sleep(3)
+
+    def check_switch_to_mode(mode: str):
+        response = requests.put(f'http://localhost:{GLOBALS.detector_port}/outbox_mode',
+                                data=mode, timeout=30)
+        assert response.status_code == 200
+        response = requests.get(f'http://localhost:{GLOBALS.detector_port}/outbox_mode', timeout=30)
+        assert response.status_code == 200
+        assert response.content == mode.encode()
+
+    check_switch_to_mode('stopped')
+    check_switch_to_mode('continuous_upload')
+    check_switch_to_mode('stopped')
28 changes: 23 additions & 5 deletions learning_loop_node/detector/tests/test_outbox.py
@@ -1,5 +1,6 @@
 import os
 import shutil
+from time import sleep
 
 import numpy as np
 import pytest
@@ -21,6 +22,7 @@ def test_outbox():
     os.mkdir(test_outbox.path)
 
     yield test_outbox
+    test_outbox.set_mode('stopped')
     shutil.rmtree(test_outbox.path, ignore_errors=True)
 
 
@@ -52,11 +54,7 @@ def test_saving_opencv_image(test_outbox: Outbox):
 
 def test_saving_binary(test_outbox: Outbox):
     assert len(test_outbox.get_data_files()) == 0
-    img = Image.new('RGB', (60, 30), color=(73, 109, 137))
-    img.save('/tmp/image.jpg')
-    with open('/tmp/image.jpg', 'rb') as f:
-        data = f.read()
-    test_outbox.save(data)
+    save_test_image_to_outbox(test_outbox)
     assert len(test_outbox.get_data_files()) == 1
 
 
@@ -66,3 +64,23 @@ async def test_files_are_automatically_uploaded(test_detector_node: DetectorNode
     assert len(test_detector_node.outbox.get_data_files()) == 1
 
     assert len(test_detector_node.outbox.get_data_files()) == 1
+
+
+def test_set_outbox_mode(test_outbox: Outbox):
+    test_outbox.set_mode('stopped')
+    save_test_image_to_outbox(outbox=test_outbox)
+    sleep(6)
+    assert len(test_outbox.get_data_files()) == 1, 'File was cleared even though outbox should be stopped'
+    test_outbox.set_mode('continuous_upload')
+    sleep(6)
+    assert len(test_outbox.get_data_files()) == 0, 'File was not cleared even though outbox should be in continuous_upload'
+
+### Helper functions ###
+
+
+def save_test_image_to_outbox(outbox: Outbox):
+    img = Image.new('RGB', (60, 30), color=(73, 109, 137))
+    img.save('/tmp/image.jpg')
+    with open('/tmp/image.jpg', 'rb') as f:
+        data = f.read()
+    outbox.save(data)
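`test_set_outbox_mode` waits a fixed 6 seconds, which is tied to the 5 second delay in the upload loop. For the second assertion (the file is eventually uploaded), a polling helper could return as soon as the outbox is empty. The following is a sketch of such a helper; `wait_until` is a hypothetical name and not part of the test suite.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout_s: float = 10.0, interval_s: float = 0.1) -> bool:
    """Poll `condition` until it is true or `timeout_s` elapses; return the final result."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval_s)
    # One last check so a condition that became true at the deadline is not missed.
    return condition()
```

In the test this might read `assert wait_until(lambda: len(test_outbox.get_data_files()) == 0)`. The first assertion (the file stays in place while stopped) checks that nothing happens, so it still needs a fixed waiting period.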
