diff --git a/README.md b/README.md index d693a73..2836221 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ It doesn't have much dependencies, follow instructions bellow to install them: With PIP: - pip install psutil PyAML argparse simplejson + pip install psutil PyAML argparse simplejson setproctitle Flask-RESTful Or install packages from your distribution repository. @@ -75,7 +75,7 @@ Usage ### Smoker Daemon (smokerd) Configuration can be done in two different ways: - * final configuration in single/multiple yaml files (if GENCONFIG option is 0, this is default), simply edit `/etc/smokerd/smokerd.yaml` + * final configuration in single/multiple yaml files (if GENCONFIG option is 0, this is default), simply create `/etc/smokerd/smokerd.yaml` * generated configuration from directories (eg. for easier Puppet deploy) by init script (see structure bellow) ``` @@ -120,7 +120,7 @@ When Smoker is started this way, it doesn't log into syslog. Standard and error #### Manual start Use following command to start smokerd in foreground with verbose output: - /usr/local/bin/smokerd.py -v -fg + /usr/bin/smokerd.py -v -fg This is very good for testing and development purposees. diff --git a/etc/smokerd-example.yaml b/etc/smokerd-example.yaml index 64bf8cb..0810dc5 100644 --- a/etc/smokerd-example.yaml +++ b/etc/smokerd-example.yaml @@ -59,7 +59,7 @@ plugins: Enabled: True # This Python module, class Plugin, method run() # will be executed to get result - Module: smoker.plugins.uname + Module: smoker.server.plugins.uname # Inherits default parameters ConnectorZendesk3: diff --git a/setup.py b/setup.py index fcf7ec1..6eb7147 100755 --- a/setup.py +++ b/setup.py @@ -75,7 +75,7 @@ ], 'platforms' : ['POSIX'], 'provides' : ['smoker'], - 'install_requires' : ['PyYAML', 'argparse', 'simplejson', 'psutil'], + 'install_requires' : ['PyYAML', 'argparse', 'simplejson', 'psutil', 'setproctitle', 'Flask-RESTful'], } setup(**params) diff --git a/smoker/server/daemon.py b/smoker/server/daemon.py index 6deba3a..13b8a95 100644 --- a/smoker/server/daemon.py +++ b/smoker/server/daemon.py @@ -6,7 +6,7 @@ lg = logging.getLogger('smokerd.daemon') from smoker.server.plugins import PluginManager -from smoker.server.httpserver import ThreadedHTTPServer, HTTPHandler +from smoker.server.restserver import RestServer import yaml @@ -16,7 +16,7 @@ import os import signal -import select +import time class Smokerd(object): """ @@ -48,13 +48,6 @@ def __init__(self, **kwargs): self._load_config() - # Catch SIGINT and SIGTERM if supported - if hasattr(signal, 'SIGINT'): - signal.signal(signal.SIGINT, self._shutdown) - - if hasattr(signal, 'SIGTERM'): - signal.signal(signal.SIGTERM, self._shutdown) - def _yaml_include(self, loader, node): """ Include another yaml file from main file @@ -156,20 +149,25 @@ def run(self): lg.info("Starting webserver on %(bind_host)s:%(bind_port)s" % self.conf) try: - self.server = ThreadedHTTPServer((self.conf['bind_host'], self.conf['bind_port']), HTTPHandler, self) - try: - self.server.serve_forever() - except select.error as e: - # Suppress exception during shutdown - # (4, 'Interrupted system call') - pass - except KeyboardInterrupt: - lg.info("Interrupted") + self.server = RestServer(self.conf['bind_host'], self.conf['bind_port'], self) + self.server.start() except Exception as e: lg.error("Can't start HTTP server: %s" % e) lg.exception(e) self._shutdown(exitcode=1) + # Catch SIGINT and SIGTERM if supported + if hasattr(signal, 'SIGINT'): + signal.signal(signal.SIGINT, self._shutdown) + + if hasattr(signal, 'SIGTERM'): + signal.signal(signal.SIGTERM, self._shutdown) + # API server loop now runs in a separate process and we don't want + # to terminate to keep an instance of pluginmanager + while True: + time.sleep(1) + + def stop(self): """ Kill running daemon @@ -265,7 +263,8 @@ def _shutdown(self, signum=None, frame=None, exitcode=0, exception=False): try: # Shutdown webserver if self.server: - self.server.socket.close() + self.server.terminate() + self.server.join() # Shutdown pluginmanager and all plugins if self.pluginmgr: diff --git a/smoker/server/httpserver.py b/smoker/server/httpserver.py deleted file mode 100644 index 546fb82..0000000 --- a/smoker/server/httpserver.py +++ /dev/null @@ -1,711 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# Copyright (C) 2007-2012, GoodData(R) Corporation. All rights reserved - -""" -Module providing base http server for smokerd REST API - -Resource: about - About page with links - header: / - - GET - - (200 OK) % self.print_about() - - (start example) - { - "about" : { - "title" : "Smoker daemon API", - "links" : [ - { - "rel" : "plugins", - "href" : "/plugins", - "title" : "Show details about all plugins" - }, - { - "rel" : "plugin", - "href" : "/plugins/", - "title" : "Show details about " - }, - { - "rel" : "processes", - "href" : "/plugins/processes", - "title" : "Force plugin run" - }, - { - "rel" : "process", - "href" : "/plugins/processes/", - "title" : "Poll process for result" - } - ] - } - } - (end) - -Resource: plugins - Show active plugins details and result - header: /plugins - - GET - - (200 OK) % self.print_plugins() - - (start example) - { - "plugins" : { - "items" : [ - { - "plugin" : { - "parameters" : { - "History" : 10, - "Interval" : 30, - "JMXBean" : "com.gooddata:name=SelfTest-connector", - "uid" : "default", - "JMXPort" : 5015, - "Module" : null, - "Parser" : "gdc.smoker.plugins.jmxparser", - "Enabled" : true, - "Template" : "JMXTest", - "Command" : "/Users/filip/gooddata/src/gdc-python/gdc-smokerd/bin/jmxcheck.py --port %(JMXPort)s --bean %(JMXBean)s --run %(JMXRun)s", - "Action" : null, - "Category" : "connectors", - "Timeout" : 10, - "gid" : "default", - "JMXRun" : "executeTestsJSON" - }, - "nextRun" : "2012-10-26T13:31:56.814448", - "name" : "ConnectorZendesk3", - "lastResult" : { - "messages" : null, - "lastRun" : "2012-10-26T13:31:26.813870", - "status" : "ERROR", - "action" : null, - "componentResults" : [ - { - "componentResult" : { - "messages" : { - "info" : [], - "warn" : [], - "error" : [ - "SelfTest: Rabbit MQ service is NOT listening (host='localhost', port='5672')." - ] - }, - "status" : "ERROR", - "name" : "RabbitSelfTest" - } - }, - { - "componentResult" : { - "messages" : { - "info" : [ - "SelfTest: DB service is listening (host='localhost', port='3306')." - ], - "warn" : [], - "error" : [] - }, - "status" : "OK", - "name" : "DBSelfTest" - } - }, - ] - } - } - }, - { - "plugin" : { - "parameters" : { - "History" : 10, - "Interval" : 2, - "uid" : "default", - "Module" : "gdc.smoker.plugins.uname", - "Enabled" : true, - "Parser" : null, - "Template" : null, - "Command" : null, - "Action" : null, - "Timeout" : 30, - "Category" : "system", - "gid" : "default" - }, - "nextRun" : "2012-10-26T13:31:53.881718", - "name" : "Uname", - "lastResult" : { - "messages" : { - "info" : [ - "Darwin Airy.local 12.2.0 Darwin Kernel Version 12.2.0: Sat Aug 25 00:48:52 PDT 2012; root:xnu-2050.18.24~1/RELEASE_X86_64 x86_64" - ], - "warn" : [], - "error" : [] - }, - "lastRun" : "2012-10-26T13:31:51.881592", - "status" : "OK", - "action" : null, - "componentResults" : null - } - } - } - ] - } - } - (end) - -Resource: plugin - Show single plugin details and results (with history) - header: /plugins/ - - GET - - (200 OK) % self.print_plugins(name=) - -Resource: processes - Force plugins run - - header: /plugins/processes - - POST % self.add_process() - - (201 Created) - - (400 Bad Request) % misunderstood JSON request - -Resource: process - Get result of forced run - These resources will always return last forced result of each plugin, - so it doesn't keep history! - If you realy need it, it can be implemented, but now we don't want to keep - history of forced run results. - - header: /plugins/processes/ - - GET - - (200 OK) - - (202 Accepted) % we don't have results yet - - (404 Not Found) -""" - -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -from SocketServer import ThreadingMixIn -import socket -import logging - -from smoker.server.exceptions import * - -lg = logging.getLogger('smokerd.httpserver') - -import simplejson as json - -class ThreadedHTTPServer(ThreadingMixIn, HTTPServer, object): - """ - Threaded HTTPServer object - """ - daemon_threads = True - - def __init__(self, server_address, RequestHandlerClass, smokerd): - """ - Overwrite constructor to pass Smokerd instance - """ - self.smokerd = smokerd - self.pluginmgr = smokerd.pluginmgr - - super(ThreadedHTTPServer, self).__init__(server_address, RequestHandlerClass) - -class HTTPHandler(BaseHTTPRequestHandler, object): - """ - HTPServer requests handler - """ - def __init__(self, *args): - """ - Overwrite constructor of BaseHTTPRequestHandler - - Error = - """ - # We want custom JSON-format errors instead of HTML - self.error_message_format = json.dumps({ - 'error': { - 'component' : 'Smoker', - 'code' : '%(code)s', - 'message' : '%(message)s', - } - }) - - super(HTTPHandler, self).__init__(*args) - - def do_POST(self): - """ - Serve POST requests - """ - path = self.path.split('/') - # Remove empty part before first / - path.pop(0) - - try: - resource = path[0] - except IndexError: - resource = None - pass - - try: - argument = path[1] - except IndexError: - argument = None - pass - - try: - id = path[2] - except IndexError: - id = None - pass - - if resource == 'plugins' and argument == 'processes': - length = int(self.headers.getheader('content-length')) - request = self.rfile.read(length) - - try: - js = json.loads(request) - except json.JSONDecodeError as e: - # Bad request, don't log - self.return_400("Can't load submitted JSON: %s" % e.message) - return - except Exception as e: - # Internal server error, log exception - lg.error("Can't load submitted JSON: %s" % e) - lg.exception(e) - self.return_500("Can't load submitted JSON: %s" % e.message) - return - - try: - result = self.add_process(js) - except InvalidArgument as e: - # Invalid arguments supplied, raise bad request, don't log - self.return_400(e.message) - return - except NoSuchPlugin as e: - # Invalid arguments supplied, raise bad request, don't log - self.return_400(e.message) - return - except NoPluginsFound: - # Invalid arguments supplied, raise bad request, don't log - self.return_400("No plugins was found by given names and filter") - return - except Exception as e: - # Internal server error, log exception - lg.error("Can't add submitted process: %s" % e) - lg.exception(e) - self.return_500("Can't add submitted process: %s" % e.message) - return - - # Return 201 and location header to poll - self.send_response(201) - self.send_header('Content-type', 'application/json') - self.send_header('Location', result['asyncTask']['link']['poll']) - self.end_headers() - - self.wfile.write(json.dumps(result)) - return - else: - # Method not allowed on other resources - self.return_405() - - def do_GET(self): - """ - Serve GET requests - """ - result = None - path = self.path.split('/') - # Remove empty part before first / - path.pop(0) - - try: - resource = path[0] - except IndexError: - resource = None - pass - - try: - argument = path[1] - except IndexError: - argument = None - pass - - try: - id = path[2] - except IndexError: - id = None - pass - - if not resource: - # Print about on index - result = self.print_about() - elif resource == 'plugins': - if not argument: - # Print all plugins information - try: - result = self.print_plugins() - except Exception as e: - lg.exception(e) - self.return_500(e.message) - return - elif argument == 'processes': - if id: - # Print result of given process - try: - result = self.print_process(id) - except InProgress: - # Return 202 and asyncTask structure - self.send_response(202, 'Accepted') - self.send_header('Location', self.path) - self.end_headers() - result = { - "asyncTask": { - "link": { - "poll": self.path - } - } - } - self.wfile.write(json.dumps(result)) - return - except IndexError: - self.return_404('Process id %s not found' % id) - return - except Exception as e: - lg.exception(e) - self.return_500(e.message) - return - else: - # GET method on processes is not allowed - self.return_405() - return - else: - # Print single plugin informations - try: - result = self.print_plugins(argument) - except NoSuchPlugin: - self.return_404('Plugin %s not found' % argument) - return - except Exception as e: - lg.exception(e) - self.return_500(e.message) - return - elif resource == 'favicon.ico': - try: - self.return_favicon() - except Exception as e: - lg.exception(e) - self.return_500(e.message) - return - return - else: - # Resource not found - self.return_404() - return - - # Print result - if result: - self.send_response(200) - self.send_header('Content-type', 'application/json') - - try: - response = json.dumps(result) - except Exception: - self.return_500("Malformed JSON result") - raise - - self.end_headers() - self.wfile.write(response) - else: - self.return_500() - - return - - def return_favicon(self): - """ - Open and return favicon.ico - """ - try: - fh = open(self.server.smokerd.conf['favicon'], 'r') - self.send_response(200) - self.send_header('Content-type', 'image/x-icon') - self.end_headers() - self.wfile.write(fh.read()) - fh.close() - except: - self.return_404() - - def return_500(self, msg='Unknown server error'): - """ - Return 500 and optional message - """ - self.send_error(500, msg) - return - - def return_400(self, msg='Bad request'): - """ - Return 400 to client with reason - """ - self.send_error(400, msg) - return - - def return_404(self, msg='Resource not found'): - """ - Return 404 to client with reason - """ - self.send_error(404, msg) - return - - def return_405(self, msg='Method not allowed'): - """ - Return 405 to client with reason - """ - self.send_error(405, msg) - return - - def print_about(self): - """ - Print about page with title and links - to all supported resources - - About = - """ - result = { - 'about': { - 'host' : socket.gethostname(), - 'title': 'Smoker daemon API', - 'links' : [ - { - 'rel' : 'plugins', - 'href' : '/plugins', - 'title': 'Show details about all plugins', - }, - { - 'rel' : 'processes', - 'href' : '/plugins/processes', - 'title': 'Force plugin run', - }, - ], - }, - } - return result - - def print_plugins(self, name=None): - """ - Print informations for all plugins - or for single plugin - - Plugins = - - Plugin = - }> - - Result = { - "status" : 'OK' | 'ERROR' | 'WARN', - "lastRun" : ISOTIME, - "action" : Result | NULL, - "messages" : Messages | NULL, - "componentResults" : [componentResult: { - "name" : STRING, - "status" : 'OK' | 'ERROR' | 'WARN', - "messages": Messages - }] | NULL - } - - Messages = { - "info" : [STRING], - "error": [STRING], - "warn" : [STRING] - } - """ - plugins = {} - plugins_result = [] - - if not name: - plugins = self.server.pluginmgr.get_plugins() - else: - plugins[name] = self.server.pluginmgr.get_plugin(name) - - for pname, plugin in plugins.iteritems(): - # Convert next run object to ISO format - if not isinstance(plugin.next_run, bool): - next_run = plugin.next_run.isoformat() - else: - next_run = None - - # Convert result componentResults to list - # just to have standarized API - last_result = plugin.get_last_result() - if last_result and last_result['componentResults']: - # Remove reference, because we don't want to modify - # Plugin object's structures - last_result = dict(last_result) - last_result['componentResults'] = self.to_list('componentResult', last_result['componentResults']) - - # Format plugin result - plugin_result = { - 'name' : pname, - 'links': { - 'self' : '/plugins/%s' % pname, - }, - 'parameters' : plugin.params, - 'lastResult' : last_result, - 'nextRun' : next_run, - } - - # Add results history only if we are listing single plugin - if name: - results = [] - for res in plugin.result: - # Remove reference, we don't want to edit Plugin object structures - res = dict(res) - - # Convert result componentResults to list - # just to have standarized API - if res['componentResults']: - res['componentResults'] = self.to_list('componentResult', res['componentResults']) - results.append({'result': res}) - - plugin_result['results'] = results - - plugins_result.append({'plugin': plugin_result}) - - if name: - return plugins_result[0] - else: - return { 'plugins' : {'items' : plugins_result} } - - def to_list(self, keyword, dictionary): - """ - Convert dictionary to list by given keyword - """ - result = [] - for key, value in dictionary.iteritems(): - res = value - res['name'] = key - result.append({keyword : res}) - - return result - - def print_process(self, id): - """ - Print process result - """ - try: - process = self.server.pluginmgr.get_process(int(id)) - except IndexError: - raise - - results = [] - for plugin in process['plugins']: - if not plugin.forced_result: - raise InProgress - else: - # Remove reference, because we don't want to modify - # Plugin object's structures - forced_result = dict(plugin.forced_result) - if forced_result['componentResults']: - forced_result['componentResults'] = self.to_list('componentResult', forced_result['componentResults']) - - # Convert next run object to ISO format - if not isinstance(plugin.next_run, bool): - next_run = plugin.next_run.isoformat() - else: - next_run = None - - result = { - 'plugin' : { - 'name' : plugin.name, - 'links': { - 'self' : '/plugins/%s' % plugin.name, - }, - 'parameters' : plugin.params, - 'lastResult': forced_result, - 'nextRun' : next_run, - } - } - results.append(result) - - return { 'plugins' : {'items' : results} } - - def add_process(self, definition): - """ - Add new process - - AsyncTask = - - PostProcess = - """ - if definition['process'].has_key("plugins"): - plugins = definition['process']['plugins'] - else: - plugins = None - - if definition['process'].has_key("filter"): - filter = definition['process']['filter'] - else: - filter = None - - # If plugins and filter are empty, raise exception - if not plugins and not filter: - raise InvalidArgument("Plugin names or filter have to be set") - - # Validate input - if plugins and not isinstance(plugins, list): - raise InvalidArgument("Element plugins have to be list") - - if filter and not isinstance(filter, dict): - raise InvalidArgument("Element filter have to be dictionary") - - id = self.server.pluginmgr.add_process(plugins, filter) - - result = { - 'asyncTask': { - 'link': { - 'poll': '/plugins/processes/%d' % id - } - } - } - - return result diff --git a/smoker/server/plugins/__init__.py b/smoker/server/plugins/__init__.py index 9f88b37..49feea0 100644 --- a/smoker/server/plugins/__init__.py +++ b/smoker/server/plugins/__init__.py @@ -2,25 +2,26 @@ # -*- coding: utf-8 -*- # Copyright (C) 2007-2012, GoodData(R) Corporation. All rights reserved +import datetime import logging -lg = logging.getLogger('smokerd.pluginmanager') +import multiprocessing +import os +import re +import simplejson +import setproctitle +import time +import types from smoker.server.exceptions import * import smoker.util.command -import os -import datetime -import time -import threading -import simplejson -import re -import types +lg = logging.getLogger('smokerd.pluginmanager') -# Initialize threading semamphore, by default limit by +# Initialize multiprocessing semamphore, by default limit by # number of online CPUs + 2 semaphore_count = int(os.sysconf('SC_NPROCESSORS_ONLN')) + 2 -lg.info("Plugins will run approximately at %s threads in parallel" % semaphore_count) -semaphore = threading.Semaphore(semaphore_count) +lg.info("Plugins will run approximately at %s parallel processes" % semaphore_count) +semaphore = multiprocessing.Semaphore(semaphore_count) class PluginManager(object): """ @@ -73,6 +74,7 @@ def stop(self, blocking=True): # Trigger stop of all plugins for name, plugin in self.plugins.iteritems(): plugin.stop() + plugin.terminate() # Wait until all plugins are stopped if blocking: @@ -81,8 +83,10 @@ def stop(self, blocking=True): while plugins_left: plugins_left = [] for name, plugin in self.plugins.iteritems(): - if plugin.isAlive(): + if plugin.is_alive(): plugins_left.append(name) + else: + plugin.join() if plugins_left: # Print info only if number of left plugins changed if len(plugins_left) != plugins_left_cnt: @@ -147,7 +151,7 @@ def load_plugin(self, plugin, options): options['Action'] = self.get_action(options['Action']) params = dict(template, **options) - return Plugin(self, plugin, params) + return Plugin(plugin, params) def get_template(self, name): """ @@ -239,7 +243,7 @@ def add_process(self, plugins=None, filter=None): # Force run for each plugin and clear forced_result for plugin in plugins_list: - plugin.force = True + plugin.forceFlag.set() plugin.forced_result = None return id @@ -250,7 +254,14 @@ def get_process(self, id): """ return self.processes[id] -class Plugin(threading.Thread, object): + def get_process_list(self): + """ + Return all processes + """ + return self.processes + + +class Plugin(multiprocessing.Process): """ Object that represents single plugin """ @@ -270,21 +281,16 @@ class Plugin(threading.Thread, object): 'Action' : None, } - result = [] - - # Instance of PluginManager - pluginmgr = None - - force = False - forced_result = None - - next_run = False - - def __init__(self, pluginmgr, name, params): + def __init__(self, name, params): """ Plugin constructor - * load parameters and plugin name - * prepare Thread + * prepare the process + + :param name: name of the plugin + :type name: string + + :param params: keyword arguments + :type params: dict """ assert isinstance(name, basestring) assert isinstance(params, dict) @@ -302,13 +308,13 @@ def __init__(self, pluginmgr, name, params): } self.params['Action'] = dict(action_default, **params['Action']) - # Set instance of PluginManager - self.pluginmgr = pluginmgr + # create the instances of the Queue and force flag + self.queue = multiprocessing.Queue() + self.forceFlag = multiprocessing.Event() # Set those variables or they will be # references, shared between plugins - self.result = [] - self.force = False + self.result = [] self.forced_result = None self.next_run = False @@ -332,7 +338,7 @@ def __init__(self, pluginmgr, name, params): if self.params['Interval']: self.schedule_run() - # Run Thread constructor, we want to be daemonic thread + # Run Process constructor, we want to be daemonic process super(Plugin, self).__init__() self.daemon = True @@ -366,25 +372,28 @@ def validate(self): def run(self): """ - Run thread + Run process Check if plugin should be run and execute it """ - while self.stopping is not True: - # Plugin run when forced - if self.force == True: - with semaphore: - self.run_plugin() - self.force = False - self.forced_result = self.get_last_result() - else: - # Plugin run in interval - if self.params['Interval']: - if datetime.datetime.now() >= self.next_run: - with semaphore: - self.run_plugin() - time.sleep(1) - - # Stop the thread + setproctitle.setproctitle('smokerd plugin %s' % self.name) + try: + while self.stopping is not True: + # Plugin run when forced + if self.forceFlag.is_set(): + with semaphore: + self.run_plugin(force=True) + self.forceFlag.clear() + else: + # Plugin run in interval + if self.params['Interval']: + if datetime.datetime.now() >= self.next_run: + with semaphore: + self.run_plugin() + time.sleep(1) + except KeyboardInterrupt: + pass + + # Stop the process lg.info("Shutting down plugin %s" % self.name) def schedule_run(self, time=None, now=False): @@ -528,9 +537,12 @@ def run_module(self, module, **kwargs): return result - def run_plugin(self): + def run_plugin(self, force=False): """ Run plugin, save result and schedule next run + + :param force: forced run + :type force: bool """ # External command will be executed if self.params['Command']: @@ -596,24 +608,21 @@ def run_plugin(self): # Add action result to plugin result result.set_action(action) - # Append to the results + result.set_forced(force) + # send to the daemon try: - self.result.append(result.get_result()) + self.queue.put(result.get_result()) except ValidationError as e: lg.error("Plugin %s: ValidationError: %s" % (self.name, e)) result = Result() result.set_status('ERROR') result.add_error('ValidationError: %s' % e) - self.result.append(result.get_result()) + result.set_forced(force) + self.queue.put(result.get_result()) # Log result lg.info("Plugin %s result: %s" % (self.name, result.get_result())) - # Remove earliest result to keep only number - # of results by parameter - if len(self.result) > self.params['History']: - self.result.pop(0) - # Finally schedule next run self.schedule_run() @@ -623,6 +632,13 @@ def get_last_result(self, dictionary=False): If dictionary=True, then return value will be always dict eg. for use like dict(self.params, **self.get_last_result(True)) """ + while not self.queue.empty(): + self.result.append(self.queue.get()) + # Remove earliest result to keep only number + # of results by parameter + if len(self.result) > self.params['History']: + self.result.pop(0) + try: res = self.result[-1] except IndexError: @@ -631,6 +647,9 @@ def get_last_result(self, dictionary=False): else: return None + if res and res['forced'] and not self.forceFlag.is_set(): + self.forced_result = res + return res def escape(self, tbe): @@ -686,7 +705,6 @@ class Result(object): """ Object that represents plugin result """ - result = {} validated = False def __init__(self): @@ -694,11 +712,12 @@ def __init__(self): Default result values """ self.result = { - 'status' : None, + 'status': None, 'messages': None, - 'lastRun' : datetime.datetime.now().isoformat(), - 'componentResults' : None, - 'action' : None, + 'lastRun': datetime.datetime.now().isoformat(), + 'componentResults': None, + 'action': None, + 'forced': False } def set_status(self, status=None): @@ -730,6 +749,9 @@ def _gen_component_status(self, default='OK'): status = 'ERROR' return status + def set_forced(self, forced=True): + self.result['forced'] = forced + def add_info(self, msg): """ Add info messge diff --git a/smoker/server/restserver.py b/smoker/server/restserver.py new file mode 100644 index 0000000..651c099 --- /dev/null +++ b/smoker/server/restserver.py @@ -0,0 +1,323 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2007-2015, GoodData(R) Corporation. All rights reserved +""" +Module providing base http server for smokerd REST API +""" + +from flask import Flask, request, make_response +from flask.ext.restful import Api, Resource, abort +import json +import multiprocessing +import setproctitle +import socket + +from smoker.server import exceptions + +# need to keep the daemon instance and common functions at module level since +# there's no other way how to pass the to Flask_restful class methods +smokerd = None + + +def next_run_iso_format(next_run): + """ + Convert next run timestamp object to the ISO format + """ + if isinstance(next_run, bool): + next_run = None + else: + next_run = next_run.isoformat() + + +def standardized_api_list(component): + """ + Convert result dict to list just to have standardized API + """ + keyword = 'componentResults' + + if not component: + return component + if not component[keyword]: + return component + + # Remove reference, because we don't want to modify + # Plugin object's structures + results = dict(component) + results[keyword] = [] + for key, value in component[keyword].iteritems(): + value['name'] = key + results[keyword].append({'componentResult': value}) + + return results + + +def print_plugin(name, forced=False): + """ + Print information about a plugin + + :param name: name of the plugin + :type name: string + :param forced: use forced_results instead of last_results + :type forced: bool + """ + + plugin = smokerd.pluginmgr.get_plugin(name) + + # Format plugin result + plugin_result = { + 'lastResult': standardized_api_list(plugin.get_last_result()), + 'links': { + 'self': '/plugins/%s' % name, + }, + 'name': name, + 'nextRun': next_run_iso_format(plugin.next_run), + 'parameters': plugin.params + } + + if forced: + if not plugin.forced_result: + raise exceptions.InProgress + plugin_result['forcedResult'] = plugin.forced_result + return {'plugin': plugin_result} + + +def print_plugins(plugins, forced=False): + """ + Print information about set of plugins + + :param plugins: list of plugin names + :type plugins: list of strings + :param forced: use forced_results instead of last_results + :type forced: bool + """ + plugins_result = [] + + for plugin in plugins: + plugins_result.append(print_plugin(plugin, forced)) + + return {'plugins': {'items': plugins_result}} + + +def get_plugin_history(name): + """ + Get history of results for single plugin + + :param name: name of the plugin + :type name: string + """ + plugin = smokerd.pluginmgr.get_plugin(name) + results = [] + + for res in plugin.result: + res = standardized_api_list(res) + results.append({'result': res}) + + return results + + +def print_in_progress(id): + """ + Format json info about process in progress + + :param id: process identifier + :type id: int + """ + location = '/processes/%d' % id + data = { + 'asyncTask': { + 'link': { + 'poll': location + } + } + } + + # need to create response manually in orted to have custom status code + response = make_response(json.dumps(data, indent=2)) + response.status = 'Accepted' + response.status_code = 202 + response.headers['Location'] = location + response.headers['content-type'] = 'application/json' + return response + + +class About(Resource): + """ + Print the basic usage + """ + def get(self): + return { + 'about': { + 'host': socket.gethostname(), + 'title': 'Smoker daemon API', + 'links': [ + { + 'rel': 'plugins', + 'href': '/plugins', + 'methods': 'GET', + 'title': 'Show details about all plugins' + }, + { + 'rel': 'processes', + 'href': '/processes', + 'methods': 'GET, POST', + 'title': 'Force plugin run' + } + ] + } + } + + +class Plugins(Resource): + def get(self): + """ + Print overview of all plugins + """ + return print_plugins(smokerd.pluginmgr.get_plugins().keys()) + + +class Plugin(Resource): + def get(self, name): + """ + Print a single plugin with history of results + + :param name: name of the plugin + :type name: string + """ + try: + plugin = print_plugin(name) + except exceptions.NoSuchPlugin as e: + abort(404, message=e.message) + history = get_plugin_history(name) + plugin['results'] = history + + return plugin + + +class Processes(Resource): + """ + Create or get process + """ + def get(self): + result = [] + processes = smokerd.pluginmgr.get_process_list() + # we index processes from 1, having dummy on position 0 + for id in range(1, len(processes)): + process = processes[id] + plugins = [] + for plugin in process['plugins']: + plugins.append(plugin.name) + + result.append( + { + 'href': 'processes/%d' % id, + 'plugins': plugins + } + ) + return result + + def post(self): + example = {} + example['example_input'] = { + 'process': { + 'plugins': '[STRING] | NULL', + 'filter': '{STRING : STRING} | NULL' + } + } + example['note'] = ( + 'filter is optional key : value pair of plugin parameters ' + 'to filter') + + definition = request.get_json(force=True) + + if (not definition or 'process' not in definition + or not definition['process']): + abort(400, **example) + + if 'plugins' in definition['process']: + plugins = definition['process']['plugins'] + else: + plugins = None + + if 'filter' in definition['process']: + filter = definition['process']['filter'] + else: + filter = None + + # If plugins and filter are empty, report bad request + if not plugins and not filter: + example['message'] = 'Plugin names or filter have to be set' + abort(400, **example) + + # Validate input + if plugins and not isinstance(plugins, list): + example['message'] = 'Element plugins have to be list' + abort(400, **example) + + if filter and not isinstance(filter, dict): + example['message'] = 'Element filter have to be dictionary' + abort(400, **example) + + try: + id = smokerd.pluginmgr.add_process(plugins, filter) + except Exception as e: + abort(500, message=e.message) + + return print_in_progress(id) + + +class Process(Resource): + def get(self, id): + """ + Get single process result + + :param id: process identifier + :type id: int + """ + try: + if id < 1: + raise IndexError + process = smokerd.pluginmgr.get_process(int(id)) + except IndexError: + abort(404, message='Process id %s not found' % id) + + plugins = [] + for plugin in process['plugins']: + plugins.append(plugin.name) + + try: + return print_plugins(plugins, forced=True) + except exceptions.InProgress: + return print_in_progress(id) + + +class RestServer(multiprocessing.Process): + def __init__(self, host, port, smoker_daemon): + """ + :param host: host to bind the server to" + :type host: string + :param port: port to bind the server to" + :type port: int + :param smoker_daemon: instance of the smoker daemon + :type smoker_daemon: smokerd.Smokerd + """ + self.host = host + self.port = port + self.app = Flask(__name__) + + self.api = Api(self.app) + self.api.add_resource(About, '/') + self.api.add_resource(Plugins, '/plugins', '/plugins/') + self.api.add_resource(Plugin, '/plugins/', + '/plugins//') + self.api.add_resource(Processes, '/processes', '/processes/') + self.api.add_resource(Process, '/processes/', + '/processes//') + + global smokerd + smokerd = smoker_daemon + + super(RestServer, self).__init__() + self.daemon = True + + def run(self): + setproctitle.setproctitle('smokerd rest api server') + self.app.run(self.host, self.port)