From 239cf3f281ad76b4154b3ef66a64b852b387c42a Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 1 Dec 2020 01:08:35 +0100 Subject: [PATCH 01/21] Processor.__init__: fix #274 --- ocrd/ocrd/processor/base.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index bc5cb6cf3..1d3c978bb 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -27,11 +27,8 @@ def __init__( workspace, ocrd_tool=None, parameter=None, - # TODO OCR-D/core#274 - # input_file_grp=None, - # output_file_grp=None, - input_file_grp="INPUT", - output_file_grp="OUTPUT", + input_file_grp=None, + output_file_grp=None, page_id=None, show_help=False, show_version=False, From 0c3d970a73f67bedb3d66d5d8ea7fa45b396981a Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 1 Dec 2020 01:10:12 +0100 Subject: [PATCH 02/21] =?UTF-8?q?add=20workflow=20server=20and=20API=20int?= =?UTF-8?q?egration=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add workflow CLI group: - add alias `ocrd workflow process` to `ocrd process` - add new `ocrd workflow server`, running a web server for the given workflow that tries to instantiate all Pythonic processors once (to re-use their API instead of starting CLI each time) - add `run_api` analogue to existing `run_cli` and let `run_processor` delegate to it in `ocrd.processor.helpers`: - `run_processor` only has workspace de/serialization and processor instantiation - `run_api` has core `process()`, but now also enters and leaves the workspace directory, and passes any exceptions - ocrd.task_sequence: differentiate between `parse_tasks` (independent of workspace or fileGrps) and `run_tasks`, generalize `run_tasks` to use either `run_cli` or new `run_api` (where instances are available, avoiding unnecessary METS de/serialisation) - amend `TaskSequence` by `instance` attribute and `instantiate` method: - peek into a CLI to check for Pythonic processors - try to compile and exec, using monkey-patching to disable normal argument passing, execution, and exiting; merely importing and fetching the class of the processor - instantiate processor without workspace or fileGrps - avoid unnecessary CLI call to get ocrd-tool.json --- ocrd/ocrd/cli/__init__.py | 2 + ocrd/ocrd/cli/process.py | 5 +- ocrd/ocrd/cli/workflow.py | 104 ++++++++++++++++++++++ ocrd/ocrd/processor/base.py | 2 +- ocrd/ocrd/processor/helpers.py | 60 ++++++++++--- ocrd/ocrd/task_sequence.py | 152 +++++++++++++++++++++++++++------ 6 files changed, 284 insertions(+), 41 deletions(-) create mode 100644 ocrd/ocrd/cli/workflow.py diff --git a/ocrd/ocrd/cli/__init__.py b/ocrd/ocrd/cli/__init__.py index 1d1aeda26..3b83642bc 100644 --- a/ocrd/ocrd/cli/__init__.py +++ b/ocrd/ocrd/cli/__init__.py @@ -16,6 +16,7 @@ def get_help(self, ctx): from ocrd.cli.ocrd_tool import ocrd_tool_cli from ocrd.cli.workspace import workspace_cli from ocrd.cli.process import process_cli +from ocrd.cli.workflow import workflow_cli from ocrd.cli.bashlib import bashlib_cli from ocrd.cli.validate import validate_cli from ocrd.decorators import ocrd_loglevel @@ -33,6 +34,7 @@ def cli(**kwargs): # pylint: disable=unused-argument cli.add_command(ocrd_tool_cli) cli.add_command(workspace_cli) cli.add_command(process_cli) +cli.add_command(workflow_cli) cli.add_command(bashlib_cli) cli.add_command(zip_cli) cli.add_command(validate_cli) diff --git a/ocrd/ocrd/cli/process.py b/ocrd/ocrd/cli/process.py index 086c669cd..3032754c2 100644 --- a/ocrd/ocrd/cli/process.py +++ b/ocrd/ocrd/cli/process.py @@ -4,7 +4,7 @@ import click from ocrd_utils import getLogger, initLogging -from ocrd.task_sequence import run_tasks +from ocrd.task_sequence import run_tasks, parse_tasks from ..decorators import ocrd_loglevel @@ -19,9 +19,10 @@ @click.argument('tasks', nargs=-1, required=True) def process_cli(log_level, mets, page_id, tasks, overwrite): """ - Process a series of tasks + Run processor CLIs in a series of tasks """ initLogging() log = getLogger('ocrd.cli.process') + tasks = parse_tasks(tasks) run_tasks(mets, log_level, page_id, tasks, overwrite) log.info("Finished") diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py new file mode 100644 index 000000000..784b5a580 --- /dev/null +++ b/ocrd/ocrd/cli/workflow.py @@ -0,0 +1,104 @@ +""" +CLI for task_sequence +""" +import click +import flask + +from ocrd_utils import getLogger, initLogging +from ocrd.task_sequence import run_tasks, parse_tasks + +from ..decorators import ocrd_loglevel +from .process import process_cli + +@click.group("workflow") +def workflow_cli(): + """ + Process a series of tasks + """ + initLogging() + +# ---------------------------------------------------------------------- +# ocrd workflow process +# ---------------------------------------------------------------------- +@workflow_cli.command('process') +@ocrd_loglevel +@click.option('-m', '--mets', help="METS to process", default="mets.xml") +@click.option('-g', '--page-id', help="ID(s) of the pages to process") +@click.option('--overwrite', is_flag=True, default=False, help="Remove output pages/images if they already exist") +@click.argument('tasks', nargs=-1, required=True) +def process_cli_alias(log_level, mets, page_id, tasks, overwrite): + """ + Run processor CLIs in a series of tasks + + (alias for ``ocrd process``) + """ + process_cli(log_level, mets, page_id, tasks, overwrite) + +# ---------------------------------------------------------------------- +# ocrd workflow server +# ---------------------------------------------------------------------- +@workflow_cli.command('server') +@ocrd_loglevel +@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1') +@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024)) +@click.argument('tasks', nargs=-1, required=True) +def server_cli(log_level, host, port, tasks): + """ + Start server for a series of tasks to run processor CLIs or APIs on workspaces + + Parse the given tasks and try to instantiate all Pythonic + processors among them with the given parameters. + Open a web server that listens on the given host and port + for GET requests named ``process`` with the following + (URL-encoded) arguments: + + mets (string): Path name (relative to the server's CWD, + or absolute) of the workspace to process + + page_id (string): Comma-separated list of page IDs to process + + overwrite (bool): Remove output pages/images if they already exist + + The server will handle each request by running the tasks + on the given workspace. Pythonic processors will be run via API + (on those same instances). Non-Pythonic processors (or those + not directly accessible in the current venv) will be run via CLI + normally, instantiating each time. + Also, between each contiguous chain of Pythonic tasks in the overall + series, no METS de/serialization will be performed. + + Stop the server by sending SIGINT (e.g. via ctrl+c + on the terminal), or sending a GET request named ``shutdown``. + """ + log = getLogger('ocrd.workflow.server') + log.debug("Parsing and instantiating %d tasks", len(tasks)) + tasks = parse_tasks(tasks) + app = flask.Flask(__name__) + @app.route('/process') + def process(): # pylint: disable=unused-variable + if flask.request.args.get("mets"): + mets = flask.request.args["mets"] + else: + return 'Error: No METS' + if flask.request.args.get('page_id'): + page_id = flask.request.args["page_id"] + else: + page_id = '' + if flask.request.args.get('overwrite'): + overwrite = flask.request.args["overwrite"] in ["True", "true", "1"] + else: + overwrite = False + try: + run_tasks(mets, log_level, page_id, tasks, overwrite) + except Exception as e: + log.exception("Request '%s' failed", str(flask.request.args)) + return 'Failed: %s' % str(e) + return 'Finished' + @app.route('/shutdown') + def shutdown(): # pylint: disable=unused-variable + fun = flask.request.environ.get('werkzeug.server.shutdown') + if fun is None: + raise RuntimeError('Not running with the Werkzeug Server') + fun() + log.debug("Running server on http://%s:%d", host, port) + app.run(host=host, port=port) diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index 1d3c978bb..0e293efd5 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -11,7 +11,7 @@ from ocrd_models.ocrd_page import MetadataItemType, LabelType, LabelsType # XXX imports must remain for backwards-compatibilty -from .helpers import run_cli, run_processor, generate_processor_help # pylint: disable=unused-import +from .helpers import run_api, run_cli, run_processor, generate_processor_help # pylint: disable=unused-import class Processor(): """ diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index 3f6c8e430..242b23799 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -2,15 +2,17 @@ Helper methods for running and documenting processors """ from time import perf_counter, process_time +import os import json import inspect -from subprocess import run, PIPE +from subprocess import run from click import wrap_text from ocrd_utils import getLogger __all__ = [ 'generate_processor_help', + 'run_api', 'run_cli', 'run_processor' ] @@ -50,16 +52,44 @@ def run_processor( mets_url, working_dir ) - log = getLogger('ocrd.processor.helpers.run_processor') - log.debug("Running processor %s", processorClass) processor = processorClass( workspace, ocrd_tool=ocrd_tool, + parameter=parameter + ) + error = run_api( + processor, page_id=page_id, input_file_grp=input_file_grp, - output_file_grp=output_file_grp, - parameter=parameter + output_file_grp=output_file_grp ) + if error: + raise error + workspace.save_mets() + return processor + +def run_api(processor, + workspace=None, + page_id=None, + input_file_grp=None, + output_file_grp=None +): # pylint: disable=too-many-locals + """ + Set workspace and fileGrps for processor and run through it + + Args: + processor (object): Processor instance + """ + log = getLogger('ocrd.processor.helpers.run_processor') + log.debug("Running processor %s", processor.__class__.__name__) + if workspace: + processor.workspace = workspace + if page_id: + processor.page_id = page_id + if input_file_grp: + processor.input_file_grp = input_file_grp + if output_file_grp: + processor.output_file_grp = output_file_grp ocrd_tool = processor.ocrd_tool name = '%s v%s' % (ocrd_tool['executable'], processor.version) otherrole = ocrd_tool['steps'][0] @@ -67,26 +97,32 @@ def run_processor( log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole) t0_wall = perf_counter() t0_cpu = process_time() - processor.process() + try: + oldcwd = os.getcwd() + os.chdir(processor.workspace.directory) + processor.process() + except Exception as err: + log.exception("Failure in processor '%s'" % ocrd_tool['executable']) + return err + finally: + os.chdir(oldcwd) t1_wall = perf_counter() - t0_wall t1_cpu = process_time() - t0_cpu logProfile.info("Executing processor '%s' took %fs (wall) %fs (CPU)( [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s']" % ( ocrd_tool['executable'], t1_wall, t1_cpu, - input_file_grp if input_file_grp else '', - output_file_grp if output_file_grp else '', - json.dumps(parameter) if parameter else {} + processor.input_file_grp or '', + processor.output_file_grp or '', + json.dumps(processor.parameter) if processor.parameter else {} )) - workspace.mets.add_agent( + processor.workspace.mets.add_agent( name=name, _type='OTHER', othertype='SOFTWARE', role='OTHER', otherrole=otherrole ) - workspace.save_mets() - return processor def run_cli( executable, diff --git a/ocrd/ocrd/task_sequence.py b/ocrd/ocrd/task_sequence.py index 79c4fdbd3..d83095563 100644 --- a/ocrd/ocrd/task_sequence.py +++ b/ocrd/ocrd/task_sequence.py @@ -1,15 +1,28 @@ import json +import re +import sys from shlex import split as shlex_split from distutils.spawn import find_executable as which # pylint: disable=import-error,no-name-in-module from subprocess import run, PIPE -from ocrd_utils import getLogger, parse_json_string_or_file, set_json_key_value_overrides +# workaround venvs created for Python>=3.8 +from pkg_resources import load_entry_point # pylint: disable=unused-import + +from ocrd_utils import ( + getLogger, + setOverrideLogLevel, + parse_json_string_or_file, + set_json_key_value_overrides +) # from collections import Counter -from ocrd.processor.base import run_cli +from ocrd import decorators +from ocrd.processor.base import run_cli, run_api from ocrd.resolver import Resolver from ocrd_validators import ParameterValidator, WorkspaceValidator from ocrd_models import ValidationReport +_processor_class = None # for exec in ProcessorTask.instantiate + class ProcessorTask(): @classmethod @@ -44,6 +57,7 @@ def __init__(self, executable, input_file_grps, output_file_grps, parameters): self.output_file_grps = output_file_grps self.parameters = parameters self._ocrd_tool_json = None + self.instance = None # for API (instead of CLI) integration @property def ocrd_tool_json(self): @@ -78,6 +92,59 @@ def validate(self): raise Exception("Processor requires output_file_grp but none was provided.") return report + def instantiate(self): + logger = getLogger('ocrd.task_sequence.ProcessorTask') + program = which(self.executable) + if not program: + logger.warning("Cannot find processor '%s' in PATH", self.executable) + return False + # run CLI merely to do imports and fetch class + with open(program) as f: + # check shebang in first line of CLI file for Python + line = f.readline().strip() + if not re.fullmatch('[#][!].*/python[0-9.]*', line): + logger.info("Non-Pythonic processor '%s' breaks the chain", self.executable) + return False + # compile Python processor from CLI file + try: + code = compile(f.read(), program, 'exec') + except (TypeError, SyntaxError, ValueError) as e: + logger.warning("Cannot compile and instantiate processor '%s': %s", + self.executable, str(e)) + return False + # temporarily monkey-patch entry point and sys.exit/sys.argv + def ignore(anything): # pylint: disable=unused-argument + return + global _processor_class + _processor_class = None + def get_processor_class(cls, **kwargs): + global _processor_class + _processor_class = cls + wrap_processor = decorators.ocrd_cli_wrap_processor + decorators.ocrd_cli_wrap_processor = get_processor_class + sys_exit = sys.exit + sys.exit = ignore + sys_argv = sys.argv + sys.argv = [self.executable] + # run Python processor from CLI file + __name__ = '__main__' + try: + exec(code) + logger.info("Instantiating %s for processor '%s'", + _processor_class.__name__, self.executable) + # instantiate processor without workspace + self.instance = _processor_class(None, parameter=self.parameters) + # circumvent calling CLI to get .ocrd_tool_json + self._ocrd_tool_json = self.instance.ocrd_tool + except Exception as e: + logger.warning("Cannot exec and instantiate processor '%s': %s", + self.executable, str(e)) + # reset modules + sys.argv = sys_argv + sys.exit = sys_exit + decorators.ocrd_cli_wrap_processor = wrap_processor + return bool(self.instance) + def __str__(self): ret = '%s -I %s -O %s' % ( self.executable.replace('ocrd-', '', 1), @@ -117,43 +184,76 @@ def validate_tasks(tasks, workspace, page_id=None, overwrite=False): return report -def run_tasks(mets, log_level, page_id, task_strs, overwrite=False): +def parse_tasks(task_strs): + return [ProcessorTask.parse(task_str) for task_str in task_strs] + +def run_tasks(mets, log_level, page_id, tasks, overwrite=False): resolver = Resolver() workspace = resolver.workspace_from_url(mets) + if overwrite: + workspace.overwrite_mode = True log = getLogger('ocrd.task_sequence.run_tasks') - tasks = [ProcessorTask.parse(task_str) for task_str in task_strs] + if log_level: + setOverrideLogLevel(log_level) validate_tasks(tasks, workspace, page_id, overwrite) # Run the tasks + is_first = True + last_is_instance = False for task in tasks: - log.info("Start processing task '%s'", task) - - # execute cli - returncode = run_cli( - task.executable, - mets, - resolver, - workspace, - log_level=log_level, - page_id=page_id, - overwrite=overwrite, - input_file_grp=','.join(task.input_file_grps), - output_file_grp=','.join(task.output_file_grps), - parameter=json.dumps(task.parameters) - ) - - # check return code - if returncode != 0: - raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode)) + is_instance = bool(task.instance) + log.info("Start processing %s task '%s'", + "API" if is_instance else "CLI", task) - log.info("Finished processing task '%s'", task) + if (not is_first and + not is_instance and + last_is_instance): + workspace.save_mets() + + if is_instance: + # execute API + error = run_api( + task.instance, + workspace, + page_id=page_id, + input_file_grp=','.join(task.input_file_grps), + output_file_grp=','.join(task.output_file_grps) + ) - # reload mets - workspace.reload_mets() + if error: + raise error + else: + # execute cli + returncode = run_cli( + task.executable, + mets, + resolver, + workspace, + log_level=log_level, + page_id=page_id, + overwrite=overwrite, + input_file_grp=','.join(task.input_file_grps), + output_file_grp=','.join(task.output_file_grps), + parameter=json.dumps(task.parameters) + ) + + # check return code + if returncode != 0: + raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode)) + + workspace.reload_mets() # check output file groups are in mets for output_file_grp in task.output_file_grps: if not output_file_grp in workspace.mets.file_groups: raise Exception("Invalid state: expected output file group '%s' not in METS (despite processor success)" % output_file_grp) + + log.info("Finished processing task '%s'", task) + + is_first = False + last_is_instance = is_instance + + if last_is_instance: + workspace.save_mets() From 1cb161c5544d607cfa90bb0808e2c2463671b96d Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 1 Dec 2020 01:27:11 +0100 Subject: [PATCH 03/21] adapt test_task_sequence --- tests/test_task_sequence.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_task_sequence.py b/tests/test_task_sequence.py index e33da7c5f..40d8b5de7 100644 --- a/tests/test_task_sequence.py +++ b/tests/test_task_sequence.py @@ -12,7 +12,7 @@ from ocrd_utils import pushd_popd, MIMETYPE_PAGE from ocrd.resolver import Resolver -from ocrd.task_sequence import run_tasks, validate_tasks, ProcessorTask +from ocrd.task_sequence import parse_tasks, run_tasks, validate_tasks, ProcessorTask class TestOcrdWfStep(TestCase): @@ -141,10 +141,11 @@ def test_task_run(self): ws.add_file('GRP0', content='', local_filename='GRP0/foo', ID='file0', mimetype=MIMETYPE_PAGE, pageId=None) ws.save_mets() files_before = len(ws.mets.find_all_files()) - run_tasks('mets.xml', 'DEBUG', None, [ + tasks = parse_tasks([ "dummy -I OCR-D-IMG -O GRP1", "dummy -I GRP1 -O GRP2", ]) + run_tasks('mets.xml', 'DEBUG', None, tasks) ws.reload_mets() # step 1: 2 images in OCR-D-IMG -> 2 images 2 PAGEXML in GRP1 # step 2: 2 images and 2 PAGEXML in GRP1 -> process just the PAGEXML From 63be07d383eef68943c5229375735970a9b9d1fe Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 1 Dec 2020 01:43:30 +0100 Subject: [PATCH 04/21] workflow server: add response on shutdown --- ocrd/ocrd/cli/workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 784b5a580..75ac2af5b 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -100,5 +100,6 @@ def shutdown(): # pylint: disable=unused-variable if fun is None: raise RuntimeError('Not running with the Werkzeug Server') fun() + return 'Stopped' log.debug("Running server on http://%s:%d", host, port) app.run(host=host, port=port) From 990857f486e8791661d3a287b6d6df0d59c03083 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 1 Dec 2020 08:35:10 +0100 Subject: [PATCH 05/21] add workflow client; add server end-point list-tasks --- ocrd/ocrd/cli/workflow.py | 71 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 75ac2af5b..ebc847f8e 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -3,6 +3,7 @@ """ import click import flask +import requests from ocrd_utils import getLogger, initLogging from ocrd.task_sequence import run_tasks, parse_tasks @@ -94,6 +95,12 @@ def process(): # pylint: disable=unused-variable log.exception("Request '%s' failed", str(flask.request.args)) return 'Failed: %s' % str(e) return 'Finished' + @app.route('/list-tasks') + def list_tasks(): # pylint: disable=unused-variable + seq = '' + for task in tasks: + seq += '\n' + str(task) + return seq @app.route('/shutdown') def shutdown(): # pylint: disable=unused-variable fun = flask.request.environ.get('werkzeug.server.shutdown') @@ -103,3 +110,67 @@ def shutdown(): # pylint: disable=unused-variable return 'Stopped' log.debug("Running server on http://%s:%d", host, port) app.run(host=host, port=port) + +# ---------------------------------------------------------------------- +# ocrd workflow client +# ---------------------------------------------------------------------- +@workflow_cli.group('client') +@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1') +@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024)) +@click.pass_context +def client_cli(ctx, host, port): + """ + Have the workflow server run commands + """ + url = 'http://' + host + ':' + str(port) + '/' + ctx.ensure_object(dict) + ctx.obj['URL'] = url + +@client_cli.command('process') +@click.option('-m', '--mets', help="METS to process", default="mets.xml") +@click.option('-g', '--page-id', help="ID(s) of the pages to process") +@click.option('--overwrite', is_flag=True, default=False, help="Remove output pages/images if they already exist") +@click.pass_context +def client_process_cli(ctx, mets, page_id, overwrite): + """ + Have the workflow server process another workspace + """ + url = ctx.obj['URL'] + 'process' + params = {'mets': mets, + 'page_id': page_id, + 'overwrite': str(overwrite) + } + response = requests.get(url, params=params) + if response.status_code != 200: + raise Exception("HTTP request failed: %s (HTTP %d)" % ( + response.url, response.status_code)) + print(response.text) + if response.text != 'Finished': + return 1 + +@client_cli.command('list-tasks') +@click.pass_context +def client_process_cli(ctx): + """ + Have the workflow server print the configured task sequence + """ + url = ctx.obj['URL'] + 'list-tasks' + response = requests.get(url) + if response.status_code != 200: + raise Exception("HTTP request failed: %s (HTTP %d)" % ( + response.url, response.status_code)) + print(response.text) + +@client_cli.command('shutdown') +@click.pass_context +def client_process_cli(ctx): + """ + Have the workflow server shutdown gracefully + """ + url = ctx.obj['URL'] + 'shutdown' + response = requests.get(url) + if response.status_code != 200: + raise Exception("HTTP request failed: %s (HTTP %d)" % ( + response.url, response.status_code)) + print(response.text) + From f4e71a88f2f7d1549dfe24a59180c7e55fbb8515 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Fri, 4 Dec 2020 23:53:49 +0100 Subject: [PATCH 06/21] run_processor: set fileGrps already during instantiation (as some implementations currently expect them in the constructor) --- ocrd/ocrd/processor/helpers.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index 242b23799..17b741a3f 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -55,14 +55,12 @@ def run_processor( processor = processorClass( workspace, ocrd_tool=ocrd_tool, - parameter=parameter - ) - error = run_api( - processor, - page_id=page_id, input_file_grp=input_file_grp, - output_file_grp=output_file_grp + output_file_grp=output_file_grp, + page_id=page_id, + parameter=parameter ) + error = run_api(processor) if error: raise error workspace.save_mets() From fddb23647668a110ebd69c4be61f1a534c5ab15d Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Mon, 18 Jan 2021 23:43:22 +0100 Subject: [PATCH 07/21] workflow CLI: proper exit codes and error logging --- ocrd/ocrd/cli/workflow.py | 67 ++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index ebc847f8e..9f9feaf69 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -1,6 +1,7 @@ """ CLI for task_sequence """ +import sys import click import flask import requests @@ -115,8 +116,8 @@ def shutdown(): # pylint: disable=unused-variable # ocrd workflow client # ---------------------------------------------------------------------- @workflow_cli.group('client') -@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1') -@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024)) +@click.option('-h', '--host', help="host name/IP to request from", default='127.0.0.1') +@click.option('-p', '--port', help="TCP port to request from", default=5000, type=click.IntRange(min=1024)) @click.pass_context def client_cli(ctx, host, port): """ @@ -125,6 +126,7 @@ def client_cli(ctx, host, port): url = 'http://' + host + ':' + str(port) + '/' ctx.ensure_object(dict) ctx.obj['URL'] = url + ctx.obj['log'] = getLogger('ocrd.workflow.client') @client_cli.command('process') @click.option('-m', '--mets', help="METS to process", default="mets.xml") @@ -140,13 +142,23 @@ def client_process_cli(ctx, mets, page_id, overwrite): 'page_id': page_id, 'overwrite': str(overwrite) } - response = requests.get(url, params=params) - if response.status_code != 200: - raise Exception("HTTP request failed: %s (HTTP %d)" % ( - response.url, response.status_code)) - print(response.text) - if response.text != 'Finished': - return 1 + try: + response = requests.get(url, params=params) + response.raise_for_status() + print(response.text) + if response.text == 'Finished': + sys.exit(0) + else: + sys.exit(1) + except requests.exceptions.HTTPError as err: + ctx.obj['log'].error("Server error: %s", err) + except requests.exceptions.ConnectionError as err: + ctx.obj['log'].error("Connection error: %s", err) + except requests.exceptions.Timeout as err: + ctx.obj['log'].error("Timeout error: %s", err) + except requests.exceptions.RequestException as err: + ctx.obj['log'].error("Unknown error: %s", err) + sys.exit(2) @client_cli.command('list-tasks') @click.pass_context @@ -155,11 +167,20 @@ def client_process_cli(ctx): Have the workflow server print the configured task sequence """ url = ctx.obj['URL'] + 'list-tasks' - response = requests.get(url) - if response.status_code != 200: - raise Exception("HTTP request failed: %s (HTTP %d)" % ( - response.url, response.status_code)) - print(response.text) + try: + response = requests.get(url) + response.raise_for_status() + print(response.text) + sys.exit(0) + except requests.exceptions.HTTPError as err: + ctx.obj['log'].error("Server error: %s", err) + except requests.exceptions.ConnectionError as err: + ctx.obj['log'].error("Connection error: %s", err) + except requests.exceptions.Timeout as err: + ctx.obj['log'].error("Timeout error: %s", err) + except requests.exceptions.RequestException as err: + ctx.obj['log'].error("Unknown error: %s", err) + sys.exit(2) @client_cli.command('shutdown') @click.pass_context @@ -168,9 +189,17 @@ def client_process_cli(ctx): Have the workflow server shutdown gracefully """ url = ctx.obj['URL'] + 'shutdown' - response = requests.get(url) - if response.status_code != 200: - raise Exception("HTTP request failed: %s (HTTP %d)" % ( - response.url, response.status_code)) - print(response.text) + try: + response = requests.get(url) + print(response.text) + sys.exit(0) + except requests.exceptions.HTTPError as err: + ctx.obj['log'].error("Server error: %s", err) + except requests.exceptions.ConnectionError as err: + ctx.obj['log'].error("Connection error: %s", err) + except requests.exceptions.Timeout as err: + ctx.obj['log'].error("Timeout error: %s", err) + except requests.exceptions.RequestException as err: + ctx.obj['log'].error("Unknown error: %s", err) + sys.exit(2) From b4a8bcb052cb73032cfdc80a5a16e54524f0bce1 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Mon, 25 Jan 2021 23:41:57 +0100 Subject: [PATCH 08/21] workflow server: trigger the actual instantiation --- ocrd/ocrd/cli/workflow.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 9f9feaf69..89e91a44b 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -75,6 +75,8 @@ def server_cli(log_level, host, port, tasks): log = getLogger('ocrd.workflow.server') log.debug("Parsing and instantiating %d tasks", len(tasks)) tasks = parse_tasks(tasks) + for task in tasks: + task.instantiate() app = flask.Flask(__name__) @app.route('/process') def process(): # pylint: disable=unused-variable From 6d150840e1e3923101273f3c06ee88862331ce27 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 26 Jan 2021 14:46:28 +0100 Subject: [PATCH 09/21] workflow server: run single-threaded --- ocrd/ocrd/cli/workflow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 89e91a44b..15e9bac97 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -112,7 +112,10 @@ def shutdown(): # pylint: disable=unused-variable fun() return 'Stopped' log.debug("Running server on http://%s:%d", host, port) - app.run(host=host, port=port) + # disable multithreading here: + # - GPU processors need to have same thread context between startup and processing + # - we have no multiprocessing server backend anyway (until we move to external server) + app.run(host=host, port=port, debug=False, threaded=False) # ---------------------------------------------------------------------- # ocrd workflow client From e34b70ad925b4540e4ae6e877d16dda885e8f959 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Tue, 9 Feb 2021 14:00:37 +0100 Subject: [PATCH 10/21] fix conflicting cli command name --- ocrd/ocrd/cli/workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 15e9bac97..a6e9191e5 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -167,7 +167,7 @@ def client_process_cli(ctx, mets, page_id, overwrite): @client_cli.command('list-tasks') @click.pass_context -def client_process_cli(ctx): +def client_list_tasks_cli(ctx): """ Have the workflow server print the configured task sequence """ @@ -189,7 +189,7 @@ def client_process_cli(ctx): @client_cli.command('shutdown') @click.pass_context -def client_process_cli(ctx): +def client_shutdown_cli(ctx): """ Have the workflow server shutdown gracefully """ From e3c992e8ebfd4a85024ba06f79d9ca20c3bc5eb7 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Wed, 9 Jun 2021 17:52:24 +0200 Subject: [PATCH 11/21] workflow server: send appropriate HTTP status codes in case of error --- ocrd/ocrd/cli/workflow.py | 4 ++-- repo/assets | 2 +- repo/spec | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index a6e9191e5..24153cd89 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -83,7 +83,7 @@ def process(): # pylint: disable=unused-variable if flask.request.args.get("mets"): mets = flask.request.args["mets"] else: - return 'Error: No METS' + return 'Error: No METS', 400 if flask.request.args.get('page_id'): page_id = flask.request.args["page_id"] else: @@ -96,7 +96,7 @@ def process(): # pylint: disable=unused-variable run_tasks(mets, log_level, page_id, tasks, overwrite) except Exception as e: log.exception("Request '%s' failed", str(flask.request.args)) - return 'Failed: %s' % str(e) + return 'Failed: %s' % str(e), 500 return 'Finished' @app.route('/list-tasks') def list_tasks(): # pylint: disable=unused-variable diff --git a/repo/assets b/repo/assets index 16b5ad903..32fde9eb2 160000 --- a/repo/assets +++ b/repo/assets @@ -1 +1 @@ -Subproject commit 16b5ad903927cd4cc0c15f13277d5668bddd8f0b +Subproject commit 32fde9eb242c595a1986a193090c689f52eeb734 diff --git a/repo/spec b/repo/spec index d4fe71db7..4e7827881 160000 --- a/repo/spec +++ b/repo/spec @@ -1 +1 @@ -Subproject commit d4fe71db7df95929b52e3d501382a3415585f0b1 +Subproject commit 4e78278813ab108d03557901503396d7766e305e From 2949925aca6a039835e0ee243b0dec29334c555f Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Wed, 9 Jun 2021 18:44:02 +0200 Subject: [PATCH 12/21] workflow client: show response text in case of server error --- ocrd/ocrd/cli/workflow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 24153cd89..15b3cdf22 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -156,7 +156,7 @@ def client_process_cli(ctx, mets, page_id, overwrite): else: sys.exit(1) except requests.exceptions.HTTPError as err: - ctx.obj['log'].error("Server error: %s", err) + ctx.obj['log'].error("Server error: %s", err.response.text) except requests.exceptions.ConnectionError as err: ctx.obj['log'].error("Connection error: %s", err) except requests.exceptions.Timeout as err: @@ -178,7 +178,7 @@ def client_list_tasks_cli(ctx): print(response.text) sys.exit(0) except requests.exceptions.HTTPError as err: - ctx.obj['log'].error("Server error: %s", err) + ctx.obj['log'].error("Server error: %s", err.response.text) except requests.exceptions.ConnectionError as err: ctx.obj['log'].error("Connection error: %s", err) except requests.exceptions.Timeout as err: @@ -199,7 +199,7 @@ def client_shutdown_cli(ctx): print(response.text) sys.exit(0) except requests.exceptions.HTTPError as err: - ctx.obj['log'].error("Server error: %s", err) + ctx.obj['log'].error("Server error: %s", err.response.text) except requests.exceptions.ConnectionError as err: ctx.obj['log'].error("Connection error: %s", err) except requests.exceptions.Timeout as err: From ccb369a2b94e1cd15ca33f6fd542e9f9469ff884 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Wed, 9 Jun 2021 23:39:33 +0200 Subject: [PATCH 13/21] ocrd.processor.base: also init old_pwd when no workspace yet --- ocrd/ocrd/processor/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index 0df98201f..722145b96 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -97,6 +97,7 @@ def __init__( # FIXME HACK would be better to use pushd_popd(self.workspace.directory) # but there is no way to do that in process here since it's an # overridden method. chdir is almost always an anti-pattern. + self.old_pwd = getcwd() if self.workspace: self.old_pwd = getcwd() os.chdir(self.workspace.directory) From db14b507df16070d14355f4675d0a1eb0a91dbd5 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Fri, 11 Jun 2021 01:20:54 +0200 Subject: [PATCH 14/21] =?UTF-8?q?workflow=20server:=20run=20multi-processe?= =?UTF-8?q?d=20/=20queued=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - replace Flask dev server with external uwsgi call - factor out Flask app code into separate Python module which uWSGI can pick up - make uWSGI run given number of workers via multi-processing but not multi-threading, and prefork before loading app (to protect GPU and non-thread-safe processors, and because of GIL) - pass tasks and other settings via CLI options (wrapped in JSON) - set worker Harakiri (reload after timeout) based on number of pages multiplied by given page timeout - add option for number of processes and page timeout --- ocrd/ocrd/cli/server.py | 87 ++++++++++++++++++++++++++++++++++++ ocrd/ocrd/cli/workflow.py | 93 ++++++++++++++++++--------------------- ocrd/requirements.txt | 1 + 3 files changed, 131 insertions(+), 50 deletions(-) create mode 100644 ocrd/ocrd/cli/server.py diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py new file mode 100644 index 000000000..48dd7cabd --- /dev/null +++ b/ocrd/ocrd/cli/server.py @@ -0,0 +1,87 @@ +""" +Flask application for uwsgi workflow server + +(This is not meant to be imported directly, but loaded from uwsgi.) +""" +import os +import signal +import json +import flask +import uwsgi # added to module path by uwsgi runner + +from ocrd_utils import getLogger, initLogging +from ocrd.task_sequence import run_tasks, parse_tasks +from ocrd.resolver import Resolver + +initLogging() +# unwrap user-defined workflow: +tasks = json.loads(uwsgi.opt["tasks"]) +loglevel = uwsgi.opt["loglevel"].decode() +timeout_per_page = int(uwsgi.opt["timeout_per_page"]) +res = Resolver() +app = flask.Flask(__name__) +log = getLogger('ocrd.workflow.server') +if loglevel: + log.setLevel(loglevel) + +def setup(): + global tasks + log.info("Parsing and instantiating %d tasks", len(tasks)) + tasks = parse_tasks(tasks) # raises exception if invalid (causing worker to exit) + for task in tasks: + task.instantiate() # returns False if impossible (causing CLI fallback below) + +@app.route('/process') +def process(): # pylint: disable=unused-variable + log.debug("Processing request: %s", str(flask.request)) + if flask.request.args.get("mets"): + mets = flask.request.args["mets"] + else: + return 'Error: No METS', 400 + if flask.request.args.get('page_id'): + page_id = flask.request.args["page_id"] + else: + page_id = '' + if flask.request.args.get('log_level'): + log_level = flask.request.args["log_level"] + else: + log_level = None + if flask.request.args.get('overwrite'): + overwrite = flask.request.args["overwrite"] in ["True", "true", "1"] + else: + overwrite = False + try: + if page_id: + npages = len(page_id.split(',')) + else: + workspace = res.workspace_from_url(mets) + npages = len(workspace.mets.physical_pages) + timeout = timeout_per_page * npages + log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout) + # FIXME: prevent multiple concurrent requests to the same workspace/METS + # (use internal routing rules to prevent that, perhaps send 503 or just push to backlog) + # allow no more than timeout_per_page before restarting worker: + uwsgi.set_user_harakiri(timeout) # go, go, go! + # run the workflow + run_tasks(mets, log_level, page_id, tasks, overwrite) + uwsgi.set_user_harakiri(0) # take a breath! + except Exception as e: + log.exception("Request '%s' failed", str(flask.request.args)) + return 'Failed: %s' % str(e), 500 + return 'Finished' + +@app.route('/list-tasks') +def list_tasks(): # pylint: disable=unused-variable + seq = '' + for task in tasks: + seq += '\n' + str(task) + return seq +@app.route('/shutdown') +def shutdown(): # pylint: disable=unused-variable + log.debug("Shutting down") + # does not work ("error managing signal 2 on worker 1"): + # uwsgi.signal(signal.SIGINT) + os.kill(uwsgi.masterpid(), signal.SIGINT) + return 'Stopped' + +setup() diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index 15b3cdf22..b0e750244 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -2,12 +2,13 @@ CLI for task_sequence """ import sys +import os +import json +import subprocess import click -import flask import requests from ocrd_utils import getLogger, initLogging -from ocrd.task_sequence import run_tasks, parse_tasks from ..decorators import ocrd_loglevel from .process import process_cli @@ -41,17 +42,20 @@ def process_cli_alias(log_level, mets, page_id, tasks, overwrite): # ---------------------------------------------------------------------- @workflow_cli.command('server') @ocrd_loglevel +@click.option('-t', '--timeout', help="maximum processing time (in sec per page) before reloading worker (0 to disable)", default=0) +@click.option('-j', '--processes', help="number of parallel workers to spawn", default=1) @click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1') @click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024)) @click.argument('tasks', nargs=-1, required=True) -def server_cli(log_level, host, port, tasks): +def server_cli(log_level, timeout, processes, host, port, tasks): """ Start server for a series of tasks to run processor CLIs or APIs on workspaces Parse the given tasks and try to instantiate all Pythonic processors among them with the given parameters. - Open a web server that listens on the given host and port - for GET requests named ``process`` with the following + Open a web server that listens on the given ``host`` and ``port`` + and queues requests into ``processes`` worker processes + for GET requests named ``/process`` with the following (URL-encoded) arguments: mets (string): Path name (relative to the server's CWD, @@ -59,6 +63,8 @@ def server_cli(log_level, host, port, tasks): page_id (string): Comma-separated list of page IDs to process + log_level (int): Override all logger levels during processing + overwrite (bool): Remove output pages/images if they already exist The server will handle each request by running the tasks @@ -69,53 +75,38 @@ def server_cli(log_level, host, port, tasks): Also, between each contiguous chain of Pythonic tasks in the overall series, no METS de/serialization will be performed. + If processing does not finish before ``timeout`` seconds per page, + then the request will fail and the respective worker be reloaded. + + To see the server's workflow configuration, send a GET request named + ``/list-tasks``. + Stop the server by sending SIGINT (e.g. via ctrl+c - on the terminal), or sending a GET request named ``shutdown``. + on the terminal), or sending a GET request named ``/shutdown``. """ log = getLogger('ocrd.workflow.server') - log.debug("Parsing and instantiating %d tasks", len(tasks)) - tasks = parse_tasks(tasks) - for task in tasks: - task.instantiate() - app = flask.Flask(__name__) - @app.route('/process') - def process(): # pylint: disable=unused-variable - if flask.request.args.get("mets"): - mets = flask.request.args["mets"] - else: - return 'Error: No METS', 400 - if flask.request.args.get('page_id'): - page_id = flask.request.args["page_id"] - else: - page_id = '' - if flask.request.args.get('overwrite'): - overwrite = flask.request.args["overwrite"] in ["True", "true", "1"] - else: - overwrite = False - try: - run_tasks(mets, log_level, page_id, tasks, overwrite) - except Exception as e: - log.exception("Request '%s' failed", str(flask.request.args)) - return 'Failed: %s' % str(e), 500 - return 'Finished' - @app.route('/list-tasks') - def list_tasks(): # pylint: disable=unused-variable - seq = '' - for task in tasks: - seq += '\n' + str(task) - return seq - @app.route('/shutdown') - def shutdown(): # pylint: disable=unused-variable - fun = flask.request.environ.get('werkzeug.server.shutdown') - if fun is None: - raise RuntimeError('Not running with the Werkzeug Server') - fun() - return 'Stopped' - log.debug("Running server on http://%s:%d", host, port) - # disable multithreading here: - # - GPU processors need to have same thread context between startup and processing - # - we have no multiprocessing server backend anyway (until we move to external server) - app.run(host=host, port=port, debug=False, threaded=False) + log.debug("Running server with %d workers on http://%s:%d", processes, host, port) + result = subprocess.run(["uwsgi", "--http-socket", "%s:%d" % (host, port), + "--wsgi-file", os.path.join(os.path.dirname(__file__), 'server.py'), + "--callable", "app", "--need-app", + "--disable-logging", # OCR-D logging is enough + # "--http-keepalive", "true", + # "--add-header", "Connection: Keep-Alive", + "--processes", "%d" % processes, + "--master", "--single-interpreter", + # "--daemonize2", # return from workflow CLI + "--lazy-apps", # fork before loading app + "--no-orphans", # kill workers if master dies + "--die-on-term", # do not reload on SIGTERM + "--reload-on-exception", # reload failed workers + "--enable-threads", # for multithreading in Numpy, TF, ... + # wrap in JSON to retain list/quotes (not possible with pyargv): + "--set", "tasks=%s" % json.dumps(tasks), + # server log level: + "--set", "loglevel=%s" % (log_level or ''), + # worker timeout per page: + "--set", "timeout_per_page=%d" % timeout]) + return result.returncode # ---------------------------------------------------------------------- # ocrd workflow client @@ -134,17 +125,19 @@ def client_cli(ctx, host, port): ctx.obj['log'] = getLogger('ocrd.workflow.client') @client_cli.command('process') +@ocrd_loglevel @click.option('-m', '--mets', help="METS to process", default="mets.xml") @click.option('-g', '--page-id', help="ID(s) of the pages to process") @click.option('--overwrite', is_flag=True, default=False, help="Remove output pages/images if they already exist") @click.pass_context -def client_process_cli(ctx, mets, page_id, overwrite): +def client_process_cli(ctx, log_level, mets, page_id, overwrite): """ Have the workflow server process another workspace """ url = ctx.obj['URL'] + 'process' params = {'mets': mets, 'page_id': page_id, + 'log_level': log_level, 'overwrite': str(overwrite) } try: diff --git a/ocrd/requirements.txt b/ocrd/requirements.txt index 2da0163b7..54b2e6c87 100644 --- a/ocrd/requirements.txt +++ b/ocrd/requirements.txt @@ -5,6 +5,7 @@ requests lxml opencv-python-headless Flask +uwsgi jsonschema pyyaml Deprecated == 1.2.0 From e6d61a338fcfc4f605be7883c19065c33976c53b Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Fri, 11 Jun 2021 18:33:17 +0200 Subject: [PATCH 15/21] workflow server: allow workers to opt out of CUDA via envvar --- ocrd/ocrd/cli/server.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py index 48dd7cabd..bfd8149a1 100644 --- a/ocrd/ocrd/cli/server.py +++ b/ocrd/ocrd/cli/server.py @@ -26,7 +26,12 @@ def setup(): global tasks - log.info("Parsing and instantiating %d tasks", len(tasks)) + if "CUDA_WORKERS" in os.environ and uwsgi.worker_id() > int(os.environ["CUDA_WORKERS"]): + os.environ["CUDA_VISIBLE_DEVICES"] = "" + where = "CPU" + else: + where = "GPU" + log.info("Parsing and instantiating %d tasks (on %s)", len(tasks), where) tasks = parse_tasks(tasks) # raises exception if invalid (causing worker to exit) for task in tasks: task.instantiate() # returns False if impossible (causing CLI fallback below) From cac80d6408dceb87486df55b0f816de9721508f8 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Sun, 13 Jun 2021 14:20:13 +0200 Subject: [PATCH 16/21] workflow server: lock METS while processing --- ocrd/ocrd/cli/server.py | 53 +++++++++++++++++++++++++++++++++------ ocrd/ocrd/cli/workflow.py | 3 ++- 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py index bfd8149a1..a4e422cb1 100644 --- a/ocrd/ocrd/cli/server.py +++ b/ocrd/ocrd/cli/server.py @@ -13,24 +13,36 @@ from ocrd.task_sequence import run_tasks, parse_tasks from ocrd.resolver import Resolver -initLogging() # unwrap user-defined workflow: tasks = json.loads(uwsgi.opt["tasks"]) loglevel = uwsgi.opt["loglevel"].decode() timeout_per_page = int(uwsgi.opt["timeout_per_page"]) +workers = uwsgi.numproc +where = "GPU" # priority/general worker (i.e. contract worker / wage labourer) +if "CUDA_WORKERS" in os.environ: + gpu_workers = int(os.environ["CUDA_WORKERS"]) + assert gpu_workers <= workers, \ + "CUDA_WORKERS[%d] <= workers[%d] violated" % (gpu_workers, workers) +else: + gpu_workers = workers + +initLogging() res = Resolver() app = flask.Flask(__name__) log = getLogger('ocrd.workflow.server') if loglevel: log.setLevel(loglevel) +def setup_where(): + global where + log.debug("Setup for worker %d", uwsgi.worker_id()) + if uwsgi.worker_id() > gpu_workers: + # avoid GPU + os.environ["CUDA_VISIBLE_DEVICES"] = "" + def setup(): global tasks - if "CUDA_WORKERS" in os.environ and uwsgi.worker_id() > int(os.environ["CUDA_WORKERS"]): - os.environ["CUDA_VISIBLE_DEVICES"] = "" - where = "CPU" - else: - where = "GPU" + setup_where() log.info("Parsing and instantiating %d tasks (on %s)", len(tasks), where) tasks = parse_tasks(tasks) # raises exception if invalid (causing worker to exit) for task in tasks: @@ -43,6 +55,9 @@ def process(): # pylint: disable=unused-variable mets = flask.request.args["mets"] else: return 'Error: No METS', 400 + # prevent multiple concurrent requests to the same workspace/METS + if not lock(mets): + return 'Error: Locked METS', 423 if flask.request.args.get('page_id'): page_id = flask.request.args["page_id"] else: @@ -63,8 +78,6 @@ def process(): # pylint: disable=unused-variable npages = len(workspace.mets.physical_pages) timeout = timeout_per_page * npages log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout) - # FIXME: prevent multiple concurrent requests to the same workspace/METS - # (use internal routing rules to prevent that, perhaps send 503 or just push to backlog) # allow no more than timeout_per_page before restarting worker: uwsgi.set_user_harakiri(timeout) # go, go, go! # run the workflow @@ -72,7 +85,9 @@ def process(): # pylint: disable=unused-variable uwsgi.set_user_harakiri(0) # take a breath! except Exception as e: log.exception("Request '%s' failed", str(flask.request.args)) + unlock(mets) return 'Failed: %s' % str(e), 500 + unlock(mets) return 'Finished' @app.route('/list-tasks') @@ -81,6 +96,7 @@ def list_tasks(): # pylint: disable=unused-variable for task in tasks: seq += '\n' + str(task) return seq + @app.route('/shutdown') def shutdown(): # pylint: disable=unused-variable log.debug("Shutting down") @@ -89,4 +105,25 @@ def shutdown(): # pylint: disable=unused-variable os.kill(uwsgi.masterpid(), signal.SIGINT) return 'Stopped' +def lock(mets): + uwsgi.lock() + try: + log.debug("locking '%s'", mets) + if uwsgi.cache_exists(mets): + granted = False + else: + uwsgi.cache_set(mets, b'running') + granted = True + finally: + uwsgi.unlock() + return granted + +def unlock(mets): + uwsgi.lock() + try: + log.debug("unlocking '%s'", mets) + uwsgi.cache_del(mets) + finally: + uwsgi.unlock() + setup() diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index b0e750244..b205e5ebd 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -89,7 +89,7 @@ def server_cli(log_level, timeout, processes, host, port, tasks): result = subprocess.run(["uwsgi", "--http-socket", "%s:%d" % (host, port), "--wsgi-file", os.path.join(os.path.dirname(__file__), 'server.py'), "--callable", "app", "--need-app", - "--disable-logging", # OCR-D logging is enough + # "--disable-logging", # OCR-D logging is enough # "--http-keepalive", "true", # "--add-header", "Connection: Keep-Alive", "--processes", "%d" % processes, @@ -100,6 +100,7 @@ def server_cli(log_level, timeout, processes, host, port, tasks): "--die-on-term", # do not reload on SIGTERM "--reload-on-exception", # reload failed workers "--enable-threads", # for multithreading in Numpy, TF, ... + "--cache2", "name=workspace_lock,items=100", # wrap in JSON to retain list/quotes (not possible with pyargv): "--set", "tasks=%s" % json.dumps(tasks), # server log level: From 6263bb107941ff25278b75c8996e5e39db35ff87 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Wed, 16 Jun 2021 01:28:54 +0200 Subject: [PATCH 17/21] =?UTF-8?q?add=20processing=20server=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add `--server` option to CLI decorator - implement via new `ocrd.server.ProcessingServer`: - based on gunicorn (for preforking directly from configured CLI in Python, but instantiating the processor after forking to avoid any shared GPU context) - using multiprocessing.Lock and Manager to lock (synchronize) workspaces among workers - using signal.alarm for worker timeout mechanics - using pre- and post-fork hooks for GPU- vs CPU- worker mechanics - doing Workspace validation within the request --- ocrd/ocrd/decorators/__init__.py | 43 ++++-- ocrd/ocrd/decorators/ocrd_cli_options.py | 1 + ocrd/ocrd/processor/__init__.py | 1 + ocrd/ocrd/processor/base.py | 1 + ocrd/ocrd/processor/helpers.py | 4 + ocrd/ocrd/server.py | 186 +++++++++++++++++++++++ ocrd/requirements.txt | 1 + 7 files changed, 225 insertions(+), 12 deletions(-) create mode 100644 ocrd/ocrd/server.py diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py index 8e35155ef..fb2b0352d 100644 --- a/ocrd/ocrd/decorators/__init__.py +++ b/ocrd/ocrd/decorators/__init__.py @@ -14,6 +14,7 @@ from ..resolver import Resolver from ..processor.base import run_processor +from ..server import ProcessingServer from .loglevel_option import ocrd_loglevel from .parameter_option import parameter_option, parameter_override_option @@ -21,18 +22,19 @@ from .mets_find_options import mets_find_options def ocrd_cli_wrap_processor( - processorClass, - ocrd_tool=None, - mets=None, - working_dir=None, - dump_json=False, - help=False, # pylint: disable=redefined-builtin - version=False, - overwrite=False, - show_resource=None, - list_resources=False, - **kwargs -): + processorClass, + ocrd_tool=None, + mets=None, + working_dir=None, + server=None, + log_level=None, + dump_json=False, + help=False, # pylint: disable=redefined-builtin + version=False, + overwrite=False, + show_resource=None, + list_resources=False, + **kwargs): if not sys.argv[1:]: processorClass(workspace=None, show_help=True) sys.exit(1) @@ -46,6 +48,23 @@ def ocrd_cli_wrap_processor( list_resources=list_resources ) sys.exit() + elif server: + initLogging() + LOG = getLogger('ocrd_cli_wrap_processor') + # Merge parameter overrides and parameters + if 'parameter_override' in kwargs: + set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override']) + # instantiate processor without workspace + processorArgs = dict() + for param in kwargs: + if param in ['parameter', 'input_file_grp', 'output_file_grp', 'page_timeout']: + processorArgs[param] = kwargs[param] + host, port, workers = server + options = {'bind': '%s:%s' % (host, port), + 'workers': workers, + 'loglevel': log_level} + server = ProcessingServer(processorClass, processorArgs, options) + server.run() else: initLogging() LOG = getLogger('ocrd_cli_wrap_processor') diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py index 9f7f8cafa..57be3fd78 100644 --- a/ocrd/ocrd/decorators/ocrd_cli_options.py +++ b/ocrd/ocrd/decorators/ocrd_cli_options.py @@ -26,6 +26,7 @@ def cli(mets_url): option('-O', '--output-file-grp', help='File group(s) used as output.', default='OUTPUT'), option('-g', '--page-id', help="ID(s) of the pages to process"), option('--overwrite', help="Overwrite the output file group or a page range (--page-id)", is_flag=True, default=False), + option('-s', '--server', help='Run web server instead of one-shot processing (shifts mets/working-dir/page-id options to HTTP request arguments); pass network interface to bind to, TCP port, number of worker processes', nargs=3), option('-C', '--show-resource', help='Dump the content of processor resource RESNAME', metavar='RESNAME'), option('-L', '--list-resources', is_flag=True, default=False, help='List names of processor resources'), parameter_option, diff --git a/ocrd/ocrd/processor/__init__.py b/ocrd/ocrd/processor/__init__.py index f01e2b3c9..ed3e7e1cb 100644 --- a/ocrd/ocrd/processor/__init__.py +++ b/ocrd/ocrd/processor/__init__.py @@ -2,6 +2,7 @@ Processor, ) from .helpers import ( + run_api, run_cli, run_processor, generate_processor_help diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index 722145b96..68a17c5c5 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -53,6 +53,7 @@ def __init__( input_file_grp=None, output_file_grp=None, page_id=None, + server=None, show_resource=None, list_resources=False, show_help=False, diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index 049602335..0e6246fa1 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -215,6 +215,10 @@ def wrap(s): or JSON file path -P, --param-override KEY VAL Override a single JSON object key-value pair, taking precedence over --parameter + -s, --server HOST PORT WORKERS Run web server instead of one-shot processing + (shifts mets/working-dir/page-id options to + HTTP request arguments); pass network interface + to bind to, TCP port, number of worker processes -m, --mets URL-PATH URL or file path of METS to process -w, --working-dir PATH Working directory of local workspace -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE] diff --git a/ocrd/ocrd/server.py b/ocrd/ocrd/server.py new file mode 100644 index 000000000..b3496ee84 --- /dev/null +++ b/ocrd/ocrd/server.py @@ -0,0 +1,186 @@ +""" +Flask application and gunicorn processing server for Processor +""" +import os +import signal +import multiprocessing as mp +import atexit +import json +import flask +import gunicorn.app.base + +from ocrd_validators import WorkspaceValidator +from ocrd_utils import getLogger +from ocrd.task_sequence import ProcessorTask +from .processor import run_api +from . import Resolver + +class ProcessingServer(gunicorn.app.base.BaseApplication): + + def __init__(self, processorClass, processorArgs, options=None): + # happens in pre-fork context + self.options = options or {'bind': '127.0.0.1:5000', 'workers': 1} + # TODOs: + # - add 'CUDA_VISIBLE_DEVICES' to 'raw_env' to options (server level instead of worker level) + # - customize 'errorlog' (over stdout) in options + # - customize 'accesslog' (over None) in options + self.options['accesslog'] = '-' + self.options['access_log_format'] = '%(t)s "%(r)s" %(s)s %(b)s "%(T)s"' + # - customize 'logger_class' in options + # - customize 'logconfig' or 'logconfig_dict' in options + # - customize 'access_log_format' in options + self.options['timeout'] = 0 # disable (timeout managed by workers on request level) + self.options['preload_app'] = False # instantiate workers independently + self.options['pre_fork'] = pre_fork # see below + self.options['post_fork'] = post_fork # see below + self.options['pre_request'] = pre_request # see below + self.options['post_request'] = post_request # see below + self.options['worker_abort'] = worker_abort # see below + self.processor_cls = processorClass + self.processor_opt = processorArgs + self.master_pid = os.getpid() + manager = mp.Manager() + self.master_lock = manager.Lock() + self.master_cache = manager.dict() + # (Manager creates an additional mp.Process on __enter__, + # and registers an atexit handler joining that in __exit__, + # but our forked workers inherit this. To prevent attempting + # to join a non-child, we need to remove that in post_fork.) + super().__init__() + + def load_config(self): + config = {key: value for key, value in self.options.items() + if key in self.cfg.settings and value is not None} + for key, value in config.items(): + self.cfg.set(key.lower(), value) + + def load(self): + # happens in (forked) worker context (because preload_app=False) + # instantiate + self.obj = self.processor_cls(None, **self.processor_opt) + self.exe = self.obj.ocrd_tool['executable'] + self.res = Resolver() + self.log = getLogger('ocrd.processor.server') + self.app = flask.Flask(self.exe) + # add routes + self.app.add_url_rule('/process', None, self.process) + self.app.add_url_rule('/list-tasks', None, self.list_tasks) + self.app.add_url_rule('/shutdown', None, self.shutdown) + return self.app + + def process(self): + self.log.debug("Processing request: %s", str(flask.request)) + if flask.request.args.get("mets"): + mets = flask.request.args["mets"] + else: + return 'Error: No METS', 400 + # prevent multiple concurrent requests to the same workspace/METS + if not self.lock(mets): + return 'Error: Locked METS', 423 + if flask.request.args.get('page_id'): + page_id = flask.request.args["page_id"] + else: + page_id = '' + # if flask.request.args.get('log_level'): + # log_level = flask.request.args["log_level"] + # else: + # log_level = None + if flask.request.args.get('overwrite'): + overwrite = flask.request.args["overwrite"] in ["True", "true", "1"] + else: + overwrite = False + try: + workspace = self.res.workspace_from_url(mets) + workspace.overwrite_mode = overwrite + report = WorkspaceValidator.check_file_grp( + workspace, + self.obj.input_file_grp, + '' if overwrite else self.obj.output_file_grp, + page_id) + if not report.is_valid: + raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors)) + if page_id: + npages = len(page_id.split(',')) + else: + npages = len(workspace.mets.physical_pages) + # allow no more than page_timeout before restarting worker: + timeout = getattr(self.obj, 'page_timeout', 0) + timeout *= npages + self.log.info("Processing %s on %d pages of '%s' (timeout=%ds)", self.exe, npages, mets, timeout) + with Timeout(timeout, "processing %s on %s cancelled after %d seconds on %d pages" % ( + self.exe, mets, timeout, npages)): + # run the workflow + error = run_api(self.obj, workspace, page_id) + if error: + raise error + workspace.save_mets() + except Exception as e: + self.log.exception("Request '%s' failed", str(flask.request.args)) + self.unlock(mets) + return 'Failed: %s' % str(e), 500 + self.unlock(mets) + return 'Finished' + + def list_tasks(self): + task = ProcessorTask(self.exe, [self.obj.input_file_grp], [self.obj.output_file_grp], self.obj.parameter) + return str(task) + '\n' + + def shutdown(self): + self.log.debug("Shutting down") + os.kill(self.master_pid, signal.SIGTERM) + return 'Stopped' + + def lock(self, resource): + with self.master_lock: + if resource in self.master_cache: + return False + self.master_cache[resource] = True + return True + def unlock(self, resource): + with self.master_lock: + del self.master_cache[resource] + +class Timeout: + def __init__(self, seconds, message): + self.seconds = seconds + self.message = message + def _handler(self, signum, stack): + raise TimeoutError(self.message) + def __enter__(self): + signal.signal(signal.SIGALRM, self._handler) + signal.alarm(self.seconds) + def __exit__(self, *args): + signal.alarm(0) + +def pre_fork(server, worker): + # happens when worker (but not app/processor) was instantiated (but not forked yet) + worker.num_workers = server.num_workers # nominal value + worker.worker_id = len(server.WORKERS) + 1 # actual value + +def post_fork(server, worker): + # happens when worker (but not app/processor) was was instantiated (and forked) + # remove atexit handler for multiprocessing.Manager process + atexit.unregister(mp.util._exit_function) + # differentiate GPU workers from CPU workers via envvar + if "CUDA_WORKERS" in os.environ: + cuda_workers = int(os.environ["CUDA_WORKERS"]) + assert cuda_workers <= worker.num_workers, \ + "CUDA_WORKERS[%d] <= workers[%d] violated" % (cuda_workers, worker.num_workers) + else: + cuda_workers = worker.num_workers + if worker.worker_id > cuda_workers: + worker.log.debug("Setup for worker %d (non-CUDA)", worker.worker_id) + os.environ["CUDA_VISIBLE_DEVICES"] = "" # avoid GPU + else: + worker.log.debug("Setup for worker %d (normal)", worker.worker_id) + +def pre_request(worker, req): + worker.log.debug("%s %s at worker %d" % (req.method, req.path, worker.worker_id)) + +def post_request(worker, req, env, res): + worker.log.debug("%s %s at worker %d: %s" % (req.method, req.path, worker.worker_id, res)) + +def worker_abort(worker): + worker.log.debug("aborting worker %s", worker) + # FIXME: skip/fallback remaining pages, save_mets, signalling ... + # worker.app.obj.clean_up() diff --git a/ocrd/requirements.txt b/ocrd/requirements.txt index 54b2e6c87..fd49a05a3 100644 --- a/ocrd/requirements.txt +++ b/ocrd/requirements.txt @@ -5,6 +5,7 @@ requests lxml opencv-python-headless Flask +gunicorn uwsgi jsonschema pyyaml From 4b593969f7b690b6b5e05cff86385e5ba4adf8e1 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Sun, 20 Jun 2021 16:57:11 +0200 Subject: [PATCH 18/21] workflow server: do monkey patching below module level to prevent circular import --- ocrd/ocrd/task_sequence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocrd/ocrd/task_sequence.py b/ocrd/ocrd/task_sequence.py index d83095563..d10ff9ce5 100644 --- a/ocrd/ocrd/task_sequence.py +++ b/ocrd/ocrd/task_sequence.py @@ -15,7 +15,6 @@ set_json_key_value_overrides ) # from collections import Counter -from ocrd import decorators from ocrd.processor.base import run_cli, run_api from ocrd.resolver import Resolver from ocrd_validators import ParameterValidator, WorkspaceValidator @@ -93,6 +92,7 @@ def validate(self): return report def instantiate(self): + from ocrd import decorators logger = getLogger('ocrd.task_sequence.ProcessorTask') program = which(self.executable) if not program: From 8193559716ec9ff952d0a41fdb18109a17d8fee6 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Wed, 13 Oct 2021 09:47:34 +0200 Subject: [PATCH 19/21] workflow server: improve log msg --- ocrd/ocrd/cli/server.py | 1 + repo/spec | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py index a4e422cb1..626f8b048 100644 --- a/ocrd/ocrd/cli/server.py +++ b/ocrd/ocrd/cli/server.py @@ -39,6 +39,7 @@ def setup_where(): if uwsgi.worker_id() > gpu_workers: # avoid GPU os.environ["CUDA_VISIBLE_DEVICES"] = "" + where = 'CPU' def setup(): global tasks diff --git a/repo/spec b/repo/spec index 72c5efe8a..4e7827881 160000 --- a/repo/spec +++ b/repo/spec @@ -1 +1 @@ -Subproject commit 72c5efe8a04393c82de5058d70e891fe7e0e0cde +Subproject commit 4e78278813ab108d03557901503396d7766e305e From 5d482394165d2a67031f9bc065877551a65c4118 Mon Sep 17 00:00:00 2001 From: "J. Nathanael Philipp" Date: Tue, 3 Aug 2021 09:51:51 +0200 Subject: [PATCH 20/21] Move actuell process logic into seperate function. --- ocrd/ocrd/cli/server.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py index 626f8b048..a013946f7 100644 --- a/ocrd/ocrd/cli/server.py +++ b/ocrd/ocrd/cli/server.py @@ -72,18 +72,7 @@ def process(): # pylint: disable=unused-variable else: overwrite = False try: - if page_id: - npages = len(page_id.split(',')) - else: - workspace = res.workspace_from_url(mets) - npages = len(workspace.mets.physical_pages) - timeout = timeout_per_page * npages - log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout) - # allow no more than timeout_per_page before restarting worker: - uwsgi.set_user_harakiri(timeout) # go, go, go! - # run the workflow - run_tasks(mets, log_level, page_id, tasks, overwrite) - uwsgi.set_user_harakiri(0) # take a breath! + _process(mets, page_id, log_level, overwrite) except Exception as e: log.exception("Request '%s' failed", str(flask.request.args)) unlock(mets) @@ -91,6 +80,20 @@ def process(): # pylint: disable=unused-variable unlock(mets) return 'Finished' +def _process(mets, page_id='', log_level=None, overwrite=False): + if page_id: + npages = len(page_id.split(',')) + else: + workspace = res.workspace_from_url(mets) + npages = len(workspace.mets.physical_pages) + timeout = timeout_per_page * npages + log.info("Processing %d tasks on %d pages (timeout=%ds)", len(tasks), npages, timeout) + # allow no more than timeout_per_page before restarting worker: + uwsgi.set_user_harakiri(timeout) # go, go, go! + # run the workflow + run_tasks(mets, log_level, page_id, tasks, overwrite) + uwsgi.set_user_harakiri(0) # take a breath! + @app.route('/list-tasks') def list_tasks(): # pylint: disable=unused-variable seq = '' From 6ff1d407eced8f8d3b4c96076d1234a5dda57c2f Mon Sep 17 00:00:00 2001 From: "J. Nathanael Philipp" Date: Wed, 13 Oct 2021 09:43:03 +0200 Subject: [PATCH 21/21] Add process_images endpoint. --- ocrd/ocrd/cli/server.py | 70 ++++++++++++++++++++++++++++++++++++++- ocrd/ocrd/cli/workflow.py | 3 +- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/ocrd/ocrd/cli/server.py b/ocrd/ocrd/cli/server.py index a013946f7..3ee86aecf 100644 --- a/ocrd/ocrd/cli/server.py +++ b/ocrd/ocrd/cli/server.py @@ -3,15 +3,21 @@ (This is not meant to be imported directly, but loaded from uwsgi.) """ +import base64 import os import signal import json import flask import uwsgi # added to module path by uwsgi runner -from ocrd_utils import getLogger, initLogging +from io import BytesIO +from ocrd_modelfactory import page_from_file +from ocrd_utils import getLogger, initLogging, pushd_popd from ocrd.task_sequence import run_tasks, parse_tasks from ocrd.resolver import Resolver +from PIL import Image +from tempfile import TemporaryDirectory + # unwrap user-defined workflow: tasks = json.loads(uwsgi.opt["tasks"]) @@ -130,4 +136,66 @@ def unlock(mets): finally: uwsgi.unlock() +@app.route('/process_images', methods=["POST"]) +def process_images(): # pylint: disable=undefined-name + log.debug(f"Processing request: {flask.request}") + if flask.request.is_json: + req = flask.request.get_json() + + pages = {} + if "pages" in req: + for k, v in req["pages"].items(): + pages[k] = v + elif "PAGES" in req: + for k, v in pages["PAGES"].items(): + pages[k] = v + else: + return 'Missing "pages" param.', 400 + + try: + work_dir = TemporaryDirectory() + ws = res.workspace_from_nothing(directory=work_dir.name) + + for k, v in pages.items(): + img = Image.open(BytesIO(base64.b64decode(v))) + if img.mode != "RGB": + img = img.convert("RGB") + ws.save_image_file(img, k, "OCR-D-IMG", page_id=k, mimetype='image/png') + ws.save_mets() + ws.reload_mets() + except Exception as e: + work_dir.cleanup() + return f"An error occured while decoding image(s) and creating mets.xml. {e}", 400 + + try: + _process(ws.mets_target) + ws.reload_mets() + for k in pages.keys(): + pages[k] = {"img": None, "page": None} + + page_file = next(ws.mets.find_files( + pageId=k, + fileGrp=tasks[-1].output_file_grps[0], + )) + with pushd_popd(ws.directory): + if page_file and os.path.exists(page_file.local_filename): + with open(page_file.local_filename, "r", encoding="utf8") as f: + pages[k]["page"] = f.read() + img_path = page_from_file( + page_file + ).get_Page().get_AlternativeImage()[-1].get_filename() + if img_path and os.path.exists(img_path): + img = Image.open(img_path) + img_file = BytesIO() + img.save(img_file, format="PNG") + pages[k]["img"] = base64.b64encode(img_file.getvalue()).decode("utf8") + except Exception as e: + return f"Failed: {e}", 500 + finally: + work_dir.cleanup() + + return flask.json.jsonify(pages) + else: + return "Request was not JSON.", 400 + setup() diff --git a/ocrd/ocrd/cli/workflow.py b/ocrd/ocrd/cli/workflow.py index b205e5ebd..e7c21ef8e 100644 --- a/ocrd/ocrd/cli/workflow.py +++ b/ocrd/ocrd/cli/workflow.py @@ -106,7 +106,8 @@ def server_cli(log_level, timeout, processes, host, port, tasks): # server log level: "--set", "loglevel=%s" % (log_level or ''), # worker timeout per page: - "--set", "timeout_per_page=%d" % timeout]) + "--set", "timeout_per_page=%d" % timeout, + "--buffer-size", "102400"]) return result.returncode # ----------------------------------------------------------------------