From c59e128cc71e0323c027ddace271acb3f406ad85 Mon Sep 17 00:00:00 2001
From: Smirnov Danil
Date: Tue, 14 Jan 2020 15:09:38 +0300
Subject: [PATCH] HH-102742 Use tornado fork

---
 docs/config.md                      |  1 +
 docs/frontik-app.md                 | 28 +++++++++++
 frontik/app.py                      | 16 +++--
 frontik/file_cache.py               | 16 ++++++-
 frontik/options.py                  |  1 +
 frontik/server.py                   | 72 ++++++++++++++++++++++++++++-
 tests/projects/test_app/__init__.py |  5 ++
 tests/test_file_cache.py            | 19 ++++++++
 tests/test_frontik_testing.py       |  1 +
 9 files changed, 151 insertions(+), 8 deletions(-)

diff --git a/docs/config.md b/docs/config.md
index a5c1fd5b2..6c59f6d18 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -15,6 +15,7 @@ These options can be set for each Frontik instance (see [options.py](/frontik/op
 | `slow_callback_threshold_ms` | `int` | `None` | Enables logging of long-running callbacks |
 | `app` | `str` | `None` | Application package name (see [Frontik application structure](/docs/frontik-app.md)) |
 | `app_class` | `str` | `None` | Application class name defined in application root module (by default `FrontikApplication` class is used) |
+| `workers` | `int` | `1` | Number of worker processes created using fork. With the default value the master process itself becomes the worker and no fork happens |
 | `xheaders ` | `bool` | `False` | Controls Tornado HTTPServer `xheaders` option |
 | `tornado_settings` | `dict` | `None` | tornado.web.Application settings |
 | `autoreload` | `bool` | `False` | Restart Frontik after changes in application sources or config files |
diff --git a/docs/frontik-app.md b/docs/frontik-app.md
index 6f49ca581..c588c41c5 100644
--- a/docs/frontik-app.md
+++ b/docs/frontik-app.md
@@ -30,6 +30,34 @@ class MyApplication(FrontikApplication):
         return config.urls
 ```
 
+Application initialization is done in two steps:
+
+```
+from frontik.app import FrontikApplication
+
+
+class MyApplication(FrontikApplication):
+    def __init__(self, **settings):
+        super().__init__(**settings)
+
+        self.cache = self.populate_shared_cache()
+
+    def init_async(self):
+        futures = super().init_async()
+
+        futures.append(self.connect_to_database())
+
+        return futures
+```
+
+First, the application instance is created by calling the __init__ function. In a multi-worker setup
+(see [Configuring Frontik (workers)](/docs/config.md)) this happens before the fork, which enables copy-on-write
+for read-only data in child processes. This comes with a limitation: the IOLoop instance must not be used inside this function.
+See tornado.tcpserver.TCPServer for details.
+
+After that, init_async is called in each worker process. This function returns a list of futures that are awaited
+before the worker starts accepting requests. The application must call the parent's init_async and preserve the returned futures.
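+
+For example, a multi-worker instance could be configured like this (the values here are purely illustrative; a Frontik config file is a plain Python file read with `tornado.options`, see [Configuring Frontik (workers)](/docs/config.md)):
+
+```
+workers = 4                  # the master forks 4 worker processes; with the default of 1 no fork happens
+app = 'myapp'                # application package name (placeholder)
+app_class = 'MyApplication'  # application class defined in the application root module
+```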
+ For this class to be used, you should set app_class option on frontik configuration file to 'MyApplication' Config parameter from FrontikApplication application_config method is accessible from `PageHandler` instances diff --git a/frontik/app.py b/frontik/app.py index 32178b68c..f4f5c7a1d 100644 --- a/frontik/app.py +++ b/frontik/app.py @@ -112,7 +112,7 @@ def __init__(self, **settings): self.xml = frontik.producers.xml_producer.XMLProducerFactory(self) self.json = frontik.producers.json_producer.JsonProducerFactory(self) - self.http_client_factory = HttpClientFactory(self, getattr(self.config, 'http_upstreams', {})) + self.http_client_factory = None self.router = FrontikRouter(self) @@ -125,10 +125,19 @@ def __init__(self, **settings): if options.debug: core_handlers.insert(0, (r'/pydevd/?', PydevdHandler)) + self.available_integrations = None + super().__init__(core_handlers, **tornado_settings) - self.available_integrations, self.default_init_futures = integrations.load_integrations(self) + + def init_async(self): self.transforms.insert(0, partial(DebugTransform, self)) + self.http_client_factory = HttpClientFactory(self, getattr(self.config, 'http_upstreams', {})) + + self.available_integrations, default_init_futures = integrations.load_integrations(self) + + return default_init_futures + def find_handler(self, request, **kwargs): request_id = request.headers.get('X-Request-Id') if request_id is None: @@ -178,9 +187,6 @@ def application_version_xml(self): def application_version(self): return None - def init_async(self): - return [] - @staticmethod def next_request_id(): FrontikApplication.request_id += 1 diff --git a/frontik/file_cache.py b/frontik/file_cache.py index efa57ec68..961c57329 100644 --- a/frontik/file_cache.py +++ b/frontik/file_cache.py @@ -44,15 +44,29 @@ def __init__(self, cache_name, root_dir, load_fn, max_len=None, step=None, deepc self.cache_name = cache_name self.root_dir = root_dir self.load_fn = load_fn + self.frozen = False + self.max_len = max_len self.cache = LimitedDict(max_len, step, deepcopy) + def populate(self, filenames, log, freeze=False): + for filename in filenames: + self._load(filename, log) + + self.frozen = freeze and (self.max_len is None or self.max_len > 0) + def load(self, filename, log): if filename in self.cache: log.debug('got %s file from cache (%s cache size: %s)', filename, self.cache_name, len(self.cache)) return self.cache[filename] + if self.frozen: + raise Exception(f'encounter file {filename} not in cache while cache is frozen') + + return self._load(filename, log) + + def _load(self, filename, log): real_filename = os.path.normpath(os.path.join(self.root_dir, filename)) - log.debug('reading file "%s"', real_filename) + log.info('reading file "%s"', real_filename) result = self.load_fn(real_filename, log) self.cache[filename] = result diff --git a/frontik/options.py b/frontik/options.py index a0737afd5..01f3f4d0b 100644 --- a/frontik/options.py +++ b/frontik/options.py @@ -6,6 +6,7 @@ define('app', default=None, type=str) define('app_class', default=None, type=str) +define('workers', default=1, type=int) define('tornado_settings', default=None, type=dict) define('max_active_handlers', default=100, type=int) define('reuse_port', default=True, type=bool) diff --git a/frontik/server.py b/frontik/server.py index a48d0b3db..0fdd507b1 100644 --- a/frontik/server.py +++ b/frontik/server.py @@ -2,8 +2,10 @@ import logging import os.path import signal +import errno import sys import time +import gc from concurrent.futures import 
ThreadPoolExecutor import tornado.autoreload @@ -12,6 +14,7 @@ from tornado import gen from tornado.httputil import HTTPServerRequest from tornado.options import parse_command_line, parse_config_file +from tornado.util import errno_from_exception from frontik.app import FrontikApplication from frontik.loggers import bootstrap_logger, bootstrap_core_logging @@ -126,6 +129,61 @@ def ioloop_stop(): sys.exit(1) +def fork_processes(num_processes): + log.info("starting %d processes", num_processes) + children = {} + + def start_child(i): + pid = os.fork() + if pid == 0: + return i + else: + children[pid] = i + return None + + for i in range(num_processes): + id = start_child(i) + if id is not None: + return id + + def sigterm_handler(signum, frame): + log.info('received SIGTERM') + + for pid, id in children.items(): + log.info('sending SIGTERM to child %d (pid %d)', id, pid) + os.kill(pid, signal.SIGTERM) + + gc.enable() + signal.signal(signal.SIGTERM, sigterm_handler) + + while children: + try: + pid, status = os.wait() + except OSError as e: + if errno_from_exception(e) == errno.EINTR: + continue + raise + + if pid not in children: + continue + + id = children.pop(pid) + if os.WIFSIGNALED(status): + log.warning("child %d (pid %d) killed by signal %d, restarting", id, pid, os.WTERMSIG(status)) + elif os.WEXITSTATUS(status) != 0: + log.warning("child %d (pid %d) exited with status %d, restarting", id, pid, os.WEXITSTATUS(status)) + else: + log.info("child %d (pid %d) exited normally", id, pid) + continue + + new_id = start_child(id) + if new_id is not None: + return new_id + + log.info('all children terminated, exiting') + sys.exit(0) + + def main(config_file=None): parse_configs(config_files=config_file) @@ -149,6 +207,16 @@ def main(config_file=None): try: app = application(app_root=os.path.dirname(module.__file__), **options.as_dict()) + + gc.disable() + gc.collect() + gc.freeze() + + if options.workers != 1: + fork_processes(options.workers) + + gc.enable() + ioloop = tornado.ioloop.IOLoop.current() executor = ThreadPoolExecutor(options.common_executor_pool_size) @@ -156,7 +224,7 @@ def main(config_file=None): def _async_init_cb(): try: - init_futures = app.default_init_futures + list(app.init_async()) + init_futures = app.init_async() if init_futures: def await_init(future): @@ -177,6 +245,6 @@ def await_init(future): ioloop.add_callback(_async_init_cb) ioloop.start() - except BaseException: + except Exception: log.exception('frontik application exited with exception') sys.exit(1) diff --git a/tests/projects/test_app/__init__.py b/tests/projects/test_app/__init__.py index 4f9c561f8..f8a104c05 100644 --- a/tests/projects/test_app/__init__.py +++ b/tests/projects/test_app/__init__.py @@ -20,6 +20,9 @@ def __init__(self, **settings): super().__init__(**settings) + def init_async(self): + futures = super().init_async() + try: from frontik.integrations.kafka import KafkaIntegration kafka_integration = next(i for i in self.available_integrations if isinstance(i, KafkaIntegration)) @@ -27,6 +30,8 @@ def __init__(self, **settings): except Exception: pass + return futures + def application_urls(self): return [ (r'^/redirect', RedirectRouter()), diff --git a/tests/test_file_cache.py b/tests/test_file_cache.py index bf9704771..ec61f3572 100644 --- a/tests/test_file_cache.py +++ b/tests/test_file_cache.py @@ -1,5 +1,6 @@ import os import unittest +from functools import partial from frontik.file_cache import FileCache, LimitedDict @@ -60,6 +61,9 @@ def __init__(self): def debug(self, message, *args): 
self.message = message % args + def info(self, message, *args): + self.message = message % args + def test_file_cache(self): c = FileCache('test', self.CACHE_DIR, lambda filename, log: filename, max_len=3) log = TestFileCache.MockLog() @@ -80,3 +84,18 @@ def test_file_cache(self): c.load('parse_error.xsl', log) self.assertIn('reading file', log.message) + + def test_populate(self): + c = FileCache('test', self.CACHE_DIR, lambda filename, log: filename, max_len=3) + log = TestFileCache.MockLog() + + c.populate(['simple.xsl', 'parse_error.xsl', 'syntax_error.xsl'], log) + + self.assertEqual(len(c.cache), 3) + + c.load('apply_error.xsl', log) + self.assertIn('reading file', log.message) + + c.populate(['simple.xsl', 'parse_error.xsl', 'simple.xsl'], log, freeze=True) + + self.assertRaises(Exception, partial(c.load, 'apply_error.xsl', log)) diff --git a/tests/test_frontik_testing.py b/tests/test_frontik_testing.py index 0659b6fb4..aaa21546b 100644 --- a/tests/test_frontik_testing.py +++ b/tests/test_frontik_testing.py @@ -45,6 +45,7 @@ def application_urls(self): ] app = TestApplication(app='test_app') + app.init_async() self.patch_app_http_client(app) return app