[Python-Dev] Async subprocesses on Windows with tulip
Richard Oudkerk
shibturn at gmail.com
Sun May 19 17:59:52 CEST 2013
Attached is a pretty trivial example of asynchronous interaction with a
python subprocess using tulip on Windows. It does not use transports or
protocols -- instead sock_recv() and sock_sendall() are used inside tasks.
I am not sure what the plan is for dealing with subprocesses currently.
Shall I just add this to the examples folder for now?
--
Richard
-------------- next part --------------
'''
Example of asynchronous interaction with a subprocess on Windows.
This requires use of overlapped pipe handles and (a modified) iocp proactor.
'''
import itertools
import logging
import msvcrt
import os
import subprocess
import sys
import tempfile
import _winapi
import tulip
from tulip import _overlapped, windows_events, events
PIPE = subprocess.PIPE
BUFSIZE = 8192
_mmap_counter=itertools.count()
def _pipe(duplex=True, overlapped=(True, True)):
'''
Return handles for a pipe with one or both ends overlapped.
'''
address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' %
(os.getpid(), next(_mmap_counter)))
if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
openmode = _winapi.PIPE_ACCESS_INBOUND
access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
if overlapped[0]:
openmode |= _winapi.FILE_FLAG_OVERLAPPED
if overlapped[1]:
flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED
else:
flags_and_attribs = 0
h1 = h2 = None
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
flags_and_attribs, _winapi.NULL)
ov = _winapi.ConnectNamedPipe(h1, overlapped=True)
ov.GetOverlappedResult(True)
return h1, h2
except:
if h1 is not None:
_winapi.CloseHandle(h1)
if h2 is not None:
_winapi.CloseHandle(h2)
raise
class PipeHandle:
'''
Wrapper for a pipe handle
'''
def __init__(self, handle):
self._handle = handle
@property
def handle(self):
return self._handle
def fileno(self):
return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle):
if self._handle is not None:
CloseHandle(self._handle)
self._handle = None
__del__ = close
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
class Popen(subprocess.Popen):
'''
Subclass of Popen which uses overlapped pipe handles wrapped with
PipeHandle instead of normal file objects for stdin, stdout, stderr.
'''
_WriteWrapper = PipeHandle
_ReadWrapper = PipeHandle
def __init__(self, args, *, executable=None, stdin=None, stdout=None,
stderr=None, preexec_fn=None, close_fds=False,
shell=False, cwd=None, env=None, startupinfo=None,
creationflags=0, restore_signals=True,
start_new_session=False, pass_fds=()):
stdin_rfd = stdout_wfd = stderr_wfd = None
stdin_wh = stdout_rh = stderr_rh = None
if stdin == PIPE:
stdin_rh, stdin_wh = _pipe(False, (False, True))
stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
if stdout == PIPE:
stdout_rh, stdout_wh = _pipe(False, (True, False))
stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
if stderr == PIPE:
stderr_rh, stderr_wh = _pipe(False, (True, False))
stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
try:
super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
stderr=stderr_wfd, executable=executable,
preexec_fn=preexec_fn, close_fds=close_fds,
shell=shell, cwd=cwd, env=env,
startupinfo=startupinfo,
creationflags=creationflags,
restore_signals=restore_signals,
start_new_session=start_new_session,
pass_fds=pass_fds)
except:
for h in (stdin_wh, stdout_rh, stderr_rh):
_winapi.CloseHandle(h)
raise
else:
if stdin_wh is not None:
self.stdin = self._WriteWrapper(stdin_wh)
if stdout_rh is not None:
self.stdout = self._ReadWrapper(stdout_rh)
if stderr_rh is not None:
self.stderr = self._ReadWrapper(stderr_rh)
finally:
if stdin == PIPE:
os.close(stdin_rfd)
if stdout == PIPE:
os.close(stdout_wfd)
if stderr == PIPE:
os.close(stderr_wfd)
class ProactorEventLoop(windows_events.ProactorEventLoop):
'''
Eventloop which uses ReadFile() and WriteFile() instead of
WSARecv() and WSASend() for PipeHandle objects.
'''
def sock_recv(self, conn, n):
self._proactor._register_with_iocp(conn)
ov = _overlapped.Overlapped(_winapi.NULL)
handle = getattr(conn, 'handle', None)
if handle is None:
ov.WSARecv(conn.fileno(), n, 0)
else:
ov.ReadFile(conn.fileno(), n)
return self._proactor._register(ov, conn, ov.getresult)
def sock_sendall(self, conn, data):
self._proactor._register_with_iocp(conn)
ov = _overlapped.Overlapped(_winapi.NULL)
handle = getattr(conn, 'handle', None)
if handle is None:
ov.WSASend(conn.fileno(), data, 0)
else:
ov.WriteFile(conn.fileno(), data)
return self._proactor._register(ov, conn, ov.getresult)
if __name__ == '__main__':
@tulip.task
def read_and_close(loop, f):
with f:
collected = []
while True:
s = yield from loop.sock_recv(f, 4096)
if s == b'':
return b''.join(collected)
collected.append(s)
@tulip.task
def write_and_close(loop, f, buf):
with f:
return (yield from loop.sock_sendall(f, buf))
@tulip.task
def main(loop):
# start process which upper cases its input
code = r'''if 1:
import os
os.write(2, b"starting\n")
while True:
s = os.read(0, 1024)
if not s:
break
s = s.upper()
while s:
n = os.write(1, s)
s = s[n:]
os.write(2, b"exiting\n")
'''
p = Popen([sys.executable, '-c', code],
stdin=PIPE, stdout=PIPE, stderr=PIPE)
# start tasks to write to and read from the process
bytes_written = write_and_close(loop, p.stdin, b"hello world\n"*100000)
stdout_data = read_and_close(loop, p.stdout)
stderr_data = read_and_close(loop, p.stderr)
# wait for tasks to finish and get the results
bytes_written = yield from bytes_written
stdout_data = yield from stdout_data
stderr_data = yield from stderr_data
# print results
print('bytes_written:', bytes_written)
print('stdout_data[:50]:', stdout_data[:50])
print('len(stdout_data):', len(stdout_data))
print('stderr_data:', stderr_data)
loop = ProactorEventLoop()
events.set_event_loop(loop)
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
More information about the Python-Dev
mailing list