[Python-Dev] Async subprocesses on Windows with tulip

Richard Oudkerk shibturn at gmail.com
Sun May 19 17:59:52 CEST 2013


Attached is a pretty trivial example of asynchronous interaction with a
Python subprocess using tulip on Windows.  It does not use transports or
protocols -- instead sock_recv() and sock_sendall() are used inside tasks.

I am not sure what the current plan is for dealing with subprocesses.
Shall I just add this to the examples folder for now?

-- 
Richard
-------------- next part --------------
'''
Example of asynchronous interaction with a subprocess on Windows.

This requires use of overlapped pipe handles and (a modified) iocp proactor.
'''

import itertools
import logging
import msvcrt
import os
import subprocess
import sys
import tempfile
import _winapi

import tulip
from tulip import _overlapped, windows_events, events


# Shorthand for subprocess.PIPE; Popen below compares its stdin, stdout and
# stderr arguments against it.
PIPE = subprocess.PIPE
# Buffer size handed to CreateNamedPipe() by _pipe().
BUFSIZE = 8192
# Module-level counter that _pipe() mixes into every pipe name it generates.
_mmap_counter=itertools.count()


def _pipe(duplex=True, overlapped=(True, True)):
    '''
    Create a connected named pipe and return its two handles as (h1, h2).

    h1 comes from CreateNamedPipe() and h2 from CreateFile() on the same
    pipe name.  With duplex true both handles are opened for reading and
    writing; otherwise h1 is opened inbound and h2 for writing only.

    overlapped[0] and overlapped[1] select whether FILE_FLAG_OVERLAPPED is
    set on h1 and h2 respectively.

    If any step fails, the handles created so far are closed and the
    exception is re-raised.
    '''
    name_prefix = r'\\.\pipe\python-pipe-%d-%d-' % (
        os.getpid(), next(_mmap_counter))
    # mktemp() is only used to build a unique name; no file is created.
    pipe_name = tempfile.mktemp(prefix=name_prefix)

    if duplex:
        server_mode = _winapi.PIPE_ACCESS_DUPLEX
        client_access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
        out_size, in_size = BUFSIZE, BUFSIZE
    else:
        server_mode = _winapi.PIPE_ACCESS_INBOUND
        client_access = _winapi.GENERIC_WRITE
        out_size, in_size = 0, BUFSIZE

    server_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
    if overlapped[0]:
        server_mode |= _winapi.FILE_FLAG_OVERLAPPED

    client_flags = _winapi.FILE_FLAG_OVERLAPPED if overlapped[1] else 0

    server = client = None
    try:
        server = _winapi.CreateNamedPipe(
            pipe_name, server_mode, _winapi.PIPE_WAIT,
            1, out_size, in_size, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)

        client = _winapi.CreateFile(
            pipe_name, client_access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            client_flags, _winapi.NULL)

        # Wait for the connection between the two ends to be established.
        pending = _winapi.ConnectNamedPipe(server, overlapped=True)
        pending.GetOverlappedResult(True)
    except BaseException:
        # Release whatever was created before propagating the failure.
        for created in (server, client):
            if created is not None:
                _winapi.CloseHandle(created)
        raise
    return server, client


class PipeHandle:
    '''
    Wrapper for a pipe handle

    The wrapper owns the handle: close() passes it to CloseHandle() once
    and then forgets it.  close() is also run by __del__ and when leaving
    a ``with`` block.
    '''
    def __init__(self, handle):
        self._handle = handle

    def __repr__(self):
        if self._handle is None:
            state = 'closed'
        else:
            state = 'handle=%r' % self._handle
        return '<%s %s>' % (self.__class__.__name__, state)

    @property
    def handle(self):
        '''The wrapped handle, or None once the wrapper has been closed.'''
        return self._handle

    def fileno(self):
        '''
        Return the wrapped handle.

        Raises ValueError if the wrapper has already been closed, instead
        of silently returning None to code that expects a usable handle.
        '''
        if self._handle is None:
            raise ValueError('I/O operation on closed pipe')
        return self._handle

    def close(self, *, CloseHandle=_winapi.CloseHandle):
        '''
        Close the wrapped handle; calling it again is a no-op.

        CloseHandle is bound as a keyword default -- presumably so that it
        stays reachable when this runs as __del__ late in interpreter
        shutdown.
        '''
        if self._handle is not None:
            CloseHandle(self._handle)
            self._handle = None

    __del__ = close

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.close()


class Popen(subprocess.Popen):
    '''
    Subclass of Popen which uses overlapped pipe handles wrapped with
    PipeHandle instead of normal file objects for stdin, stdout, stderr.

    Each of stdin, stdout and stderr that equals PIPE gets a pipe created
    by _pipe(); the parent's end is exposed as self.stdin / self.stdout /
    self.stderr through _WriteWrapper / _ReadWrapper.  Any other value is
    passed to subprocess.Popen unchanged.
    '''
    _WriteWrapper = PipeHandle
    _ReadWrapper = PipeHandle

    def __init__(self, args, *, executable=None, stdin=None, stdout=None,
                 stderr=None, preexec_fn=None, close_fds=False,
                 shell=False, cwd=None, env=None, startupinfo=None,
                 creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=()):
        # Values handed to subprocess.Popen.  Anything that is not PIPE is
        # forwarded as given rather than being replaced by None.
        stdin_rfd, stdout_wfd, stderr_wfd = stdin, stdout, stderr
        # Parent-side pipe handles; None for streams that are not PIPE.
        stdin_wh = stdout_rh = stderr_rh = None
        # C runtime fds for the child's pipe ends.  The parent must close
        # its copies whether or not the child was started.
        child_fds = []
        try:
            # Pipe creation happens inside the try block so that a failure
            # part-way through does not leak the pipes already created.
            if stdin == PIPE:
                stdin_rh, stdin_wh = _pipe(False, (False, True))
                stdin_rfd = self._open_child_fd(stdin_rh, os.O_RDONLY)
                child_fds.append(stdin_rfd)
            if stdout == PIPE:
                stdout_rh, stdout_wh = _pipe(False, (True, False))
                stdout_wfd = self._open_child_fd(stdout_wh, 0)
                child_fds.append(stdout_wfd)
            if stderr == PIPE:
                stderr_rh, stderr_wh = _pipe(False, (True, False))
                stderr_wfd = self._open_child_fd(stderr_wh, 0)
                child_fds.append(stderr_wfd)
            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, executable=executable,
                             preexec_fn=preexec_fn, close_fds=close_fds,
                             shell=shell, cwd=cwd, env=env,
                             startupinfo=startupinfo,
                             creationflags=creationflags,
                             restore_signals=restore_signals,
                             start_new_session=start_new_session,
                             pass_fds=pass_fds)
        except BaseException:
            # Only close the handles that were actually created; calling
            # CloseHandle(None) would raise and hide the original error.
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = self._WriteWrapper(stdin_wh)
            if stdout_rh is not None:
                self.stdout = self._ReadWrapper(stdout_rh)
            if stderr_rh is not None:
                self.stderr = self._ReadWrapper(stderr_rh)
        finally:
            for fd in child_fds:
                os.close(fd)

    @staticmethod
    def _open_child_fd(handle, flags):
        '''Return a C runtime fd for handle; close handle if that fails.'''
        try:
            return msvcrt.open_osfhandle(handle, flags)
        except BaseException:
            _winapi.CloseHandle(handle)
            raise


class ProactorEventLoop(windows_events.ProactorEventLoop):
    '''
    Proactor event loop whose sock_recv() and sock_sendall() also work on
    PipeHandle objects.

    A connection with a non-None ``handle`` attribute is serviced with
    ReadFile() / WriteFile(); every other connection goes through
    WSARecv() / WSASend().
    '''
    def sock_recv(self, conn, n):
        '''
        Start an overlapped read on conn, requesting n bytes, and return
        whatever the proactor hands back when the operation is registered.
        '''
        self._proactor._register_with_iocp(conn)
        operation = _overlapped.Overlapped(_winapi.NULL)
        if getattr(conn, 'handle', None) is not None:
            operation.ReadFile(conn.fileno(), n)
        else:
            operation.WSARecv(conn.fileno(), n, 0)
        return self._proactor._register(operation, conn, operation.getresult)

    def sock_sendall(self, conn, data):
        '''
        Start an overlapped write of data to conn and return whatever the
        proactor hands back when the operation is registered.
        '''
        self._proactor._register_with_iocp(conn)
        operation = _overlapped.Overlapped(_winapi.NULL)
        if getattr(conn, 'handle', None) is not None:
            operation.WriteFile(conn.fileno(), data)
        else:
            operation.WSASend(conn.fileno(), data, 0)
        return self._proactor._register(operation, conn, operation.getresult)


if __name__ == '__main__':
    @tulip.task
    def read_and_close(loop, f):
        # Collect everything readable from f until an empty read, then
        # return it as one bytes object.  f is closed on the way out.
        with f:
            chunks = []
            chunk = yield from loop.sock_recv(f, 4096)
            while chunk != b'':
                chunks.append(chunk)
                chunk = yield from loop.sock_recv(f, 4096)
            return b''.join(chunks)

    @tulip.task
    def write_and_close(loop, f, buf):
        # Send buf to f, close f, and return the result of the send.
        with f:
            result = yield from loop.sock_sendall(f, buf)
            return result

    @tulip.task
    def main(loop):
        # child program: copies stdin to stdout in upper case and reports
        # start and exit on stderr
        code = r'''if 1:
                       import os
                       os.write(2, b"starting\n")
                       while True:
                           s = os.read(0, 1024)
                           if not s:
                               break
                           s = s.upper()
                           while s:
                               n = os.write(1, s)
                               s = s[n:]
                       os.write(2, b"exiting\n")
                       '''
        child = Popen([sys.executable, '-c', code],
                      stdin=PIPE, stdout=PIPE, stderr=PIPE)

        # one task feeds the child, two more drain its output streams
        payload = b"hello world\n"*100000
        writer = write_and_close(loop, child.stdin, payload)
        stdout_reader = read_and_close(loop, child.stdout)
        stderr_reader = read_and_close(loop, child.stderr)

        # collect the results in the order the tasks were started
        bytes_written = yield from writer
        stdout_data = yield from stdout_reader
        stderr_data = yield from stderr_reader

        # report what happened
        print('bytes_written:', bytes_written)
        print('stdout_data[:50]:', stdout_data[:50])
        print('len(stdout_data):', len(stdout_data))
        print('stderr_data:', stderr_data)


    loop = ProactorEventLoop()
    events.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()


More information about the Python-Dev mailing list