''' Example of asynchronous interaction with a subprocess on Windows. This requires use of overlapped pipe handles and (a modified) iocp proactor. ''' import itertools import logging import msvcrt import os import subprocess import sys import tempfile import _winapi import tulip from tulip import _overlapped, windows_events, events PIPE = subprocess.PIPE BUFSIZE = 8192 _mmap_counter=itertools.count() def _pipe(duplex=True, overlapped=(True, True)): ''' Return handles for a pipe with one or both ends overlapped. ''' address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' % (os.getpid(), next(_mmap_counter))) if duplex: openmode = _winapi.PIPE_ACCESS_DUPLEX access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: openmode = _winapi.PIPE_ACCESS_INBOUND access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE if overlapped[0]: openmode |= _winapi.FILE_FLAG_OVERLAPPED if overlapped[1]: flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED else: flags_and_attribs = 0 h1 = h2 = None try: h1 = _winapi.CreateNamedPipe( address, openmode, _winapi.PIPE_WAIT, 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) h2 = _winapi.CreateFile( address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, flags_and_attribs, _winapi.NULL) ov = _winapi.ConnectNamedPipe(h1, overlapped=True) ov.GetOverlappedResult(True) return h1, h2 except: if h1 is not None: _winapi.CloseHandle(h1) if h2 is not None: _winapi.CloseHandle(h2) raise class PipeHandle: ''' Wrapper for a pipe handle ''' def __init__(self, handle): self._handle = handle @property def handle(self): return self._handle def fileno(self): return self._handle def close(self, *, CloseHandle=_winapi.CloseHandle): if self._handle is not None: CloseHandle(self._handle) self._handle = None __del__ = close def __enter__(self): return self def __exit__(self, t, v, tb): self.close() class Popen(subprocess.Popen): ''' Subclass of Popen which uses overlapped pipe handles wrapped with PipeHandle instead of normal file objects for stdin, stdout, stderr. ''' _WriteWrapper = PipeHandle _ReadWrapper = PipeHandle def __init__(self, args, *, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=()): stdin_rfd = stdout_wfd = stderr_wfd = None stdin_wh = stdout_rh = stderr_rh = None if stdin == PIPE: stdin_rh, stdin_wh = _pipe(False, (False, True)) stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY) if stdout == PIPE: stdout_rh, stdout_wh = _pipe(False, (True, False)) stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0) if stderr == PIPE: stderr_rh, stderr_wh = _pipe(False, (True, False)) stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0) try: super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd, stderr=stderr_wfd, executable=executable, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, startupinfo=startupinfo, creationflags=creationflags, restore_signals=restore_signals, start_new_session=start_new_session, pass_fds=pass_fds) except: for h in (stdin_wh, stdout_rh, stderr_rh): _winapi.CloseHandle(h) raise else: if stdin_wh is not None: self.stdin = self._WriteWrapper(stdin_wh) if stdout_rh is not None: self.stdout = self._ReadWrapper(stdout_rh) if stderr_rh is not None: self.stderr = self._ReadWrapper(stderr_rh) finally: if stdin == PIPE: os.close(stdin_rfd) if stdout == PIPE: os.close(stdout_wfd) if stderr == PIPE: os.close(stderr_wfd) class ProactorEventLoop(windows_events.ProactorEventLoop): ''' Eventloop which uses ReadFile() and WriteFile() instead of WSARecv() and WSASend() for PipeHandle objects. ''' def sock_recv(self, conn, n): self._proactor._register_with_iocp(conn) ov = _overlapped.Overlapped(_winapi.NULL) handle = getattr(conn, 'handle', None) if handle is None: ov.WSARecv(conn.fileno(), n, 0) else: ov.ReadFile(conn.fileno(), n) return self._proactor._register(ov, conn, ov.getresult) def sock_sendall(self, conn, data): self._proactor._register_with_iocp(conn) ov = _overlapped.Overlapped(_winapi.NULL) handle = getattr(conn, 'handle', None) if handle is None: ov.WSASend(conn.fileno(), data, 0) else: ov.WriteFile(conn.fileno(), data) return self._proactor._register(ov, conn, ov.getresult) if __name__ == '__main__': @tulip.task def read_and_close(loop, f): with f: collected = [] while True: s = yield from loop.sock_recv(f, 4096) if s == b'': return b''.join(collected) collected.append(s) @tulip.task def write_and_close(loop, f, buf): with f: return (yield from loop.sock_sendall(f, buf)) @tulip.task def main(loop): # start process which upper cases its input code = r'''if 1: import os os.write(2, b"starting\n") while True: s = os.read(0, 1024) if not s: break s = s.upper() while s: n = os.write(1, s) s = s[n:] os.write(2, b"exiting\n") ''' p = Popen([sys.executable, '-c', code], stdin=PIPE, stdout=PIPE, stderr=PIPE) # start tasks to write to and read from the process bytes_written = write_and_close(loop, p.stdin, b"hello world\n"*100000) stdout_data = read_and_close(loop, p.stdout) stderr_data = read_and_close(loop, p.stderr) # wait for tasks to finish and get the results bytes_written = yield from bytes_written stdout_data = yield from stdout_data stderr_data = yield from stderr_data # print results print('bytes_written:', bytes_written) print('stdout_data[:50]:', stdout_data[:50]) print('len(stdout_data):', len(stdout_data)) print('stderr_data:', stderr_data) loop = ProactorEventLoop() events.set_event_loop(loop) try: loop.run_until_complete(main(loop)) finally: loop.close()