HH-102742 Use tornado fork
Aulust committed Jan 20, 2020
1 parent f687aac commit c59e128
Showing 9 changed files with 151 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/config.md
@@ -15,6 +15,7 @@ These options can be set for each Frontik instance (see [options.py](/frontik/op
| `slow_callback_threshold_ms` | `int` | `None` | Enables logging of long-running callbacks |
| `app` | `str` | `None` | Application package name (see [Frontik application structure](/docs/frontik-app.md)) |
| `app_class` | `str` | `None` | Application class name defined in application root module (by default `FrontikApplication` class is used) |
| `workers` | `int` | `1` | Number of worker processes created using fork. With the default value of `1`, the master process itself becomes the worker and no fork happens (see the config sketch below this table) |
| `xheaders ` | `bool` | `False` | Controls Tornado HTTPServer `xheaders` option |
| `tornado_settings` | `dict` | `None` | tornado.web.Application settings |
| `autoreload` | `bool` | `False` | Restart Frontik after changes in application sources or config files |
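As an illustration of the new option, a minimal Frontik config file enabling several workers might look like this (a sketch only; the values and the surrounding options are assumptions, not part of this commit):

```
# Sketch of a Frontik config file; option values here are illustrative.
# The file is plain Python parsed by tornado's parse_config_file.
app = 'my_app'                # application package name
app_class = 'MyApplication'   # application class in the package root module
workers = 4                   # fork four worker processes from the master
xheaders = True
```

With the default `workers = 1` no fork is performed and the master process serves requests itself.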
28 changes: 28 additions & 0 deletions docs/frontik-app.md
@@ -30,6 +30,34 @@ class MyApplication(FrontikApplication):
        return config.urls
```

Application initialization is done in 2 steps:

```
from frontik.app import FrontikApplication


class MyApplication(FrontikApplication):
    def __init__(self, **settings):
        super().__init__(**settings)
        self.cache = self.populate_shared_cache()

    def init_async(self):
        futures = super().init_async()
        futures.append(self.connect_to_database())
        return futures
```

At first, the application instance is created by calling the `__init__` method. In a multi-worker setup
(see [Configuring Frontik (workers)](/docs/config.md)) this happens before the fork, which enables copy-on-write
for read-only data in child processes. This comes with a limitation: the IOLoop instance must not be used inside `__init__`.
See `tornado.tcpserver.TCPServer` for details.
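
To make the first step concrete, the `populate_shared_cache` helper from the example above could look roughly like this (a sketch; the helper body, file name and format are assumptions):

```
import json

from frontik.app import FrontikApplication


class MyApplication(FrontikApplication):
    def populate_shared_cache(self):
        # Called from __init__, i.e. in the master process before the fork.
        # The loaded dict is never mutated afterwards, so worker processes
        # share its memory pages via copy-on-write. No IOLoop is used here.
        with open('data/shared_dictionary.json') as f:  # hypothetical path
            return json.load(f)
```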

After that, `init_async` is called in each worker process. This method returns a list of futures that are awaited
before the worker starts accepting requests. The application must call the parent's `init_async` and preserve the returned futures.
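
A future-producing initializer could be shaped like this (a sketch; `connect_to_database` and its body are assumptions used only to illustrate the contract):

```
import asyncio

from frontik.app import FrontikApplication


class MyApplication(FrontikApplication):
    def init_async(self):
        # Keep the futures returned by the parent class and append our own.
        futures = super().init_async()
        futures.append(asyncio.ensure_future(self.connect_to_database()))
        return futures

    async def connect_to_database(self):
        # Placeholder for real connection logic; this runs on the worker's
        # IOLoop before the first request is accepted.
        await asyncio.sleep(0)
```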

For this class to be used, set the `app_class` option in the Frontik configuration file to `MyApplication`.

The config object returned by the FrontikApplication `application_config` method is accessible from `PageHandler` instances.
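
For example, a handler might read a setting from it roughly as follows (a sketch; the handler, the `feature_enabled` attribute and access via `self.application.config` are assumptions):

```
from frontik.handler import PageHandler


class Page(PageHandler):
    def get_page(self):
        # The object returned by application_config() is stored on the
        # application instance; `feature_enabled` is a hypothetical setting.
        feature_enabled = getattr(self.application.config, 'feature_enabled', False)
        self.json.put({'feature_enabled': feature_enabled})
```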
16 changes: 11 additions & 5 deletions frontik/app.py
@@ -112,7 +112,7 @@ def __init__(self, **settings):
        self.xml = frontik.producers.xml_producer.XMLProducerFactory(self)
        self.json = frontik.producers.json_producer.JsonProducerFactory(self)

        self.http_client_factory = HttpClientFactory(self, getattr(self.config, 'http_upstreams', {}))
        self.http_client_factory = None

        self.router = FrontikRouter(self)

@@ -125,10 +125,19 @@ def __init__(self, **settings):
        if options.debug:
            core_handlers.insert(0, (r'/pydevd/?', PydevdHandler))

        self.available_integrations = None

        super().__init__(core_handlers, **tornado_settings)
        self.available_integrations, self.default_init_futures = integrations.load_integrations(self)

    def init_async(self):
        self.transforms.insert(0, partial(DebugTransform, self))

        self.http_client_factory = HttpClientFactory(self, getattr(self.config, 'http_upstreams', {}))

        self.available_integrations, default_init_futures = integrations.load_integrations(self)

        return default_init_futures

    def find_handler(self, request, **kwargs):
        request_id = request.headers.get('X-Request-Id')
        if request_id is None:
@@ -178,9 +187,6 @@ def application_version_xml(self):
    def application_version(self):
        return None

    def init_async(self):
        return []

    @staticmethod
    def next_request_id():
        FrontikApplication.request_id += 1
16 changes: 15 additions & 1 deletion frontik/file_cache.py
@@ -44,15 +44,29 @@ def __init__(self, cache_name, root_dir, load_fn, max_len=None, step=None, deepc
        self.cache_name = cache_name
        self.root_dir = root_dir
        self.load_fn = load_fn
        self.frozen = False
        self.max_len = max_len
        self.cache = LimitedDict(max_len, step, deepcopy)

    def populate(self, filenames, log, freeze=False):
        for filename in filenames:
            self._load(filename, log)

        self.frozen = freeze and (self.max_len is None or self.max_len > 0)

    def load(self, filename, log):
        if filename in self.cache:
            log.debug('got %s file from cache (%s cache size: %s)', filename, self.cache_name, len(self.cache))
            return self.cache[filename]

        if self.frozen:
            raise Exception(f'encounter file {filename} not in cache while cache is frozen')

        return self._load(filename, log)

    def _load(self, filename, log):
        real_filename = os.path.normpath(os.path.join(self.root_dir, filename))
        log.debug('reading file "%s"', real_filename)
        log.info('reading file "%s"', real_filename)
        result = self.load_fn(real_filename, log)
        self.cache[filename] = result

1 change: 1 addition & 0 deletions frontik/options.py
@@ -6,6 +6,7 @@

define('app', default=None, type=str)
define('app_class', default=None, type=str)
define('workers', default=1, type=int)
define('tornado_settings', default=None, type=dict)
define('max_active_handlers', default=100, type=int)
define('reuse_port', default=True, type=bool)
72 changes: 70 additions & 2 deletions frontik/server.py
@@ -2,8 +2,10 @@
import logging
import os.path
import signal
import errno
import sys
import time
import gc
from concurrent.futures import ThreadPoolExecutor

import tornado.autoreload
@@ -12,6 +14,7 @@
from tornado import gen
from tornado.httputil import HTTPServerRequest
from tornado.options import parse_command_line, parse_config_file
from tornado.util import errno_from_exception

from frontik.app import FrontikApplication
from frontik.loggers import bootstrap_logger, bootstrap_core_logging
@@ -126,6 +129,61 @@ def ioloop_stop():
sys.exit(1)


def fork_processes(num_processes):
    log.info("starting %d processes", num_processes)
    children = {}

    def start_child(i):
        pid = os.fork()
        if pid == 0:
            return i
        else:
            children[pid] = i
            return None

    for i in range(num_processes):
        id = start_child(i)
        if id is not None:
            return id

    def sigterm_handler(signum, frame):
        log.info('received SIGTERM')

        for pid, id in children.items():
            log.info('sending SIGTERM to child %d (pid %d)', id, pid)
            os.kill(pid, signal.SIGTERM)

    gc.enable()
    signal.signal(signal.SIGTERM, sigterm_handler)

    while children:
        try:
            pid, status = os.wait()
        except OSError as e:
            if errno_from_exception(e) == errno.EINTR:
                continue
            raise

        if pid not in children:
            continue

        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            log.warning("child %d (pid %d) killed by signal %d, restarting", id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            log.warning("child %d (pid %d) exited with status %d, restarting", id, pid, os.WEXITSTATUS(status))
        else:
            log.info("child %d (pid %d) exited normally", id, pid)
            continue

        new_id = start_child(id)
        if new_id is not None:
            return new_id

    log.info('all children terminated, exiting')
    sys.exit(0)


def main(config_file=None):
    parse_configs(config_files=config_file)

@@ -149,14 +207,24 @@ def main(config_file=None):

    try:
        app = application(app_root=os.path.dirname(module.__file__), **options.as_dict())

        gc.disable()
        gc.collect()
        gc.freeze()

        if options.workers != 1:
            fork_processes(options.workers)

        gc.enable()

        ioloop = tornado.ioloop.IOLoop.current()

        executor = ThreadPoolExecutor(options.common_executor_pool_size)
        ioloop.asyncio_loop.set_default_executor(executor)

        def _async_init_cb():
            try:
                init_futures = app.default_init_futures + list(app.init_async())
                init_futures = app.init_async()

                if init_futures:
                    def await_init(future):
@@ -177,6 +245,6 @@ def await_init(future):
        ioloop.add_callback(_async_init_cb)
        ioloop.start()

    except BaseException:
    except Exception:
        log.exception('frontik application exited with exception')
        sys.exit(1)
5 changes: 5 additions & 0 deletions tests/projects/test_app/__init__.py
@@ -20,13 +20,18 @@ def __init__(self, **settings):

        super().__init__(**settings)

    def init_async(self):
        futures = super().init_async()

        try:
            from frontik.integrations.kafka import KafkaIntegration
            kafka_integration = next(i for i in self.available_integrations if isinstance(i, KafkaIntegration))
            kafka_integration.kafka_producers = {'infrastructure': TestKafkaProducer()}
        except Exception:
            pass

        return futures

    def application_urls(self):
        return [
            (r'^/redirect', RedirectRouter()),
19 changes: 19 additions & 0 deletions tests/test_file_cache.py
@@ -1,5 +1,6 @@
import os
import unittest
from functools import partial

from frontik.file_cache import FileCache, LimitedDict

@@ -60,6 +61,9 @@ def __init__(self):
        def debug(self, message, *args):
            self.message = message % args

        def info(self, message, *args):
            self.message = message % args

    def test_file_cache(self):
        c = FileCache('test', self.CACHE_DIR, lambda filename, log: filename, max_len=3)
        log = TestFileCache.MockLog()
@@ -80,3 +84,18 @@ def test_file_cache(self):

        c.load('parse_error.xsl', log)
        self.assertIn('reading file', log.message)

    def test_populate(self):
        c = FileCache('test', self.CACHE_DIR, lambda filename, log: filename, max_len=3)
        log = TestFileCache.MockLog()

        c.populate(['simple.xsl', 'parse_error.xsl', 'syntax_error.xsl'], log)

        self.assertEqual(len(c.cache), 3)

        c.load('apply_error.xsl', log)
        self.assertIn('reading file', log.message)

        c.populate(['simple.xsl', 'parse_error.xsl', 'simple.xsl'], log, freeze=True)

        self.assertRaises(Exception, partial(c.load, 'apply_error.xsl', log))
1 change: 1 addition & 0 deletions tests/test_frontik_testing.py
@@ -45,6 +45,7 @@ def application_urls(self):
                ]

        app = TestApplication(app='test_app')
        app.init_async()
        self.patch_app_http_client(app)

        return app
