diff --git a/circus/arbiter.py b/circus/arbiter.py index 11a781742..dd90029a0 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -22,6 +22,9 @@ from circus.plugins import get_plugin_cmd from circus.sockets import CircusSocket, CircusSockets +if IS_WINDOWS: + import win32event + _ENV_EXCEPTIONS = ('__CF_USER_TEXT_ENCODING', 'PS1', 'COMP_WORDBREAKS', 'PROMPT_COMMAND') @@ -87,7 +90,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, ssh_server=None, proc_name='circusd', pidfile=None, loglevel=None, logoutput=None, loggerconfig=None, fqdn_prefix=None, umask=None, endpoint_owner=None, - papa_endpoint=None): + papa_endpoint=None, py_exe=None): self.watchers = watchers self.endpoint = endpoint @@ -104,6 +107,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, self.loggerconfig = loggerconfig self.umask = umask self.endpoint_owner = endpoint_owner + self.py_exe = py_exe or sys.executable self._running = False try: # getfqdn appears to fail in Python3.3 in the unittest @@ -156,8 +160,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, self.stats_endpoint = stats_endpoint if self.statsd: - cmd = "%s -c 'from circus import stats; stats.main()'" % \ - sys.executable + cmd = '-c "from circus import stats; stats.main()"' cmd += ' --endpoint %s' % self.endpoint cmd += ' --pubsub %s' % self.pubsub_endpoint cmd += ' --statspoint %s' % self.stats_endpoint @@ -169,7 +172,8 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, cmd += ' --log-level ' + self.loglevel if self.logoutput: cmd += ' --log-output ' + self.logoutput - stats_watcher = Watcher('circusd-stats', cmd, use_sockets=True, + stats_watcher = Watcher('circusd-stats', cmd=self.py_exe, + args=cmd, use_sockets=True, singleton=True, stdout_stream=self.stdout_stream, stderr_stream=self.stderr_stream, @@ -181,23 +185,28 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, # adding the httpd if httpd: - # adding the socket - httpd_socket = CircusSocket(name='circushttpd', host=httpd_host, - port=httpd_port) - if sockets is None: - sockets = [httpd_socket] - else: - sockets.append(httpd_socket) - - cmd = ("%s -c 'from circusweb import circushttpd; " - "circushttpd.main()'") % sys.executable + cmd = '-c "from circusweb import circushttpd; circushttpd.main()"' cmd += ' --endpoint %s' % self.endpoint - cmd += ' --fd $(circus.sockets.circushttpd)' + if hasattr(socket, 'fromfd'): + use_sockets = True + # adding the socket + httpd_socket = CircusSocket(name='circushttpd', host=httpd_host, + port=httpd_port) + if sockets is None: + sockets = [httpd_socket] + else: + sockets.append(httpd_socket) + cmd += ' --fd $(circus.sockets.circushttpd)' + else: + use_sockets = False + cmd += ' --host %s' % (httpd_host) + cmd += ' --port %d' % (httpd_port) if ssh_server is not None: cmd += ' --ssh %s' % ssh_server # Adding the watcher - httpd_watcher = Watcher('circushttpd', cmd, use_sockets=True, + httpd_watcher = Watcher('circushttpd', cmd=self.py_exe, + args=cmd, use_sockets=use_sockets, singleton=True, stdout_stream=self.stdout_stream, stderr_stream=self.stderr_stream, @@ -213,12 +222,14 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0, if plugins is not None: for plugin in plugins: fqn = plugin['use'] - cmd = get_plugin_cmd(plugin, self.endpoint, - self.pubsub_endpoint, self.check_delay, - ssh_server, debug=self.debug, - loglevel=self.loglevel, - logoutput=self.logoutput) - plugin_cfg = dict(cmd=cmd, priority=1, singleton=True, + cmd, args = get_plugin_cmd(self.py_exe, plugin, self.endpoint, + self.pubsub_endpoint, + self.check_delay, + ssh_server, debug=self.debug, + loglevel=self.loglevel, + logoutput=self.logoutput) + plugin_cfg = dict(cmd=cmd, args=args, priority=1, + singleton=True, stdout_stream=self.stdout_stream, stderr_stream=self.stderr_stream, copy_env=True, copy_path=True, @@ -266,11 +277,12 @@ def get_plugin_config(self, config, name): for i in config.get('plugins', []): if i['name'] == name: cfg = i.copy() - cmd = get_plugin_cmd(cfg, self.endpoint, - self.pubsub_endpoint, self.check_delay, - self.ssh_server, debug=self.debug) + cmd, args = get_plugin_cmd(self.py_exe, cfg, self.endpoint, + self.pubsub_endpoint, + self.check_delay, + self.ssh_server, debug=self.debug) - cfg.update(dict(cmd=cmd, priority=1, singleton=True, + cfg.update(dict(cmd=cmd, args=args, priority=1, singleton=True, stdout_stream=self.stdout_stream, stderr_stream=self.stderr_stream, copy_env=True, copy_path=True)) @@ -460,7 +472,8 @@ def load_from_config(cls, config_file, loop=None): loggerconfig=cfg.get('loggerconfig', None), fqdn_prefix=cfg.get('fqdn_prefix', None), umask=cfg['umask'], - endpoint_owner=cfg.get('endpoint_owner', None)) + endpoint_owner=cfg.get('endpoint_owner', None), + py_exe=cfg.get('py_exe', sys.executable)) # store the cfg which will be used, so it can be used later # for checking if the cfg has been changed @@ -602,10 +615,16 @@ def _stop(self, for_shutdown=False): def reap_processes(self): # map watcher to pids watchers_pids = {} + watchers_handles = {} + handles = [] for watcher in self.iter_watchers(): if not watcher.is_stopped(): for process in watcher.processes.values(): - watchers_pids[process.pid] = watcher + if not IS_WINDOWS: + watchers_pids[process.pid] = watcher + else: + watchers_handles[process._worker._handle] = (watcher, process.pid) + handles.append(process._worker._handle) # detect dead children if not IS_WINDOWS: @@ -625,6 +644,17 @@ def reap_processes(self): return else: raise + else: + import win32event + while True: + rc = win32event.WaitForMultipleObjects(handles, False, 0) + if not(win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(handles)): + break + handle = handles[rc - win32event.WAIT_OBJECT_0] + handles.remove(handle) + if handle in watchers_handles: + watcher, pid = watchers_handles[handle] + watcher.reap_process(pid, None) @synchronized("manage_watchers") @gen.coroutine diff --git a/circus/circusctl.py b/circus/circusctl.py index 83c80c84a..3617491f7 100644 --- a/circus/circusctl.py +++ b/circus/circusctl.py @@ -309,7 +309,10 @@ def start(self, globalopts): parser = globalopts['parser'] if hasattr(args, 'command'): - sys.exit(self.controller.run(globalopts['args'])) + try: + sys.exit(self.controller.run(globalopts['args'])) + except: + sys.exit(-1) if args.help: for command in sorted(self.commands.keys()): diff --git a/circus/circusd.py b/circus/circusd.py index c0c22be74..b367a876f 100644 --- a/circus/circusd.py +++ b/circus/circusd.py @@ -85,7 +85,7 @@ def main(): sys.exit(0) parser = argparse.ArgumentParser(description='Run some watchers.') - parser.add_argument('config', help='configuration file', nargs='?') + parser.add_argument('config', help='configuration file') # XXX we should be able to add all these options in the config file as well parser.add_argument('--log-level', dest='loglevel', @@ -114,10 +114,6 @@ def main(): print(__version__) sys.exit(0) - if args.config is None: - parser.print_usage() - sys.exit(0) - if args.daemonize: daemonize() diff --git a/circus/circusrv.py b/circus/circusrv.py new file mode 100644 index 000000000..6a75a58f1 --- /dev/null +++ b/circus/circusrv.py @@ -0,0 +1,115 @@ +import win32serviceutil +import win32security +import servicemanager +import os +import logging +import traceback + +from circus.arbiter import Arbiter +from circus.util import check_future_exception_and_log, LOG_LEVELS + + +def grant_service_login_right(account_name): + policy_handle = win32security.GetPolicyHandle('', win32security.POLICY_ALL_ACCESS) + try: + sid = win32security.LookupAccountName('', account_name)[0] + win32security.LsaAddAccountRights(policy_handle, sid, ['SeServiceLogonRight']) + finally: + win32security.LsaClose(policy_handle) + + +class ServiceManagerHandler(logging.Handler): + _map_ = { + logging.CRITICAL: servicemanager.EVENTLOG_ERROR_TYPE, + logging.ERROR: servicemanager.EVENTLOG_ERROR_TYPE, + logging.WARNING: servicemanager.EVENTLOG_WARNING_TYPE, + logging.INFO: servicemanager.EVENTLOG_INFORMATION_TYPE, + logging.DEBUG: servicemanager.EVENTLOG_INFORMATION_TYPE + } + + def emit(self, record): + level = self._map_.get(record.levelno) + details = "" + if record.exc_info is not None: + formated_exc = traceback.format_exception(*record.exc_info) + details = os.linesep.join(formated_exc) + servicemanager.LogMsg(level, 0xF000, (record.getMessage(), details)) + + +class CircusSrv(win32serviceutil.ServiceFramework): + _svc_name_ = 'circus' + _svc_display_name_ = 'Circus' + _svc_description_ = 'Run some watchers.' + + _parameter_config = 'Config' + _parameter_loglevel = 'LogLevel' + + def __init__(self, args): + self._svc_name_ = args[0] + super(CircusSrv, self).__init__(args) + + config = win32serviceutil.GetServiceCustomOption(self._svc_name_, self._parameter_config) + loglevel = logging.INFO + try: + lls = win32serviceutil.GetServiceCustomOption(self._svc_name_, self._parameter_loglevel) + if lls is not None: + loglevel = LOG_LEVELS.get(lls.lower(), logging.INFO) + except: + pass + + root_logger = logging.getLogger() + root_logger.setLevel(loglevel) + root_logger.handlers = [ServiceManagerHandler()] + + # From here it can also come from the arbiter configuration + # load the arbiter from config + self.arbiter = Arbiter.load_from_config(config) + + def SvcStop(self): + self.arbiter.loop.run_sync(self.arbiter._emergency_stop) + + def SvcDoRun(self): + arbiter = self.arbiter + try: + future = arbiter.start() + check_future_exception_and_log(future) + except Exception as e: + # emergency stop + arbiter.loop.run_sync(arbiter._emergency_stop) + raise (e) + except KeyboardInterrupt: + pass + + @classmethod + def OptionsHandler(cls, opts): + username = None + for opt, val in opts: + if opt == '--username': + username = val + if opt == '-c': + win32serviceutil.SetServiceCustomOption(cls._svc_name_, + cls._parameter_config, + val) + if opt == '-l': + win32serviceutil.SetServiceCustomOption(cls._svc_name_, + cls._parameter_loglevel, + val) + if username is not None: + # Take only user from domain\user + username = username.split('\\')[-1] + grant_service_login_right(username) + + # Register now the source (rights of service's user may be different) + servicemanager.SetEventSourceName(cls._svc_name_, True) + + +def main(): + kwargs = {} + kwargs['customInstallOptions'] = 'c:l:' + kwargs['customOptionHandler'] = CircusSrv.OptionsHandler + ret = win32serviceutil.HandleCommandLine(CircusSrv, **kwargs) + sys.exit(ret) + + +if __name__ == '__main__': + main() diff --git a/circus/config.py b/circus/config.py index aed1d78c8..2c1e2d7ac 100644 --- a/circus/config.py +++ b/circus/config.py @@ -1,5 +1,6 @@ import glob import os +import sys import signal import warnings from fnmatch import fnmatch @@ -183,7 +184,8 @@ def get_config(config_file): config['logoutput'] = dget('circus', 'logoutput') config['loggerconfig'] = dget('circus', 'loggerconfig', None) config['fqdn_prefix'] = dget('circus', 'fqdn_prefix', None, str) - config['papa_endpoint'] = dget('circus', 'fqdn_prefix', None, str) + config['papa_endpoint'] = dget('circus', 'papa_endpoint', None, str) + config['py_exe'] = dget('circus', 'py_exe', sys.executable, str) # Initialize watchers, plugins & sockets to manage watchers = [] diff --git a/circus/pipe.py b/circus/pipe.py new file mode 100644 index 000000000..c6bdd79d5 --- /dev/null +++ b/circus/pipe.py @@ -0,0 +1,31 @@ +import os +import io +import socket +from .winpopen import enable_overlapped, disable_overlapped + + +def make_pipe(): + if os.name != 'nt': + a, b = os.pipe() + a, b = io.open(a, 'rb', -1), io.open(b, 'wb', -1) + return a, b + else: + disable_overlapped() + try: + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.bind(('127.0.0.1', 0)) + serv.listen(1) + + # need to save sockets in _rsock/_wsock so they don't get closed + _rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + _rsock.connect(('127.0.0.1', serv.getsockname()[1])) + + _wsock, addr = serv.accept() + serv.close() + _rsock_fd = _rsock.makefile('rb', 0) + _wsock_fd = _wsock.makefile('wb', 0) + _rsock.close() + _wsock.close() + return _rsock_fd, _wsock_fd + finally: + enable_overlapped() \ No newline at end of file diff --git a/circus/plugins/__init__.py b/circus/plugins/__init__.py index b6fb5cc60..af0bb6445 100644 --- a/circus/plugins/__init__.py +++ b/circus/plugins/__init__.py @@ -179,7 +179,7 @@ def _str2cfg(data): return cfg -def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server, +def get_plugin_cmd(py_exe, config, endpoint, pubsub, check_delay, ssh_server, debug=False, loglevel=None, logoutput=None): fqn = config['use'] # makes sure the name exists @@ -188,7 +188,7 @@ def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server, # we're good, serializing the config del config['use'] config = _cfg2str(config) - cmd = "%s -c 'from circus import plugins;plugins.main()'" % sys.executable + cmd = '-c "from circus import plugins;plugins.main()"' cmd += ' --endpoint %s' % endpoint cmd += ' --pubsub %s' % pubsub if ssh_server is not None: @@ -202,7 +202,7 @@ def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server, if logoutput: cmd += ' --log-output ' + logoutput cmd += ' %s' % fqn - return cmd + return (py_exe, cmd) def main(): diff --git a/circus/plugins/flapping.py b/circus/plugins/flapping.py index 5b7f90729..9fcd83ed5 100644 --- a/circus/plugins/flapping.py +++ b/circus/plugins/flapping.py @@ -41,6 +41,7 @@ def __init__(self, endpoint, pubsub_endpoint, check_delay, ssh_server, self.timers = {} self.configs = {} self.tries = {} + self.stops = {} # default options self.attempts = int(config.get('attempts', 2)) @@ -62,6 +63,13 @@ def handle_recv(self, data): self.check(watcher_name) elif action == "updated": self.update_conf(watcher_name) + elif action == "stop": + if watcher_name in self.stops: + if self.stops[watcher_name] == 0: + logger.info("%s: flapping stopped: external stop", watcher_name) + self.reset(watcher_name) + else: + self.stops[watcher_name] = self.stops[watcher_name] - 1 def update_conf(self, watcher_name): msg = self.call("options", name=watcher_name) @@ -84,7 +92,8 @@ def update_conf(self, watcher_name): def reset(self, watcher_name): self.timelines[watcher_name] = [] self.tries[watcher_name] = 0 - if watcher_name is self.timers: + self.stops[watcher_name] = 0 + if watcher_name in self.timers: timer = self.timers.pop(watcher_name) timer.cancel() @@ -116,23 +125,30 @@ def check(self, watcher_name): "(attempt number %s)", watcher_name, self._get_conf(conf, 'retry_in'), next_tries) - self.cast("stop", name=watcher_name) - self.timelines[watcher_name] = [] - self.tries[watcher_name] = next_tries - def _start(): self.cast("start", name=watcher_name) + # Remove previous timer + if watcher_name in self.timers: + timer = self.timers.pop(watcher_name) + timer.cancel() timer = Timer(self._get_conf(conf, 'retry_in'), _start) timer.start() self.timers[watcher_name] = timer + + self.timelines[watcher_name] = [] + self.tries[watcher_name] = next_tries + self.stops[watcher_name] = 1 if watcher_name not in self.stops else self.stops[watcher_name] + 1 + self.cast("stop", name=watcher_name) else: logger.info( "%s: flapping detected: reached max retry limit", watcher_name) self.timelines[watcher_name] = [] self.tries[watcher_name] = 0 + self.stops[watcher_name] = 1 if watcher_name not in self.stops else self.stops[watcher_name] + 1 self.cast("stop", name=watcher_name) else: self.timelines[watcher_name] = [] self.tries[watcher_name] = 0 + self.stops[watcher_name] = 0 diff --git a/circus/process.py b/circus/process.py index 0b995511d..aecad3d5d 100644 --- a/circus/process.py +++ b/circus/process.py @@ -11,7 +11,8 @@ import sys import errno import os -from subprocess import PIPE +import subprocess +import signal import time import shlex import warnings @@ -29,6 +30,7 @@ ObjectDict, replace_gnu_args, get_default_gid, get_username_from_uid, IS_WINDOWS) from circus import logger +from circus.pipe import make_pipe _INFOLINE = ("%(pid)s %(cmdline)s %(username)s %(nice)s %(mem_info1)s " @@ -210,9 +212,7 @@ def __init__(self, name, wid, cmd, args=None, working_dir=None, self.gid = get_default_gid(self.uid) if IS_WINDOWS: - if not self.use_fds and (self.pipe_stderr or self.pipe_stdout): - raise ValueError("On Windows, you can't close the fds if " - "you are redirecting stdout or stderr") + self.use_fds = True if spawn: self.spawn() @@ -345,23 +345,36 @@ def preexec(): if stdin_socket_fd is not None: os.dup2(stdin_socket_fd, 0) + extra = {} if IS_WINDOWS: # On Windows we can't use a pre-exec function preexec_fn = None + extra['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP else: preexec_fn = preexec - extra = {} + stdout = None + stderr = None if self.pipe_stdout: - extra['stdout'] = PIPE + stdout, extra['stdout'] = make_pipe() if self.pipe_stderr: - extra['stderr'] = PIPE + stderr, extra['stderr'] = make_pipe() + + try: + self._worker = Popen(args, cwd=self.working_dir, + shell=self.shell, preexec_fn=preexec_fn, + env=self.env, close_fds=not self.use_fds, + executable=self.executable, **extra) + finally: + if 'stdout' in extra: + extra['stdout'].close() + + if 'stderr' in extra: + extra['stderr'].close() - self._worker = Popen(args, cwd=self.working_dir, - shell=self.shell, preexec_fn=preexec_fn, - env=self.env, close_fds=not self.use_fds, - executable=self.executable, **extra) + self._worker.stderr = stderr + self._worker.stdout = stdout # let go of sockets created only for self._worker to inherit self._sockets = [] @@ -403,7 +416,7 @@ def format_args(self, sockets_fds=None): if self.args is not None: if isinstance(self.args, string_types): args = shlex.split(bytestring(replace_gnu_args( - self.args, **format_kwargs))) + self.args, **format_kwargs)), posix=not IS_WINDOWS) else: args = [bytestring(replace_gnu_args(arg, **format_kwargs)) for arg in self.args] @@ -411,6 +424,16 @@ def format_args(self, sockets_fds=None): else: args = shlex.split(bytestring(cmd), posix=not IS_WINDOWS) + def unquote(cmd): + if cmd.startswith('"') and cmd.endswith('"'): + return cmd[1:-1] + elif cmd.startswith("'") and cmd.endswith("'"): + return cmd[1:-1] + else: + return cmd + + args = [unquote(cmd) for cmd in args] + if self.shell: # subprocess.Popen(shell=True) implies that 1st arg is the # requested command, remaining args are applied to sh. @@ -447,7 +470,10 @@ def is_alive(self): def send_signal(self, sig): """Sends a signal **sig** to the process.""" logger.debug("sending signal %s to %s" % (sig, self.pid)) - return self._worker.send_signal(sig) + if os.name == 'nt': + return os.kill(self._worker.pid, sig) + else: + return self._worker.send_signal(sig) @debuglog def stop(self): @@ -597,3 +623,4 @@ def __lt__(self, other): def __gt__(self, other): return self.started > other.started + diff --git a/circus/sighandler.py b/circus/sighandler.py index 182ba4503..384992b3c 100644 --- a/circus/sighandler.py +++ b/circus/sighandler.py @@ -25,7 +25,10 @@ def __init__(self, controller): # init signals logger.info('Registering signals...') self._old = {} - self._register() + try: + self._register() + except ValueError as e: + logger.warning("Can't register signals: %s" % e) def stop(self): for sig, callback in self._old.items(): diff --git a/circus/stats/collector.py b/circus/stats/collector.py index ee3434d4b..f2c1653e2 100644 --- a/circus/stats/collector.py +++ b/circus/stats/collector.py @@ -122,6 +122,10 @@ def stop(self): BaseStatsCollector.stop(self) def _select(self): + if len(self.sockets) == 0: + return + + rlist = [] try: rlist, wlist, xlist = select.select(self.sockets, [], [], .01) except socket.error as err: diff --git a/circus/stream/file_stream.py b/circus/stream/file_stream.py index 22fb6c74d..eab61ca87 100644 --- a/circus/stream/file_stream.py +++ b/circus/stream/file_stream.py @@ -38,6 +38,8 @@ def close(self): def write_data(self, data): # data to write on file file_data = s(data['data']) + if os.linesep != '\n': + file_data = file_data.replace(os.linesep, '\n') # If we want to prefix the stream with the current datetime if self._time_format is not None: diff --git a/circus/stream/redirector.py b/circus/stream/redirector.py index b99d013ae..6cbc64848 100644 --- a/circus/stream/redirector.py +++ b/circus/stream/redirector.py @@ -20,13 +20,18 @@ def __call__(self, fd, events): self.redirector.remove_fd(fd) return try: - data = os.read(fd, self.redirector.buffer) + if hasattr(self.pipe, 'read1'): + data = self.pipe.read1(self.redirector.buffer) + else: + data = self.pipe.read(self.redirector.buffer) if len(data) == 0: self.redirector.remove_fd(fd) else: datamap = {'data': data, 'pid': self.process.pid, 'name': self.name} self.redirector.redirect[self.name](datamap) + except ConnectionResetError: + self.redirector.remove_fd(fd) except IOError as ex: if ex.args[0] != errno.EAGAIN: raise diff --git a/circus/tests/support.py b/circus/tests/support.py index 38964ccdc..6d16cf075 100644 --- a/circus/tests/support.py +++ b/circus/tests/support.py @@ -55,12 +55,15 @@ def __init__(self, name): # Script used to sleep for a specified amount of seconds. # Should be used instead of the 'sleep' command for # compatibility -SLEEP = PYTHON + " -c 'import time;time.sleep(%d)'" +SLEEP = '"' + PYTHON + '" -c "import time;time.sleep(%d)"' def get_ioloop(): - from zmq.eventloop.ioloop import ZMQPoller - from zmq.eventloop.ioloop import ZMQError, ETERM + try: + from zmq.eventloop._deprecated import ZMQPoller, ZMQError, ETERM + except ImportError: + from zmq.eventloop.ioloop import ZMQPoller + from zmq.eventloop.ioloop import ZMQError, ETERM from tornado.ioloop import PollIOLoop class DebugPoller(ZMQPoller): @@ -272,7 +275,8 @@ def _create_circus(cls, callable_path, plugins=None, stats=False, os.path.realpath(__file__)))) args = ['circus/tests/generic.py', callable_path, testfile] worker = {'cmd': PYTHON, 'args': args, 'working_dir': wdir, - 'name': 'test', 'graceful_timeout': 2} + 'name': 'test', 'graceful_timeout': 2, + 'stop_signal': signal.SIGINT if not IS_WINDOWS else signal.CTRL_BREAK_EVENT} worker.update(kw) if not arbiter_kw: arbiter_kw = {} @@ -443,7 +447,7 @@ def poll_for(filename, needles, timeout=5): @tornado.gen.coroutine -def async_poll_for(filename, needles, timeout=5): +def async_poll_for(filename, needles, timeout=5 if not IS_WINDOWS else 20): """Async version of poll_for """ if isinstance(needles, str): diff --git a/circus/tests/test_arbiter.py b/circus/tests/test_arbiter.py index 3e890648b..8930bd0ec 100644 --- a/circus/tests/test_arbiter.py +++ b/circus/tests/test_arbiter.py @@ -18,12 +18,16 @@ EasyTestSuite, skipIf, get_ioloop, SLEEP, PYTHON) from circus.util import (DEFAULT_ENDPOINT_DEALER, DEFAULT_ENDPOINT_MULTICAST, - DEFAULT_ENDPOINT_SUB) + DEFAULT_ENDPOINT_SUB, IS_WINDOWS) from circus.tests.support import (MockWatcher, has_circusweb, poll_for_callable, get_available_port) from circus import watcher as watcher_mod from circus.py3compat import s +if IS_WINDOWS: + import win32api + import win32con + import pywintypes _GENERIC = os.path.join(os.path.dirname(__file__), 'generic.py') @@ -283,8 +287,20 @@ def test_rm_watcher_nostop(self): pid = pids[0] os.kill(pid, 0) os.kill(pid, signal.SIGTERM) - os.waitpid(pid, 0) - except OSError: + if not IS_WINDOWS: + os.waitpid(pid, 0) + else: + handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid) + if handle: + try: + os.waitpid(handle, 0) + finally: + try: + win32api.CloseHandle(handle) + except pywintypes.error: + pass + + except OSError as e: self.assertFalse(True, "process was incorrectly killed") yield self.stop_arbiter() diff --git a/circus/tests/test_circusctl.py b/circus/tests/test_circusctl.py index 66691af6d..5de9586d3 100644 --- a/circus/tests/test_circusctl.py +++ b/circus/tests/test_circusctl.py @@ -9,7 +9,7 @@ from circus.circusctl import USAGE, VERSION, CircusCtl from circus.tests.support import (TestCircus, async_poll_for, EasyTestSuite, skipIf, DEBUG, PYTHON, SLEEP) -from circus.util import tornado_sleep, DEFAULT_ENDPOINT_DEALER +from circus.util import tornado_sleep, DEFAULT_ENDPOINT_DEALER, IS_WINDOWS from circus.py3compat import b, s @@ -18,6 +18,8 @@ def run_ctl(args, queue=None, stdin='', endpoint=DEFAULT_ENDPOINT_DEALER): if '--endpoint' not in args: args = '--endpoint %s ' % endpoint + args + if IS_WINDOWS: + args = args.replace('\\', '\\\\') proc = subprocess.Popen(cmd.split() + shlex.split(args), stdin=subprocess.PIPE if stdin else None, stdout=subprocess.PIPE, diff --git a/circus/tests/test_circusd.py b/circus/tests/test_circusd.py index 9f7cee7bb..5f81d0ff3 100644 --- a/circus/tests/test_circusd.py +++ b/circus/tests/test_circusd.py @@ -84,7 +84,7 @@ def test_daemon(self): # daemonize() to work self.assertRaises(ValueError, daemonize) - for module in sys.modules.keys(): + for module in list(sys.modules): if module.startswith('gevent'): del sys.modules[module] diff --git a/circus/tests/test_client.py b/circus/tests/test_client.py index 9a56a11d0..76f3f37ae 100644 --- a/circus/tests/test_client.py +++ b/circus/tests/test_client.py @@ -78,7 +78,8 @@ def test_client(self): yield self.stop_arbiter() -_, tmp_filename = tempfile.mkstemp(prefix='test_hook') +f, tmp_filename = tempfile.mkstemp(prefix='test_hook') +os.close(f) def long_hook(*args, **kw): @@ -120,7 +121,10 @@ def test_message_id(self): self.assertFalse(os.path.exists(tmp_filename)) finally: if os.path.exists(tmp_filename): - os.unlink(tmp_filename) + try: + os.unlink(tmp_filename) + except OSError: + pass # May fail on Windows arbiter.stop() diff --git a/circus/tests/test_command_signal.py b/circus/tests/test_command_signal.py index 79b3db829..b3173e8a0 100644 --- a/circus/tests/test_command_signal.py +++ b/circus/tests/test_command_signal.py @@ -99,7 +99,7 @@ def test_handler(self): stderr_stream = {'stream': stream} yield self.start_arbiter(cmd=cmd, stdout_stream=stdout_stream, stderr_stream=stderr_stream, stats=True, - stop_signal=signal.SIGINT, + stop_signal=signal.SIGINT if not IS_WINDOWS else signal.CTRL_BREAK_EVENT, debug=False) # waiting for data to appear in the queue @@ -194,7 +194,7 @@ def test_handler(self): stderr_stream = {'stream': stream} yield self.start_arbiter(cmd=cmd, stdout_stream=stdout_stream, stderr_stream=stderr_stream, stats=True, - stop_signal=signal.SIGINT, + stop_signal=signal.SIGINT if not IS_WINDOWS else signal.CTRL_BREAK_EVENT, debug=False) def assert_read(channel, *values): diff --git a/circus/tests/test_pidfile.py b/circus/tests/test_pidfile.py index 920b0725b..56c59bc6e 100644 --- a/circus/tests/test_pidfile.py +++ b/circus/tests/test_pidfile.py @@ -5,6 +5,7 @@ from circus.pidfile import Pidfile from circus.tests.support import TestCase, EasyTestSuite, SLEEP +from circus.util import IS_WINDOWS class TestPidfile(TestCase): @@ -19,7 +20,8 @@ def test_pidfile(self): pidfile.create(proc.pid) mode = os.stat(path).st_mode - self.assertEqual(stat.S_IMODE(mode), pidfile.perm_mode, path) + if not IS_WINDOWS: + self.assertEqual(stat.S_IMODE(mode), pidfile.perm_mode, path) pidfile.unlink() self.assertFalse(os.path.exists(path)) pidfile.create(proc.pid) @@ -27,7 +29,8 @@ def test_pidfile(self): self.assertTrue(os.path.exists(rf)) self.assertFalse(os.path.exists(path)) mode = os.stat(rf).st_mode - self.assertEqual(stat.S_IMODE(mode), pidfile.perm_mode, rf) + if not IS_WINDOWS: + self.assertEqual(stat.S_IMODE(mode), pidfile.perm_mode, rf) finally: os.remove(rf) diff --git a/circus/tests/test_sockets.py b/circus/tests/test_sockets.py index e5df45700..47eaa277f 100644 --- a/circus/tests/test_sockets.py +++ b/circus/tests/test_sockets.py @@ -6,7 +6,6 @@ except ImportError: pass import mock -import fcntl from circus.tests.support import TestCase, skipIf, EasyTestSuite, IS_WINDOWS from circus.sockets import CircusSocket, CircusSockets @@ -22,6 +21,7 @@ def so_bindtodevice_supported(): def is_nonblock(fd): + import fcntl fl = fcntl.fcntl(fd, fcntl.F_GETFL) nonblock = fl & os.O_NONBLOCK return nonblock != 0 diff --git a/circus/util.py b/circus/util.py index bf67d3e9b..6e4dae36a 100644 --- a/circus/util.py +++ b/circus/util.py @@ -7,7 +7,6 @@ import socket import sys import time -import traceback import json import struct try: @@ -327,7 +326,7 @@ def to_signum(signum): m = re.match(r'(\w+)(\+(\d+))?', signum) if m: name = m.group(1).upper() - if not name.startswith('SIG'): + if not name.startswith('SIG') and not name.startswith('CTRL'): name = 'SIG' + name offset = int(m.group(3)) if m.group(3) else 0 @@ -1084,10 +1083,9 @@ def exception(self, timeout=None): def check_future_exception_and_log(future): if isinstance(future, concurrent.Future): - exception = future.exception() - if exception is not None: - logger.error("exception %s caught" % exception) - if hasattr(future, "exc_info"): - exc_info = future.exc_info() - traceback.print_tb(exc_info[2]) - return exception + try: + future.result() + return None + except Exception as e: + logger.exception("exception %s caught" % e) + return e \ No newline at end of file diff --git a/circus/watcher.py b/circus/watcher.py index 7a810f308..2dba94133 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -255,10 +255,6 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., " watcher" % self.numprocesses) if IS_WINDOWS: - if self.stdout_stream or self.stderr_stream: - raise NotImplementedError("Streams are not supported" - " on Windows.") - if not copy_env and not env: # Copy the env by default on Windows as we can't run any # executable without some env variables @@ -686,7 +682,7 @@ def spawn_process(self, recovery_wid=None): # catch ValueError as well, as a misconfigured rlimit setting could # lead to bad infinite retries here except (OSError, ValueError) as e: - logger.warning('error in %r: %s', self.name, str(e)) + logger.exception('error in %r: %s', self.name, str(e)) if process is None: nb_tries += 1 @@ -855,6 +851,8 @@ def stop(self): @gen.coroutine def _stop(self, close_output_streams=False, for_shutdown=False): if self.is_stopped(): + if self.evpub_socket is not None: + self.notify_event("stop", {"time": time.time(), "already": True}) return self._status = "stopping" skip = for_shutdown and self.use_papa diff --git a/circus/winpopen.py b/circus/winpopen.py new file mode 100644 index 000000000..5da829e43 --- /dev/null +++ b/circus/winpopen.py @@ -0,0 +1,212 @@ +from __future__ import print_function, division, absolute_import + +import os + +""" +Fix windows Popen for supporting socket as pipe +""" + +if os.name == "nt": + import subprocess + import socket + import sys + import msvcrt + if sys.version_info < (3, 0): + import _subprocess + else: + import _winapi + + SO_OPENTYPE = 0x7008 + SO_SYNCHRONOUS_ALERT = 0x10 + SO_SYNCHRONOUS_NONALERT = 0x20 + + PIPE = subprocess.PIPE + STDOUT = subprocess.STDOUT + if sys.version_info >= (3, 0): + DEVNULL = subprocess.DEVNULL + + class WindowsPopen(subprocess.Popen): + def __init__(self, *args, **kwargs): + super(WindowsPopen, self).__init__(*args, **kwargs) + + if sys.version_info < (3, 0): + def _get_handles(self, stdin, stdout, stderr): + """Construct and return tuple with IO objects: + p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite + """ + to_close = set() + if stdin is None and stdout is None and stderr is None: + return (None, None, None, None, None, None), to_close + + p2cread, p2cwrite = None, None + c2pread, c2pwrite = None, None + errread, errwrite = None, None + + if stdin is None: + p2cread = _subprocess.GetStdHandle(_subprocess.STD_INPUT_HANDLE) + if p2cread is None: + p2cread, _ = _subprocess.CreatePipe(None, 0) + elif stdin == PIPE: + p2cread, p2cwrite = _subprocess.CreatePipe(None, 0) + elif isinstance(stdin, int): + p2cread = msvcrt.get_osfhandle(stdin) + else: + # Assuming file-like object + if not hasattr(stdin, '_sock'): + p2cread = msvcrt.get_osfhandle(stdin.fileno()) + else: + p2cread = stdin.fileno() + p2cread = self._make_inheritable(p2cread) + # We just duplicated the handle, it has to be closed at the end + to_close.add(p2cread) + if stdin == PIPE: + to_close.add(p2cwrite) + + if stdout is None: + c2pwrite = _subprocess.GetStdHandle(_subprocess.STD_OUTPUT_HANDLE) + if c2pwrite is None: + _, c2pwrite = _subprocess.CreatePipe(None, 0) + elif stdout == PIPE: + c2pread, c2pwrite = _subprocess.CreatePipe(None, 0) + elif isinstance(stdout, int): + c2pwrite = msvcrt.get_osfhandle(stdout) + else: + # Assuming file-like object + if not hasattr(stdout, '_sock'): + c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) + else: + c2pwrite = stdout.fileno() + c2pwrite = self._make_inheritable(c2pwrite) + # We just duplicated the handle, it has to be closed at the end + to_close.add(c2pwrite) + if stdout == PIPE: + to_close.add(c2pread) + + if stderr is None: + errwrite = _subprocess.GetStdHandle(_subprocess.STD_ERROR_HANDLE) + if errwrite is None: + _, errwrite = _subprocess.CreatePipe(None, 0) + elif stderr == PIPE: + errread, errwrite = _subprocess.CreatePipe(None, 0) + elif stderr == STDOUT: + errwrite = c2pwrite + elif isinstance(stderr, int): + errwrite = msvcrt.get_osfhandle(stderr) + else: + # Assuming file-like object + if not hasattr(stderr, '_sock'): + errwrite = msvcrt.get_osfhandle(stderr.fileno()) + else: + errwrite = stderr.fileno() + errwrite = self._make_inheritable(errwrite) + # We just duplicated the handle, it has to be closed at the end + to_close.add(errwrite) + if stderr == PIPE: + to_close.add(errread) + + return (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite), to_close + else: + def _get_handles(self, stdin, stdout, stderr): + """Construct and return tuple with IO objects: + p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite + """ + if stdin is None and stdout is None and stderr is None: + return (-1, -1, -1, -1, -1, -1) + + p2cread, p2cwrite = -1, -1 + c2pread, c2pwrite = -1, -1 + errread, errwrite = -1, -1 + + if stdin is None: + p2cread = _winapi.GetStdHandle(_winapi.STD_INPUT_HANDLE) + if p2cread is None: + p2cread, _ = _winapi.CreatePipe(None, 0) + p2cread = subprocess.Handle(p2cread) + _winapi.CloseHandle(_) + elif stdin == PIPE: + p2cread, p2cwrite = _winapi.CreatePipe(None, 0) + p2cread, p2cwrite = subprocess.Handle(p2cread), subprocess.Handle(p2cwrite) + elif stdin == DEVNULL: + p2cread = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stdin, int): + p2cread = msvcrt.get_osfhandle(stdin) + else: + # Assuming file-like object + if not hasattr(stdin, '_sock'): + p2cread = msvcrt.get_osfhandle(stdin.fileno()) + else: + p2cread = stdin.fileno() + p2cread = self._make_inheritable(p2cread) + + if stdout is None: + c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE) + if c2pwrite is None: + _, c2pwrite = _winapi.CreatePipe(None, 0) + c2pwrite = subprocess.Handle(c2pwrite) + _winapi.CloseHandle(_) + elif stdout == PIPE: + c2pread, c2pwrite = _winapi.CreatePipe(None, 0) + c2pread, c2pwrite = subprocess.Handle(c2pread), subprocess.Handle(c2pwrite) + elif stdout == DEVNULL: + c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stdout, int): + c2pwrite = msvcrt.get_osfhandle(stdout) + else: + # Assuming file-like object + if not hasattr(stdout, '_sock'): + c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) + else: + c2pwrite = stdout.fileno() + c2pwrite = self._make_inheritable(c2pwrite) + + if stderr is None: + errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE) + if errwrite is None: + _, errwrite = _winapi.CreatePipe(None, 0) + errwrite = subprocess.Handle(errwrite) + _winapi.CloseHandle(_) + elif stderr == PIPE: + errread, errwrite = _winapi.CreatePipe(None, 0) + errread, errwrite = subprocess.Handle(errread), subprocess.Handle(errwrite) + elif stderr == STDOUT: + errwrite = c2pwrite + elif stderr == DEVNULL: + errwrite = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stderr, int): + errwrite = msvcrt.get_osfhandle(stderr) + else: + # Assuming file-like object + if not hasattr(stderr, '_sock'): + errwrite = msvcrt.get_osfhandle(stderr.fileno()) + else: + errwrite = stderr.fileno() + errwrite = self._make_inheritable(errwrite) + + return (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite) + + subprocess.Popen_old = subprocess.Popen + subprocess.Popen = WindowsPopen + + def disable_overlapped(): + # Enable socket to be non overlapped + try: + dummy = socket.socket(0xDEAD, socket.SOCK_STREAM) # After that python will not force WSA_FLAG_OVERLAPPED + except: + pass + dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_NONALERT) + + def enable_overlapped(): + dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_ALERT) + +else: + def disable_overlapped(): + pass + + def enable_overlapped(): + pass \ No newline at end of file diff --git a/setup.py b/setup.py index 6a814f1ed..ac9816865 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ import sys +import os from setuptools import setup, find_packages from circus import __version__ @@ -13,6 +14,9 @@ except ImportError: install_requires.append('argparse') +if os.name == 'nt': + install_requires.append('pypiwin32') + with open("README.rst") as f: README = f.read() @@ -40,6 +44,7 @@ entry_points=""" [console_scripts] circusd = circus.circusd:main + circusrv = circus.circusrv:main circusd-stats = circus.stats:main circusctl = circus.circusctl:main circus-top = circus.stats.client:main