Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow server #652

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
239cf3f
Processor.__init__: fix #274
bertsky Dec 1, 2020
0c3d970
add workflow server and API integration…
bertsky Dec 1, 2020
1cb161c
adapt test_task_sequence
bertsky Dec 1, 2020
63be07d
workflow server: add response on shutdown
bertsky Dec 1, 2020
990857f
add workflow client; add server end-point list-tasks
bertsky Dec 1, 2020
f4e71a8
run_processor: set fileGrps already during instantiation (as some imp…
bertsky Dec 4, 2020
fddb236
workflow CLI: proper exit codes and error logging
bertsky Jan 18, 2021
b4a8bcb
workflow server: trigger the actual instantiation
bertsky Jan 25, 2021
6d15084
workflow server: run single-threaded
bertsky Jan 26, 2021
6e2e7ff
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Feb 9, 2021
e34b70a
fix conflicting cli command name
bertsky Feb 9, 2021
1dd2d54
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Mar 9, 2021
e637a57
Merge remote-tracking branch 'bertsky/fix-recrop' into workflow-server
bertsky May 13, 2021
e3c992e
workflow server: send appropriate HTTP status codes in case of error
bertsky Jun 9, 2021
2949925
workflow client: show response text in case of server error
bertsky Jun 9, 2021
ccb369a
ocrd.processor.base: also init old_pwd when no workspace yet
bertsky Jun 9, 2021
db14b50
workflow server: run multi-processed / queued…
bertsky Jun 10, 2021
e6d61a3
workflow server: allow workers to opt out of CUDA via envvar
bertsky Jun 11, 2021
cac80d6
workflow server: lock METS while processing
bertsky Jun 13, 2021
6263bb1
add processing server…
bertsky Jun 15, 2021
4b59396
workflow server: do monkey patching below module level to prevent cir…
bertsky Jun 20, 2021
fa1bc37
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Jun 30, 2021
fcbcc82
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Sep 29, 2021
8193559
workflow server: improve log msg
bertsky Oct 13, 2021
5d48239
Move actual process logic into separate function.
jnphilipp Aug 3, 2021
6ff1d40
Add process_images endpoint.
jnphilipp Oct 13, 2021
08658f9
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Nov 10, 2021
417faf0
Merge remote-tracking branch 'upstream/master' into workflow-server
bertsky Jan 17, 2022
83b10f5
Merge branch 'master' of ssh://github.com/OCR-D/core into workflow-se…
bertsky Feb 8, 2022
d98daa8
Merge branch 'master' of ssh://github.com/OCR-D/core into workflow-se…
bertsky May 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ocrd/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def get_help(self, ctx):
from ocrd.cli.ocrd_tool import ocrd_tool_cli
from ocrd.cli.workspace import workspace_cli
from ocrd.cli.process import process_cli
from ocrd.cli.workflow import workflow_cli
from ocrd.cli.bashlib import bashlib_cli
from ocrd.cli.validate import validate_cli
from ocrd.cli.resmgr import resmgr_cli
Expand All @@ -43,6 +44,7 @@ def cli(**kwargs): # pylint: disable=unused-argument
cli.add_command(ocrd_tool_cli)
cli.add_command(workspace_cli)
cli.add_command(process_cli)
cli.add_command(workflow_cli)
cli.add_command(bashlib_cli)
cli.add_command(zip_cli)
cli.add_command(validate_cli)
Expand Down
5 changes: 3 additions & 2 deletions ocrd/ocrd/cli/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import click

from ocrd_utils import getLogger, initLogging
from ocrd.task_sequence import run_tasks
from ocrd.task_sequence import run_tasks, parse_tasks

from ..decorators import ocrd_loglevel

Expand All @@ -24,9 +24,10 @@
@click.argument('tasks', nargs=-1, required=True)
def process_cli(log_level, mets, page_id, tasks, overwrite):
"""
Process a series of tasks
Run processor CLIs in a series of tasks
"""
initLogging()
log = getLogger('ocrd.cli.process')
tasks = parse_tasks(tasks)
run_tasks(mets, log_level, page_id, tasks, overwrite)
log.info("Finished")
201 changes: 201 additions & 0 deletions ocrd/ocrd/cli/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""
Flask application for uwsgi workflow server

(This is not meant to be imported directly, but loaded from uwsgi.)
"""
import base64
import os
import signal
import json
import flask
import uwsgi # added to module path by uwsgi runner

from io import BytesIO
from ocrd_modelfactory import page_from_file
from ocrd_utils import getLogger, initLogging, pushd_popd
from ocrd.task_sequence import run_tasks, parse_tasks
from ocrd.resolver import Resolver
from PIL import Image
from tempfile import TemporaryDirectory


# unwrap user-defined workflow:
tasks = json.loads(uwsgi.opt["tasks"])
loglevel = uwsgi.opt["loglevel"].decode()
timeout_per_page = int(uwsgi.opt["timeout_per_page"])
workers = uwsgi.numproc
where = "GPU" # priority/general worker (i.e. contract worker / wage labourer)
if "CUDA_WORKERS" in os.environ:
gpu_workers = int(os.environ["CUDA_WORKERS"])
assert gpu_workers <= workers, \
"CUDA_WORKERS[%d] <= workers[%d] violated" % (gpu_workers, workers)
else:
gpu_workers = workers

initLogging()
res = Resolver()
app = flask.Flask(__name__)
log = getLogger('ocrd.workflow.server')
if loglevel:
log.setLevel(loglevel)

def setup_where():
global where
log.debug("Setup for worker %d", uwsgi.worker_id())
if uwsgi.worker_id() > gpu_workers:
# avoid GPU
os.environ["CUDA_VISIBLE_DEVICES"] = ""
where = 'CPU'

def setup():
global tasks
setup_where()
log.info("Parsing and instantiating %d tasks (on %s)", len(tasks), where)
tasks = parse_tasks(tasks) # raises exception if invalid (causing worker to exit)
for task in tasks:
task.instantiate() # returns False if impossible (causing CLI fallback below)

@app.route('/process')
def process(): # pylint: disable=unused-variable
log.debug("Processing request: %s", str(flask.request))
if flask.request.args.get("mets"):
mets = flask.request.args["mets"]
else:
return 'Error: No METS', 400
# prevent multiple concurrent requests to the same workspace/METS
if not lock(mets):
return 'Error: Locked METS', 423
if flask.request.args.get('page_id'):
page_id = flask.request.args["page_id"]
else:
page_id = ''
if flask.request.args.get('log_level'):
log_level = flask.request.args["log_level"]
else:
log_level = None
if flask.request.args.get('overwrite'):
overwrite = flask.request.args["overwrite"] in ["True", "true", "1"]
else:
overwrite = False
try:
_process(mets, page_id, log_level, overwrite)
except Exception as e:
log.exception("Request '%s' failed", str(flask.request.args))
unlock(mets)
return 'Failed: %s' % str(e), 500
unlock(mets)
return 'Finished'

def _process(mets, page_id='', log_level=None, overwrite=False):
if page_id:
npages = len(page_id.split(','))
else:
workspace = res.workspace_from_url(mets)
npages = len(workspace.mets.physical_pages)
timeout = timeout_per_page * npages
log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout)
# allow no more than timeout_per_page before restarting worker:
uwsgi.set_user_harakiri(timeout) # go, go, go!
# run the workflow
run_tasks(mets, log_level, page_id, tasks, overwrite)
uwsgi.set_user_harakiri(0) # take a breath!

@app.route('/list-tasks')
def list_tasks(): # pylint: disable=unused-variable
seq = ''
for task in tasks:
seq += '\n' + str(task)
return seq

@app.route('/shutdown')
def shutdown(): # pylint: disable=unused-variable
log.debug("Shutting down")
# does not work ("error managing signal 2 on worker 1"):
# uwsgi.signal(signal.SIGINT)
os.kill(uwsgi.masterpid(), signal.SIGINT)
return 'Stopped'

def lock(mets):
uwsgi.lock()
try:
log.debug("locking '%s'", mets)
if uwsgi.cache_exists(mets):
granted = False
else:
uwsgi.cache_set(mets, b'running')
granted = True
finally:
uwsgi.unlock()
return granted

def unlock(mets):
uwsgi.lock()
try:
log.debug("unlocking '%s'", mets)
uwsgi.cache_del(mets)
finally:
uwsgi.unlock()

@app.route('/process_images', methods=["POST"])
def process_images(): # pylint: disable=undefined-name
log.debug(f"Processing request: {flask.request}")
if flask.request.is_json:
req = flask.request.get_json()

pages = {}
if "pages" in req:
for k, v in req["pages"].items():
pages[k] = v
elif "PAGES" in req:
for k, v in pages["PAGES"].items():
pages[k] = v
else:
return 'Missing "pages" param.', 400

try:
work_dir = TemporaryDirectory()
ws = res.workspace_from_nothing(directory=work_dir.name)

for k, v in pages.items():
img = Image.open(BytesIO(base64.b64decode(v)))
if img.mode != "RGB":
img = img.convert("RGB")
ws.save_image_file(img, k, "OCR-D-IMG", page_id=k, mimetype='image/png')
ws.save_mets()
ws.reload_mets()
except Exception as e:
work_dir.cleanup()
return f"An error occured while decoding image(s) and creating mets.xml. {e}", 400

try:
_process(ws.mets_target)
ws.reload_mets()
for k in pages.keys():
pages[k] = {"img": None, "page": None}

page_file = next(ws.mets.find_files(
pageId=k,
fileGrp=tasks[-1].output_file_grps[0],
))
with pushd_popd(ws.directory):
if page_file and os.path.exists(page_file.local_filename):
with open(page_file.local_filename, "r", encoding="utf8") as f:
pages[k]["page"] = f.read()
img_path = page_from_file(
page_file
).get_Page().get_AlternativeImage()[-1].get_filename()
if img_path and os.path.exists(img_path):
img = Image.open(img_path)
img_file = BytesIO()
img.save(img_file, format="PNG")
pages[k]["img"] = base64.b64encode(img_file.getvalue()).decode("utf8")
except Exception as e:
return f"Failed: {e}", 500
finally:
work_dir.cleanup()

return flask.json.jsonify(pages)
else:
return "Request was not JSON.", 400

setup()
Loading