diff --git a/rc.d/init.d/smokerd-rhel b/rc.d/init.d/smokerd-rhel index 66fc914..dcecf9f 100755 --- a/rc.d/init.d/smokerd-rhel +++ b/rc.d/init.d/smokerd-rhel @@ -73,7 +73,7 @@ start() { touch $LOCKFILE # Wait a second before it's loaded and check if it's still running sleep 1 - if [ "`pgrep -f $BINARY`" ]; then + if [ "`pgrep -f $PROG`" ]; then success echo return 0 diff --git a/smoker/server/daemon.py b/smoker/server/daemon.py index 13b8a95..b87d2bc 100644 --- a/smoker/server/daemon.py +++ b/smoker/server/daemon.py @@ -3,20 +3,17 @@ # Copyright (C) 2007-2012, GoodData(R) Corporation. All rights reserved import logging -lg = logging.getLogger('smokerd.daemon') +import os +import signal +import sys +import time +import yaml from smoker.server.plugins import PluginManager from smoker.server.restserver import RestServer -import yaml - -import threading - -import sys -import os +lg = logging.getLogger('smokerd.daemon') -import signal -import time class Smokerd(object): """ @@ -164,9 +161,32 @@ def run(self): signal.signal(signal.SIGTERM, self._shutdown) # API server loop now runs in a separate process and we don't want # to terminate to keep an instance of pluginmanager - while True: - time.sleep(1) + self._watchdog() + + def _watchdog(self): + lg.debug("Starting the smoker watchdog") + + while True: + if not self.server.is_alive(): + lg.error("REST API server is dead") + self._restart_api_server() + lg.info("restarted the REST API server") + + for plugin in self.pluginmgr.plugins.values(): + if not plugin.is_alive(): + lg.error("Plugin %s is dead" % plugin.name) + self.pluginmgr.restart_plugin(plugin.name) + # need to restart the REST server so it has reference to + # the new plugin instance + self._restart_api_server() + time.sleep(10) + + def _restart_api_server(self): + self.server.terminate() + self.server.join() + self.server = RestServer(self.conf['bind_host'], self.conf['bind_port'], self) + self.server.start() def stop(self): """ @@ -270,14 +290,6 @@ def _shutdown(self, signum=None, frame=None, exitcode=0, exception=False): if self.pluginmgr: self.pluginmgr.stop() - # Kill all running threads that has left (there shouldn't be any) - if threading.activeCount() > 1: - lg.info("Killing %s active threads" % (threading.activeCount() - 1)) - for t in threading.enumerate(): - if t.getName() == 'MainThread': - continue - t._Thread__stop() - # Remove PID file if exists if os.path.isfile(self.conf['pidfile']): os.remove(self.conf['pidfile']) diff --git a/smoker/server/plugins/__init__.py b/smoker/server/plugins/__init__.py index 49feea0..ef7c283 100644 --- a/smoker/server/plugins/__init__.py +++ b/smoker/server/plugins/__init__.py @@ -7,6 +7,7 @@ import multiprocessing import os import re +import signal import simplejson import setproctitle import time @@ -23,6 +24,11 @@ lg.info("Plugins will run approximately at %s parallel processes" % semaphore_count) semaphore = multiprocessing.Semaphore(semaphore_count) + +def alarm_handler(signum, frame): + raise PluginExecutionTimeout + + class PluginManager(object): """ PluginManager provides management and @@ -153,6 +159,14 @@ def load_plugin(self, plugin, options): params = dict(template, **options) return Plugin(plugin, params) + def restart_plugin(self, name): + lg.info("Restarting plugin %s" % name) + self.plugins[name].join() + + self.plugins[name] = self.load_plugin(name, self.conf_plugins[name]) + self.plugins[name].start() + + def get_template(self, name): """ Return template parameters @@ -528,13 +542,24 @@ def run_module(self, module, **kwargs): lg.exception(e) raise + signal.signal(signal.SIGALRM, alarm_handler) + if 'timeout' not in kwargs: + kwargs['timeout'] = self.get_param('Timeout', default=120) + try: + signal.alarm(kwargs['timeout']) result = plugin.run() + except PluginExecutionTimeout: + result = Result() + result.set_status('ERROR') + result.add_error('Plugin execution exceeded timeout %d seconds' % + kwargs['timeout']) except Exception as e: lg.error("Plugin %s: module execution failed: %s" % (self.name, e)) lg.exception(e) raise + signal.alarm(0) return result def run_plugin(self, force=False):