Skip to content

Commit

Permalink
Merge pull request #14 from pbenas/watchdog
Browse files Browse the repository at this point in the history
Plugin watchdog and module execution timeout
  • Loading branch information
Filip Pytloun committed Feb 9, 2015
2 parents 462f3e2 + bf6e2f1 commit 864215c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
2 changes: 1 addition & 1 deletion rc.d/init.d/smokerd-rhel
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ start() {
touch $LOCKFILE
# Wait a second before it's loaded and check if it's still running
sleep 1
if [ "`pgrep -f $BINARY`" ]; then
if [ "`pgrep -f $PROG`" ]; then
success
echo
return 0
Expand Down
50 changes: 31 additions & 19 deletions smoker/server/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
# Copyright (C) 2007-2012, GoodData(R) Corporation. All rights reserved

import logging
lg = logging.getLogger('smokerd.daemon')
import os
import signal
import sys
import time
import yaml

from smoker.server.plugins import PluginManager
from smoker.server.restserver import RestServer

import yaml

import threading

import sys
import os
lg = logging.getLogger('smokerd.daemon')

import signal
import time

class Smokerd(object):
"""
Expand Down Expand Up @@ -164,9 +161,32 @@ def run(self):
signal.signal(signal.SIGTERM, self._shutdown)
# API server loop now runs in a separate process and we don't want
# to terminate to keep an instance of pluginmanager
while True:
time.sleep(1)

self._watchdog()

def _watchdog(self):
    """
    Supervise the REST API server and the plugins forever

    Every 10 seconds check that the REST API server and every plugin
    registered in the plugin manager report is_alive(). A dead server is
    restarted immediately. Dead plugins are restarted through the plugin
    manager and the REST API server is then restarted once per pass, so it
    holds references to the new plugin instances.

    This method never returns.
    """
    lg.debug("Starting the smoker watchdog")

    while True:
        if not self.server.is_alive():
            lg.error("REST API server is dead")
            self._restart_api_server()
            lg.info("restarted the REST API server")

        # Iterate over a snapshot: restart_plugin() rebinds entries of the
        # plugins mapping while we are walking through it
        plugin_restarted = False
        for plugin in list(self.pluginmgr.plugins.values()):
            if plugin.is_alive():
                continue
            lg.error("Plugin %s is dead", plugin.name)
            self.pluginmgr.restart_plugin(plugin.name)
            plugin_restarted = True

        # need to restart the REST server so it has reference to the new
        # plugin instances; one restart per pass covers all of them
        if plugin_restarted:
            self._restart_api_server()

        time.sleep(10)

def _restart_api_server(self):
    """
    Replace the running REST API server with a new instance

    The current server is terminated and joined first, then a new
    RestServer is created from the configured bind_host and bind_port,
    stored in self.server and started.
    """
    # Make sure the old server is gone before anything new is created
    previous = self.server
    previous.terminate()
    previous.join()

    host = self.conf['bind_host']
    port = self.conf['bind_port']
    self.server = RestServer(host, port, self)
    self.server.start()

def stop(self):
"""
Expand Down Expand Up @@ -270,14 +290,6 @@ def _shutdown(self, signum=None, frame=None, exitcode=0, exception=False):
if self.pluginmgr:
self.pluginmgr.stop()

# Kill all running threads that are left (there shouldn't be any)
if threading.activeCount() > 1:
lg.info("Killing %s active threads" % (threading.activeCount() - 1))
for t in threading.enumerate():
if t.getName() == 'MainThread':
continue
t._Thread__stop()

# Remove PID file if exists
if os.path.isfile(self.conf['pidfile']):
os.remove(self.conf['pidfile'])
Expand Down
25 changes: 25 additions & 0 deletions smoker/server/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import multiprocessing
import os
import re
import signal
import simplejson
import setproctitle
import time
Expand All @@ -23,6 +24,11 @@
lg.info("Plugins will run approximately at %s parallel processes" % semaphore_count)
semaphore = multiprocessing.Semaphore(semaphore_count)


def alarm_handler(signum, frame):
    """
    Signal handler that raises PluginExecutionTimeout

    Both arguments are required by the signal handler interface and are
    not used.
    """
    raise PluginExecutionTimeout()


class PluginManager(object):
"""
PluginManager provides management and
Expand Down Expand Up @@ -153,6 +159,14 @@ def load_plugin(self, plugin, options):
params = dict(template, **options)
return Plugin(plugin, params)

def restart_plugin(self, name):
    """
    Replace the plugin registered under given name by a new instance

    The current instance is joined first, then the plugin is loaded again
    from its entry in conf_plugins, stored under the same name and started.
    """
    lg.info("Restarting plugin %s" % name)

    # Wait for the old instance before dropping the reference to it
    stale = self.plugins[name]
    stale.join()

    fresh = self.load_plugin(name, self.conf_plugins[name])
    self.plugins[name] = fresh
    fresh.start()


def get_template(self, name):
"""
Return template parameters
Expand Down Expand Up @@ -528,13 +542,24 @@ def run_module(self, module, **kwargs):
lg.exception(e)
raise

signal.signal(signal.SIGALRM, alarm_handler)
if 'timeout' not in kwargs:
kwargs['timeout'] = self.get_param('Timeout', default=120)

try:
signal.alarm(kwargs['timeout'])
result = plugin.run()
except PluginExecutionTimeout:
result = Result()
result.set_status('ERROR')
result.add_error('Plugin execution exceeded timeout %d seconds' %
kwargs['timeout'])
except Exception as e:
lg.error("Plugin %s: module execution failed: %s" % (self.name, e))
lg.exception(e)
raise

signal.alarm(0)
return result

def run_plugin(self, force=False):
Expand Down

0 comments on commit 864215c

Please sign in to comment.