Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Windows support #1065

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
86 changes: 58 additions & 28 deletions circus/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from circus.plugins import get_plugin_cmd
from circus.sockets import CircusSocket, CircusSockets

if IS_WINDOWS:
import win32event


_ENV_EXCEPTIONS = ('__CF_USER_TEXT_ENCODING', 'PS1', 'COMP_WORDBREAKS',
'PROMPT_COMMAND')
Expand Down Expand Up @@ -87,7 +90,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,
ssh_server=None, proc_name='circusd', pidfile=None,
loglevel=None, logoutput=None, loggerconfig=None,
fqdn_prefix=None, umask=None, endpoint_owner=None,
papa_endpoint=None):
papa_endpoint=None, py_exe=None):

self.watchers = watchers
self.endpoint = endpoint
Expand All @@ -104,6 +107,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,
self.loggerconfig = loggerconfig
self.umask = umask
self.endpoint_owner = endpoint_owner
self.py_exe = py_exe or sys.executable
self._running = False
try:
# getfqdn appears to fail in Python3.3 in the unittest
Expand Down Expand Up @@ -156,8 +160,7 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,
self.stats_endpoint = stats_endpoint

if self.statsd:
cmd = "%s -c 'from circus import stats; stats.main()'" % \
sys.executable
cmd = '-c "from circus import stats; stats.main()"'
cmd += ' --endpoint %s' % self.endpoint
cmd += ' --pubsub %s' % self.pubsub_endpoint
cmd += ' --statspoint %s' % self.stats_endpoint
Expand All @@ -169,7 +172,8 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,
cmd += ' --log-level ' + self.loglevel
if self.logoutput:
cmd += ' --log-output ' + self.logoutput
stats_watcher = Watcher('circusd-stats', cmd, use_sockets=True,
stats_watcher = Watcher('circusd-stats', cmd=self.py_exe,
args=cmd, use_sockets=True,
singleton=True,
stdout_stream=self.stdout_stream,
stderr_stream=self.stderr_stream,
Expand All @@ -181,23 +185,28 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,

# adding the httpd
if httpd:
# adding the socket
httpd_socket = CircusSocket(name='circushttpd', host=httpd_host,
port=httpd_port)
if sockets is None:
sockets = [httpd_socket]
else:
sockets.append(httpd_socket)

cmd = ("%s -c 'from circusweb import circushttpd; "
"circushttpd.main()'") % sys.executable
cmd = '-c "from circusweb import circushttpd; circushttpd.main()"'
cmd += ' --endpoint %s' % self.endpoint
cmd += ' --fd $(circus.sockets.circushttpd)'
if hasattr(socket, 'fromfd'):
use_sockets = True
# adding the socket
httpd_socket = CircusSocket(name='circushttpd', host=httpd_host,
port=httpd_port)
if sockets is None:
sockets = [httpd_socket]
else:
sockets.append(httpd_socket)
cmd += ' --fd $(circus.sockets.circushttpd)'
else:
use_sockets = False
cmd += ' --host %s' % (httpd_host)
cmd += ' --port %d' % (httpd_port)
if ssh_server is not None:
cmd += ' --ssh %s' % ssh_server

# Adding the watcher
httpd_watcher = Watcher('circushttpd', cmd, use_sockets=True,
httpd_watcher = Watcher('circushttpd', cmd=self.py_exe,
args=cmd, use_sockets=use_sockets,
singleton=True,
stdout_stream=self.stdout_stream,
stderr_stream=self.stderr_stream,
Expand All @@ -213,12 +222,14 @@ def __init__(self, watchers, endpoint, pubsub_endpoint, check_delay=1.0,
if plugins is not None:
for plugin in plugins:
fqn = plugin['use']
cmd = get_plugin_cmd(plugin, self.endpoint,
self.pubsub_endpoint, self.check_delay,
ssh_server, debug=self.debug,
loglevel=self.loglevel,
logoutput=self.logoutput)
plugin_cfg = dict(cmd=cmd, priority=1, singleton=True,
cmd, args = get_plugin_cmd(self.py_exe, plugin, self.endpoint,
self.pubsub_endpoint,
self.check_delay,
ssh_server, debug=self.debug,
loglevel=self.loglevel,
logoutput=self.logoutput)
plugin_cfg = dict(cmd=cmd, args=args, priority=1,
singleton=True,
stdout_stream=self.stdout_stream,
stderr_stream=self.stderr_stream,
copy_env=True, copy_path=True,
Expand Down Expand Up @@ -266,11 +277,12 @@ def get_plugin_config(self, config, name):
for i in config.get('plugins', []):
if i['name'] == name:
cfg = i.copy()
cmd = get_plugin_cmd(cfg, self.endpoint,
self.pubsub_endpoint, self.check_delay,
self.ssh_server, debug=self.debug)
cmd, args = get_plugin_cmd(self.py_exe, cfg, self.endpoint,
self.pubsub_endpoint,
self.check_delay,
self.ssh_server, debug=self.debug)

cfg.update(dict(cmd=cmd, priority=1, singleton=True,
cfg.update(dict(cmd=cmd, args=args, priority=1, singleton=True,
stdout_stream=self.stdout_stream,
stderr_stream=self.stderr_stream,
copy_env=True, copy_path=True))
Expand Down Expand Up @@ -460,7 +472,8 @@ def load_from_config(cls, config_file, loop=None):
loggerconfig=cfg.get('loggerconfig', None),
fqdn_prefix=cfg.get('fqdn_prefix', None),
umask=cfg['umask'],
endpoint_owner=cfg.get('endpoint_owner', None))
endpoint_owner=cfg.get('endpoint_owner', None),
py_exe=cfg.get('py_exe', sys.executable))

# store the cfg which will be used, so it can be used later
# for checking if the cfg has been changed
Expand Down Expand Up @@ -602,10 +615,16 @@ def _stop(self, for_shutdown=False):
def reap_processes(self):
# map watcher to pids
watchers_pids = {}
watchers_handles = {}
handles = []
for watcher in self.iter_watchers():
if not watcher.is_stopped():
for process in watcher.processes.values():
watchers_pids[process.pid] = watcher
if not IS_WINDOWS:
watchers_pids[process.pid] = watcher
else:
watchers_handles[process._worker._handle] = (watcher, process.pid)
handles.append(process._worker._handle)

# detect dead children
if not IS_WINDOWS:
Expand All @@ -625,6 +644,17 @@ def reap_processes(self):
return
else:
raise
else:
import win32event
while True:
rc = win32event.WaitForMultipleObjects(handles, False, 0)
if not(win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(handles)):
break
handle = handles[rc - win32event.WAIT_OBJECT_0]
handles.remove(handle)
if handle in watchers_handles:
watcher, pid = watchers_handles[handle]
watcher.reap_process(pid, None)

@synchronized("manage_watchers")
@gen.coroutine
Expand Down
5 changes: 4 additions & 1 deletion circus/circusctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,10 @@ def start(self, globalopts):
parser = globalopts['parser']

if hasattr(args, 'command'):
sys.exit(self.controller.run(globalopts['args']))
try:
sys.exit(self.controller.run(globalopts['args']))
except:
sys.exit(-1)

if args.help:
for command in sorted(self.commands.keys()):
Expand Down
6 changes: 1 addition & 5 deletions circus/circusd.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def main():
sys.exit(0)

parser = argparse.ArgumentParser(description='Run some watchers.')
parser.add_argument('config', help='configuration file', nargs='?')
parser.add_argument('config', help='configuration file')

# XXX we should be able to add all these options in the config file as well
parser.add_argument('--log-level', dest='loglevel',
Expand Down Expand Up @@ -114,10 +114,6 @@ def main():
print(__version__)
sys.exit(0)

if args.config is None:
parser.print_usage()
sys.exit(0)

if args.daemonize:
daemonize()

Expand Down
115 changes: 115 additions & 0 deletions circus/circusrv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import win32serviceutil
import win32security
import servicemanager
import os
import logging
import traceback

from circus.arbiter import Arbiter
from circus.util import check_future_exception_and_log, LOG_LEVELS


def grant_service_login_right(account_name):
policy_handle = win32security.GetPolicyHandle('', win32security.POLICY_ALL_ACCESS)
try:
sid = win32security.LookupAccountName('', account_name)[0]
win32security.LsaAddAccountRights(policy_handle, sid, ['SeServiceLogonRight'])
finally:
win32security.LsaClose(policy_handle)


class ServiceManagerHandler(logging.Handler):
_map_ = {
logging.CRITICAL: servicemanager.EVENTLOG_ERROR_TYPE,
logging.ERROR: servicemanager.EVENTLOG_ERROR_TYPE,
logging.WARNING: servicemanager.EVENTLOG_WARNING_TYPE,
logging.INFO: servicemanager.EVENTLOG_INFORMATION_TYPE,
logging.DEBUG: servicemanager.EVENTLOG_INFORMATION_TYPE
}

def emit(self, record):
level = self._map_.get(record.levelno)
details = ""
if record.exc_info is not None:
formated_exc = traceback.format_exception(*record.exc_info)
details = os.linesep.join(formated_exc)
servicemanager.LogMsg(level, 0xF000, (record.getMessage(), details))


class CircusSrv(win32serviceutil.ServiceFramework):
_svc_name_ = 'circus'
_svc_display_name_ = 'Circus'
_svc_description_ = 'Run some watchers.'

_parameter_config = 'Config'
_parameter_loglevel = 'LogLevel'

def __init__(self, args):
self._svc_name_ = args[0]
super(CircusSrv, self).__init__(args)

config = win32serviceutil.GetServiceCustomOption(self._svc_name_, self._parameter_config)
loglevel = logging.INFO
try:
lls = win32serviceutil.GetServiceCustomOption(self._svc_name_, self._parameter_loglevel)
if lls is not None:
loglevel = LOG_LEVELS.get(lls.lower(), logging.INFO)
except:
pass

root_logger = logging.getLogger()
root_logger.setLevel(loglevel)
root_logger.handlers = [ServiceManagerHandler()]

# From here it can also come from the arbiter configuration
# load the arbiter from config
self.arbiter = Arbiter.load_from_config(config)

def SvcStop(self):
self.arbiter.loop.run_sync(self.arbiter._emergency_stop)

def SvcDoRun(self):
arbiter = self.arbiter
try:
future = arbiter.start()
check_future_exception_and_log(future)
except Exception as e:
# emergency stop
arbiter.loop.run_sync(arbiter._emergency_stop)
raise (e)
except KeyboardInterrupt:
pass

@classmethod
def OptionsHandler(cls, opts):
username = None
for opt, val in opts:
if opt == '--username':
username = val
if opt == '-c':
win32serviceutil.SetServiceCustomOption(cls._svc_name_,
cls._parameter_config,
val)
if opt == '-l':
win32serviceutil.SetServiceCustomOption(cls._svc_name_,
cls._parameter_loglevel,
val)
if username is not None:
# Take only user from domain\user
username = username.split('\\')[-1]
grant_service_login_right(username)

# Register now the source (rights of service's user may be different)
servicemanager.SetEventSourceName(cls._svc_name_, True)


def main():
kwargs = {}
kwargs['customInstallOptions'] = 'c:l:'
kwargs['customOptionHandler'] = CircusSrv.OptionsHandler
ret = win32serviceutil.HandleCommandLine(CircusSrv, **kwargs)
sys.exit(ret)


if __name__ == '__main__':
main()
4 changes: 3 additions & 1 deletion circus/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
import os
import sys
import signal
import warnings
from fnmatch import fnmatch
Expand Down Expand Up @@ -183,7 +184,8 @@ def get_config(config_file):
config['logoutput'] = dget('circus', 'logoutput')
config['loggerconfig'] = dget('circus', 'loggerconfig', None)
config['fqdn_prefix'] = dget('circus', 'fqdn_prefix', None, str)
config['papa_endpoint'] = dget('circus', 'fqdn_prefix', None, str)
config['papa_endpoint'] = dget('circus', 'papa_endpoint', None, str)
config['py_exe'] = dget('circus', 'py_exe', sys.executable, str)

# Initialize watchers, plugins & sockets to manage
watchers = []
Expand Down
31 changes: 31 additions & 0 deletions circus/pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import io
import socket
from .winpopen import enable_overlapped, disable_overlapped


def make_pipe():
if os.name != 'nt':
a, b = os.pipe()
a, b = io.open(a, 'rb', -1), io.open(b, 'wb', -1)
return a, b
else:
disable_overlapped()
try:
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.bind(('127.0.0.1', 0))
serv.listen(1)

# need to save sockets in _rsock/_wsock so they don't get closed
_rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_rsock.connect(('127.0.0.1', serv.getsockname()[1]))

_wsock, addr = serv.accept()
serv.close()
_rsock_fd = _rsock.makefile('rb', 0)
_wsock_fd = _wsock.makefile('wb', 0)
_rsock.close()
_wsock.close()
return _rsock_fd, _wsock_fd
finally:
enable_overlapped()
6 changes: 3 additions & 3 deletions circus/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _str2cfg(data):
return cfg


def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server,
def get_plugin_cmd(py_exe, config, endpoint, pubsub, check_delay, ssh_server,
debug=False, loglevel=None, logoutput=None):
fqn = config['use']
# makes sure the name exists
Expand All @@ -188,7 +188,7 @@ def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server,
# we're good, serializing the config
del config['use']
config = _cfg2str(config)
cmd = "%s -c 'from circus import plugins;plugins.main()'" % sys.executable
cmd = '-c "from circus import plugins;plugins.main()"'
cmd += ' --endpoint %s' % endpoint
cmd += ' --pubsub %s' % pubsub
if ssh_server is not None:
Expand All @@ -202,7 +202,7 @@ def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server,
if logoutput:
cmd += ' --log-output ' + logoutput
cmd += ' %s' % fqn
return cmd
return (py_exe, cmd)


def main():
Expand Down
Loading