diff --git a/circus/pipe.py b/circus/pipe.py new file mode 100644 index 000000000..c6bdd79d5 --- /dev/null +++ b/circus/pipe.py @@ -0,0 +1,31 @@ +import os +import io +import socket +from .winpopen import enable_overlapped, disable_overlapped + + +def make_pipe(): + if os.name != 'nt': + a, b = os.pipe() + a, b = io.open(a, 'rb', -1), io.open(b, 'wb', -1) + return a, b + else: + disable_overlapped() + try: + serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv.bind(('127.0.0.1', 0)) + serv.listen(1) + + # need to save sockets in _rsock/_wsock so they don't get closed + _rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + _rsock.connect(('127.0.0.1', serv.getsockname()[1])) + + _wsock, addr = serv.accept() + serv.close() + _rsock_fd = _rsock.makefile('rb', 0) + _wsock_fd = _wsock.makefile('wb', 0) + _rsock.close() + _wsock.close() + return _rsock_fd, _wsock_fd + finally: + enable_overlapped() \ No newline at end of file diff --git a/circus/process.py b/circus/process.py index f5a6fc5c9..775b21728 100644 --- a/circus/process.py +++ b/circus/process.py @@ -30,6 +30,7 @@ ObjectDict, replace_gnu_args, get_default_gid, get_username_from_uid, IS_WINDOWS) from circus import logger +from circus.pipe import make_pipe _INFOLINE = ("%(pid)s %(cmdline)s %(username)s %(nice)s %(mem_info1)s " @@ -211,9 +212,7 @@ def __init__(self, name, wid, cmd, args=None, working_dir=None, self.gid = get_default_gid(self.uid) if IS_WINDOWS: - if not self.use_fds and (self.pipe_stderr or self.pipe_stdout): - raise ValueError("On Windows, you can't close the fds if " - "you are redirecting stdout or stderr") + self.use_fds = True if spawn: self.spawn() @@ -354,16 +353,28 @@ def preexec(): else: preexec_fn = preexec + stdout = None + stderr = None if self.pipe_stdout: - extra['stdout'] = subprocess.PIPE + stdout, extra['stdout'] = make_pipe() if self.pipe_stderr: - extra['stderr'] = subprocess.PIPE + stderr, extra['stderr'] = make_pipe() - self._worker = Popen(args, cwd=self.working_dir, - shell=self.shell, preexec_fn=preexec_fn, - env=self.env, close_fds=not self.use_fds, - executable=self.executable, **extra) + try: + self._worker = Popen(args, cwd=self.working_dir, + shell=self.shell, preexec_fn=preexec_fn, + env=self.env, close_fds=not self.use_fds, + executable=self.executable, **extra) + finally: + if 'stdout' in extra: + extra['stdout'].close() + + if 'stderr' in extra: + extra['stderr'].close() + + self._worker.stderr = stderr + self._worker.stdout = stdout # let go of sockets created only for self._worker to inherit self._sockets = [] diff --git a/circus/stream/file_stream.py b/circus/stream/file_stream.py index 22fb6c74d..eab61ca87 100644 --- a/circus/stream/file_stream.py +++ b/circus/stream/file_stream.py @@ -38,6 +38,8 @@ def close(self): def write_data(self, data): # data to write on file file_data = s(data['data']) + if os.linesep != '\n': + file_data = file_data.replace(os.linesep, '\n') # If we want to prefix the stream with the current datetime if self._time_format is not None: diff --git a/circus/stream/redirector.py b/circus/stream/redirector.py index b99d013ae..6cbc64848 100644 --- a/circus/stream/redirector.py +++ b/circus/stream/redirector.py @@ -20,13 +20,18 @@ def __call__(self, fd, events): self.redirector.remove_fd(fd) return try: - data = os.read(fd, self.redirector.buffer) + if hasattr(self.pipe, 'read1'): + data = self.pipe.read1(self.redirector.buffer) + else: + data = self.pipe.read(self.redirector.buffer) if len(data) == 0: self.redirector.remove_fd(fd) else: datamap = {'data': data, 'pid': self.process.pid, 'name': self.name} self.redirector.redirect[self.name](datamap) + except ConnectionResetError: + self.redirector.remove_fd(fd) except IOError as ex: if ex.args[0] != errno.EAGAIN: raise diff --git a/circus/watcher.py b/circus/watcher.py index 7a810f308..1220b89dd 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -255,10 +255,6 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., " watcher" % self.numprocesses) if IS_WINDOWS: - if self.stdout_stream or self.stderr_stream: - raise NotImplementedError("Streams are not supported" - " on Windows.") - if not copy_env and not env: # Copy the env by default on Windows as we can't run any # executable without some env variables @@ -686,7 +682,7 @@ def spawn_process(self, recovery_wid=None): # catch ValueError as well, as a misconfigured rlimit setting could # lead to bad infinite retries here except (OSError, ValueError) as e: - logger.warning('error in %r: %s', self.name, str(e)) + logger.exception('error in %r: %s', self.name, str(e)) if process is None: nb_tries += 1 diff --git a/circus/winpopen.py b/circus/winpopen.py new file mode 100644 index 000000000..5da829e43 --- /dev/null +++ b/circus/winpopen.py @@ -0,0 +1,212 @@ +from __future__ import print_function, division, absolute_import + +import os + +""" +Fix windows Popen for supporting socket as pipe +""" + +if os.name == "nt": + import subprocess + import socket + import sys + import msvcrt + if sys.version_info < (3, 0): + import _subprocess + else: + import _winapi + + SO_OPENTYPE = 0x7008 + SO_SYNCHRONOUS_ALERT = 0x10 + SO_SYNCHRONOUS_NONALERT = 0x20 + + PIPE = subprocess.PIPE + STDOUT = subprocess.STDOUT + if sys.version_info >= (3, 0): + DEVNULL = subprocess.DEVNULL + + class WindowsPopen(subprocess.Popen): + def __init__(self, *args, **kwargs): + super(WindowsPopen, self).__init__(*args, **kwargs) + + if sys.version_info < (3, 0): + def _get_handles(self, stdin, stdout, stderr): + """Construct and return tuple with IO objects: + p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite + """ + to_close = set() + if stdin is None and stdout is None and stderr is None: + return (None, None, None, None, None, None), to_close + + p2cread, p2cwrite = None, None + c2pread, c2pwrite = None, None + errread, errwrite = None, None + + if stdin is None: + p2cread = _subprocess.GetStdHandle(_subprocess.STD_INPUT_HANDLE) + if p2cread is None: + p2cread, _ = _subprocess.CreatePipe(None, 0) + elif stdin == PIPE: + p2cread, p2cwrite = _subprocess.CreatePipe(None, 0) + elif isinstance(stdin, int): + p2cread = msvcrt.get_osfhandle(stdin) + else: + # Assuming file-like object + if not hasattr(stdin, '_sock'): + p2cread = msvcrt.get_osfhandle(stdin.fileno()) + else: + p2cread = stdin.fileno() + p2cread = self._make_inheritable(p2cread) + # We just duplicated the handle, it has to be closed at the end + to_close.add(p2cread) + if stdin == PIPE: + to_close.add(p2cwrite) + + if stdout is None: + c2pwrite = _subprocess.GetStdHandle(_subprocess.STD_OUTPUT_HANDLE) + if c2pwrite is None: + _, c2pwrite = _subprocess.CreatePipe(None, 0) + elif stdout == PIPE: + c2pread, c2pwrite = _subprocess.CreatePipe(None, 0) + elif isinstance(stdout, int): + c2pwrite = msvcrt.get_osfhandle(stdout) + else: + # Assuming file-like object + if not hasattr(stdout, '_sock'): + c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) + else: + c2pwrite = stdout.fileno() + c2pwrite = self._make_inheritable(c2pwrite) + # We just duplicated the handle, it has to be closed at the end + to_close.add(c2pwrite) + if stdout == PIPE: + to_close.add(c2pread) + + if stderr is None: + errwrite = _subprocess.GetStdHandle(_subprocess.STD_ERROR_HANDLE) + if errwrite is None: + _, errwrite = _subprocess.CreatePipe(None, 0) + elif stderr == PIPE: + errread, errwrite = _subprocess.CreatePipe(None, 0) + elif stderr == STDOUT: + errwrite = c2pwrite + elif isinstance(stderr, int): + errwrite = msvcrt.get_osfhandle(stderr) + else: + # Assuming file-like object + if not hasattr(stderr, '_sock'): + errwrite = msvcrt.get_osfhandle(stderr.fileno()) + else: + errwrite = stderr.fileno() + errwrite = self._make_inheritable(errwrite) + # We just duplicated the handle, it has to be closed at the end + to_close.add(errwrite) + if stderr == PIPE: + to_close.add(errread) + + return (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite), to_close + else: + def _get_handles(self, stdin, stdout, stderr): + """Construct and return tuple with IO objects: + p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite + """ + if stdin is None and stdout is None and stderr is None: + return (-1, -1, -1, -1, -1, -1) + + p2cread, p2cwrite = -1, -1 + c2pread, c2pwrite = -1, -1 + errread, errwrite = -1, -1 + + if stdin is None: + p2cread = _winapi.GetStdHandle(_winapi.STD_INPUT_HANDLE) + if p2cread is None: + p2cread, _ = _winapi.CreatePipe(None, 0) + p2cread = subprocess.Handle(p2cread) + _winapi.CloseHandle(_) + elif stdin == PIPE: + p2cread, p2cwrite = _winapi.CreatePipe(None, 0) + p2cread, p2cwrite = subprocess.Handle(p2cread), subprocess.Handle(p2cwrite) + elif stdin == DEVNULL: + p2cread = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stdin, int): + p2cread = msvcrt.get_osfhandle(stdin) + else: + # Assuming file-like object + if not hasattr(stdin, '_sock'): + p2cread = msvcrt.get_osfhandle(stdin.fileno()) + else: + p2cread = stdin.fileno() + p2cread = self._make_inheritable(p2cread) + + if stdout is None: + c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE) + if c2pwrite is None: + _, c2pwrite = _winapi.CreatePipe(None, 0) + c2pwrite = subprocess.Handle(c2pwrite) + _winapi.CloseHandle(_) + elif stdout == PIPE: + c2pread, c2pwrite = _winapi.CreatePipe(None, 0) + c2pread, c2pwrite = subprocess.Handle(c2pread), subprocess.Handle(c2pwrite) + elif stdout == DEVNULL: + c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stdout, int): + c2pwrite = msvcrt.get_osfhandle(stdout) + else: + # Assuming file-like object + if not hasattr(stdout, '_sock'): + c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) + else: + c2pwrite = stdout.fileno() + c2pwrite = self._make_inheritable(c2pwrite) + + if stderr is None: + errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE) + if errwrite is None: + _, errwrite = _winapi.CreatePipe(None, 0) + errwrite = subprocess.Handle(errwrite) + _winapi.CloseHandle(_) + elif stderr == PIPE: + errread, errwrite = _winapi.CreatePipe(None, 0) + errread, errwrite = subprocess.Handle(errread), subprocess.Handle(errwrite) + elif stderr == STDOUT: + errwrite = c2pwrite + elif stderr == DEVNULL: + errwrite = msvcrt.get_osfhandle(self._get_devnull()) + elif isinstance(stderr, int): + errwrite = msvcrt.get_osfhandle(stderr) + else: + # Assuming file-like object + if not hasattr(stderr, '_sock'): + errwrite = msvcrt.get_osfhandle(stderr.fileno()) + else: + errwrite = stderr.fileno() + errwrite = self._make_inheritable(errwrite) + + return (p2cread, p2cwrite, + c2pread, c2pwrite, + errread, errwrite) + + subprocess.Popen_old = subprocess.Popen + subprocess.Popen = WindowsPopen + + def disable_overlapped(): + # Enable socket to be non overlapped + try: + dummy = socket.socket(0xDEAD, socket.SOCK_STREAM) # After that python will not force WSA_FLAG_OVERLAPPED + except: + pass + dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_NONALERT) + + def enable_overlapped(): + dummy = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + dummy.setsockopt(socket.SOL_SOCKET, SO_OPENTYPE, SO_SYNCHRONOUS_ALERT) + +else: + def disable_overlapped(): + pass + + def enable_overlapped(): + pass \ No newline at end of file