[Python-checkins] cpython: Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed

antoine.pitrou python-checkins at python.org
Wed Jun 8 17:23:28 CEST 2011


http://hg.python.org/cpython/rev/6d6099f7fe89
changeset:   70711:6d6099f7fe89
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Wed Jun 08 17:21:55 2011 +0200
summary:
  Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation.  Previously it
would reliably freeze/deadlock.

files:
  Doc/library/concurrent.futures.rst         |   19 +
  Lib/concurrent/futures/process.py          |  105 +-
  Lib/multiprocessing/connection.py          |  149 ++-
  Lib/multiprocessing/forking.py             |    1 +
  Lib/multiprocessing/queues.py              |    6 +-
  Lib/test/test_concurrent_futures.py        |   20 +-
  Misc/NEWS                                  |    4 +
  Modules/_multiprocessing/win32_functions.c |  390 +++++++++-
  8 files changed, 587 insertions(+), 107 deletions(-)


diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -169,6 +169,12 @@
    of at most *max_workers* processes.  If *max_workers* is ``None`` or not
    given, it will default to the number of processors on the machine.
 
+   .. versionchanged:: 3.3
+      When one of the worker processes terminates abruptly, a
+      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
+      was undefined but operations on the executor or its futures would often
+      freeze or deadlock.
+
 
 .. _processpoolexecutor-example:
 
@@ -369,3 +375,16 @@
    :pep:`3148` -- futures - execute computations asynchronously
       The proposal which described this feature for inclusion in the Python
       standard library.
+
+
+Exception classes
+-----------------
+
+.. exception:: BrokenProcessPool
+
+   Derived from :exc:`RuntimeError`, this exception class is raised when
+   one of the workers of a :class:`ProcessPoolExecutor` has terminated
+   in a non-clean fashion (for example, if it was killed from the outside).
+
+   .. versionadded:: 3.3
+
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -46,10 +46,11 @@
 __author__ = 'Brian Quinlan (brian at sweetapp.com)'
 
 import atexit
+import os
 from concurrent.futures import _base
 import queue
 import multiprocessing
-from multiprocessing.queues import SimpleQueue
+from multiprocessing.queues import SimpleQueue, SentinelReady
 import threading
 import weakref
 
@@ -122,7 +123,7 @@
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
-            result_queue.put(None)
+            result_queue.put(os.getpid())
             return
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
@@ -194,29 +195,63 @@
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
     """
-    nb_shutdown_processes = 0
-    def shutdown_one_process():
-        """Tell a worker to terminate, which will in turn wake us again"""
-        nonlocal nb_shutdown_processes
-        call_queue.put(None)
-        nb_shutdown_processes += 1
+
+    def shutdown_worker():
+        # This is an upper bound
+        nb_children_alive = sum(p.is_alive() for p in processes.values())
+        for i in range(0, nb_children_alive):
+            call_queue.put(None)
+        # If .join() is not called on the created processes then
+        # some multiprocessing.Queue methods may deadlock on Mac OS
+        # X.
+        for p in processes.values():
+            p.join()
+
     while True:
         _add_call_item_to_queue(pending_work_items,
                                 work_ids_queue,
                                 call_queue)
 
-        result_item = result_queue.get()
-        if result_item is not None:
-            work_item = pending_work_items[result_item.work_id]
-            del pending_work_items[result_item.work_id]
-
-            if result_item.exception:
-                work_item.future.set_exception(result_item.exception)
-            else:
-                work_item.future.set_result(result_item.result)
-            continue
-        # If we come here, we either got a timeout or were explicitly woken up.
-        # In either case, check whether we should start shutting down.
+        sentinels = [p.sentinel for p in processes.values()]
+        assert sentinels
+        try:
+            result_item = result_queue.get(sentinels=sentinels)
+        except SentinelReady as e:
+            # Mark the process pool broken so that submits fail right now.
+            executor = executor_reference()
+            if executor is not None:
+                executor._broken = True
+                executor._shutdown_thread = True
+                del executor
+            # All futures in flight must be marked failed
+            for work_id, work_item in pending_work_items.items():
+                work_item.future.set_exception(
+                    BrokenProcessPool(
+                        "A process in the process pool was "
+                        "terminated abruptly while the future was "
+                        "running or pending."
+                    ))
+            pending_work_items.clear()
+            # Terminate remaining workers forcibly: the queues or their
+            # locks may be in a dirty state and block forever.
+            for p in processes.values():
+                p.terminate()
+            for p in processes.values():
+                p.join()
+            return
+        if isinstance(result_item, int):
+            # Clean shutdown of a worker using its PID
+            # (avoids marking the executor broken)
+            del processes[result_item]
+        elif result_item is not None:
+            work_item = pending_work_items.pop(result_item.work_id, None)
+            # work_item can be None if another process terminated (see above)
+            if work_item is not None:
+                if result_item.exception:
+                    work_item.future.set_exception(result_item.exception)
+                else:
+                    work_item.future.set_result(result_item.result)
+        # Check whether we should start shutting down.
         executor = executor_reference()
         # No more work items can be added if:
         #   - The interpreter is shutting down OR
@@ -226,17 +261,11 @@
             # Since no new work items can be added, it is safe to shutdown
             # this thread if there are no pending work items.
             if not pending_work_items:
-                while nb_shutdown_processes < len(processes):
-                    shutdown_one_process()
-                # If .join() is not called on the created processes then
-                # some multiprocessing.Queue methods may deadlock on Mac OS
-                # X.
-                for p in processes:
-                    p.join()
+                shutdown_worker()
                 return
             else:
                 # Start shutting down by telling a process it can exit.
-                shutdown_one_process()
+                call_queue.put(None)
         del executor
 
 _system_limits_checked = False
@@ -264,6 +293,14 @@
     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
     raise NotImplementedError(_system_limited)
 
+
+class BrokenProcessPool(RuntimeError):
+    """
+    Raised when a process in a ProcessPoolExecutor terminated abruptly
+    while a future was in the running state.
+    """
+
+
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None):
         """Initializes a new ProcessPoolExecutor instance.
@@ -288,11 +325,13 @@
         self._result_queue = SimpleQueue()
         self._work_ids = queue.Queue()
         self._queue_management_thread = None
-        self._processes = set()
+        # Map of pids to processes
+        self._processes = {}
 
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
 
@@ -302,6 +341,8 @@
         def weakref_cb(_, q=self._result_queue):
             q.put(None)
         if self._queue_management_thread is None:
+            # Start the processes so that their sentinels are known.
+            self._adjust_process_count()
             self._queue_management_thread = threading.Thread(
                     target=_queue_management_worker,
                     args=(weakref.ref(self, weakref_cb),
@@ -321,10 +362,13 @@
                     args=(self._call_queue,
                           self._result_queue))
             p.start()
-            self._processes.add(p)
+            self._processes[p.pid] = p
 
     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
+            if self._broken:
+                raise BrokenProcessPool('A child process terminated '
+                    'abruptly, the process pool is not usable anymore')
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')
 
@@ -338,7 +382,6 @@
             self._result_queue.put(None)
 
             self._start_queue_management_thread()
-            self._adjust_process_count()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -48,14 +48,18 @@
 
 import _multiprocessing
 from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.util import (
+    get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
 try:
     from _multiprocessing import win32
+    from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
 except ImportError:
     if sys.platform == 'win32':
         raise
     win32 = None
 
+_select = _eintr_retry(select.select)
+
 #
 #
 #
@@ -118,6 +122,15 @@
     else:
         raise ValueError('address type of %r unrecognized' % address)
 
+
+class SentinelReady(Exception):
+    """
+    Raised when a sentinel is ready when polling.
+    """
+    def __init__(self, *args):
+        Exception.__init__(self, *args)
+        self.sentinels = args[0]
+
 #
 # Connection classes
 #
@@ -253,19 +266,17 @@
                               (offset + size) // itemsize])
             return size
 
-    def recv(self):
+    def recv(self, sentinels=None):
         """Receive a (picklable) object"""
         self._check_closed()
         self._check_readable()
-        buf = self._recv_bytes()
+        buf = self._recv_bytes(sentinels=sentinels)
         return pickle.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
         self._check_closed()
         self._check_readable()
-        if timeout < 0.0:
-            timeout = None
         return self._poll(timeout)
 
 
@@ -274,61 +285,88 @@
     class PipeConnection(_ConnectionBase):
         """
         Connection class based on a Windows named pipe.
+        Overlapped I/O is used, so the handles must have been created
+        with FILE_FLAG_OVERLAPPED.
         """
+        _buffered = b''
 
         def _close(self):
             win32.CloseHandle(self._handle)
 
         def _send_bytes(self, buf):
-            nwritten = win32.WriteFile(self._handle, buf)
+            overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
+            nwritten, complete = overlapped.GetOverlappedResult(True)
+            assert complete
             assert nwritten == len(buf)
 
-        def _recv_bytes(self, maxsize=None):
+        def _recv_bytes(self, maxsize=None, sentinels=()):
+            if sentinels:
+                self._poll(-1.0, sentinels)
             buf = io.BytesIO()
-            bufsize = 512
-            if maxsize is not None:
-                bufsize = min(bufsize, maxsize)
-            try:
-                firstchunk, complete = win32.ReadFile(self._handle, bufsize)
-            except IOError as e:
-                if e.errno == win32.ERROR_BROKEN_PIPE:
-                    raise EOFError
-                raise
-            lenfirstchunk = len(firstchunk)
-            buf.write(firstchunk)
-            if complete:
-                return buf
+            firstchunk = self._buffered
+            if firstchunk:
+                lenfirstchunk = len(firstchunk)
+                buf.write(firstchunk)
+                self._buffered = b''
+            else:
+                # A reasonable size for the first chunk transfer
+                bufsize = 128
+                if maxsize is not None and maxsize < bufsize:
+                    bufsize = maxsize
+                try:
+                    overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
+                    lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
+                    firstchunk = overlapped.getbuffer()
+                    assert lenfirstchunk == len(firstchunk)
+                except IOError as e:
+                    if e.errno == win32.ERROR_BROKEN_PIPE:
+                        raise EOFError
+                    raise
+                buf.write(firstchunk)
+                if complete:
+                    return buf
             navail, nleft = win32.PeekNamedPipe(self._handle)
             if maxsize is not None and lenfirstchunk + nleft > maxsize:
                 return None
-            lastchunk, complete = win32.ReadFile(self._handle, nleft)
-            assert complete
-            buf.write(lastchunk)
+            if nleft > 0:
+                overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
+                res, complete = overlapped.GetOverlappedResult(True)
+                assert res == nleft
+                assert complete
+                buf.write(overlapped.getbuffer())
             return buf
 
-        def _poll(self, timeout):
+        def _poll(self, timeout, sentinels=()):
+            # Fast non-blocking path
             navail, nleft = win32.PeekNamedPipe(self._handle)
             if navail > 0:
                 return True
             elif timeout == 0.0:
                 return False
-            # Setup a polling loop (translated straight from old
-            # pipe_connection.c)
+            # Blocking: use overlapped I/O
             if timeout < 0.0:
-                deadline = None
+                timeout = INFINITE
             else:
-                deadline = time.time() + timeout
-            delay = 0.001
-            max_delay = 0.02
-            while True:
-                time.sleep(delay)
-                navail, nleft = win32.PeekNamedPipe(self._handle)
-                if navail > 0:
-                    return True
-                if deadline and time.time() > deadline:
-                    return False
-                if delay < max_delay:
-                    delay += 0.001
+                timeout = int(timeout * 1000 + 0.5)
+            overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
+            try:
+                handles = [overlapped.event]
+                handles += sentinels
+                res = win32.WaitForMultipleObjects(handles, False, timeout)
+            finally:
+                # Always cancel overlapped I/O in the same thread
+                # (because CancelIoEx() appears only in Vista)
+                overlapped.cancel()
+            if res == WAIT_TIMEOUT:
+                return False
+            idx = res - WAIT_OBJECT_0
+            if idx == 0:
+                # I/O was successful, store received data
+                overlapped.GetOverlappedResult(True)
+                self._buffered += overlapped.getbuffer()
+                return True
+            assert 0 < idx < len(handles)
+            raise SentinelReady([handles[idx]])
 
 
 class Connection(_ConnectionBase):
@@ -357,11 +395,18 @@
                 break
             buf = buf[n:]
 
-    def _recv(self, size, read=_read):
+    def _recv(self, size, sentinels=(), read=_read):
         buf = io.BytesIO()
+        handle = self._handle
+        if sentinels:
+            handles = [handle] + sentinels
         remaining = size
         while remaining > 0:
-            chunk = read(self._handle, remaining)
+            if sentinels:
+                r = _select(handles, [], [])[0]
+                if handle not in r:
+                    raise SentinelReady(r)
+            chunk = read(handle, remaining)
             n = len(chunk)
             if n == 0:
                 if remaining == size:
@@ -381,15 +426,17 @@
         if n > 0:
             self._send(buf)
 
-    def _recv_bytes(self, maxsize=None):
-        buf = self._recv(4)
+    def _recv_bytes(self, maxsize=None, sentinels=()):
+        buf = self._recv(4, sentinels)
         size, = struct.unpack("=i", buf.getvalue())
         if maxsize is not None and size > maxsize:
             return None
-        return self._recv(size)
+        return self._recv(size, sentinels)
 
     def _poll(self, timeout):
-        r = select.select([self._handle], [], [], timeout)[0]
+        if timeout < 0.0:
+            timeout = None
+        r = _select([self._handle], [], [], timeout)[0]
         return bool(r)
 
 
@@ -495,23 +542,21 @@
             obsize, ibsize = 0, BUFSIZE
 
         h1 = win32.CreateNamedPipe(
-            address, openmode,
+            address, openmode | win32.FILE_FLAG_OVERLAPPED,
             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
             win32.PIPE_WAIT,
             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
             )
         h2 = win32.CreateFile(
-            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+            address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+            win32.FILE_FLAG_OVERLAPPED, win32.NULL
             )
         win32.SetNamedPipeHandleState(
             h2, win32.PIPE_READMODE_MESSAGE, None, None
             )
 
-        try:
-            win32.ConnectNamedPipe(h1, win32.NULL)
-        except WindowsError as e:
-            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
-                raise
+        overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+        overlapped.GetOverlappedResult(True)
 
         c1 = PipeConnection(h1, writable=duplex)
         c2 = PipeConnection(h2, readable=duplex)
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -35,6 +35,7 @@
 import os
 import sys
 import signal
+import select
 
 from multiprocessing import util, process
 
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -44,7 +44,7 @@
 
 from queue import Empty, Full
 import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
 from multiprocessing.util import debug, info, Finalize, register_after_fork
 from multiprocessing.forking import assert_spawning
@@ -372,10 +372,10 @@
     def _make_methods(self):
         recv = self._reader.recv
         racquire, rrelease = self._rlock.acquire, self._rlock.release
-        def get():
+        def get(*, sentinels=None):
             racquire()
             try:
-                return recv()
+                return recv(sentinels)
             finally:
                 rrelease()
         self.get = get
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -19,7 +19,7 @@
 from concurrent import futures
 from concurrent.futures._base import (
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
-import concurrent.futures.process
+from concurrent.futures.process import BrokenProcessPool
 
 
 def create_future(state=PENDING, exception=None, result=None):
@@ -154,7 +154,7 @@
         processes = self.executor._processes
         self.executor.shutdown()
 
-        for p in processes:
+        for p in processes.values():
             p.join()
 
     def test_context_manager_shutdown(self):
@@ -163,7 +163,7 @@
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 
-        for p in processes:
+        for p in processes.values():
             p.join()
 
     def test_del_shutdown(self):
@@ -174,7 +174,7 @@
         del executor
 
         queue_management_thread.join()
-        for p in processes:
+        for p in processes.values():
             p.join()
 
 class WaitTests(unittest.TestCase):
@@ -381,7 +381,17 @@
 
 
 class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
-    pass
+    def test_killed_child(self):
+        # When a child process is abruptly terminated, the whole pool gets
+        # "broken".
+        futures = [self.executor.submit(time.sleep, 3)]
+        # Get one of the processes, and terminate (kill) it
+        p = next(iter(self.executor._processes.values()))
+        p.terminate()
+        for fut in futures:
+            self.assertRaises(BrokenProcessPool, fut.result)
+        # Submitting other jobs fails as well.
+        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
 
 class FutureTests(unittest.TestCase):
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -187,6 +187,10 @@
 Library
 -------
 
+- Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
+  children and raises BrokenProcessPool in such a situation.  Previously it
+  would reliably freeze/deadlock.
+
 - Issue #12040: Expose a new attribute ``sentinel`` on instances of
   :class:`multiprocessing.Process`.  Also, fix Process.join() to not use
   polling anymore, when given a timeout.
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
--- a/Modules/_multiprocessing/win32_functions.c
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -12,10 +12,223 @@
 #define WIN32_FUNCTION(func) \
     {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""}
 
+#define WIN32_KWARGS_FUNCTION(func) \
+    {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_KEYWORDS | METH_STATIC, ""}
+
 #define WIN32_CONSTANT(fmt, con) \
     PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con))
 
 
+/* Grab CancelIoEx dynamically from kernel32 */
+static int has_CancelIoEx = -1;
+static BOOL (CALLBACK *Py_CancelIoEx)(HANDLE, LPOVERLAPPED);
+
+static int
+check_CancelIoEx()
+{
+    if (has_CancelIoEx == -1)
+    {
+        HINSTANCE hKernel32 = GetModuleHandle("KERNEL32");
+        * (FARPROC *) &Py_CancelIoEx = GetProcAddress(hKernel32,
+                                                      "CancelIoEx");
+        has_CancelIoEx = (Py_CancelIoEx != NULL);
+    }
+    return has_CancelIoEx;
+}
+
+
+/*
+ * A Python object wrapping an OVERLAPPED structure and other useful data
+ * for overlapped I/O
+ */
+
+typedef struct {
+    PyObject_HEAD
+    OVERLAPPED overlapped;
+    /* For convenience, we store the file handle too */
+    HANDLE handle;
+    /* Whether there's I/O in flight */
+    int pending;
+    /* Whether I/O completed successfully */
+    int completed;
+    /* Buffer used for reading (optional) */
+    PyObject *read_buffer;
+    /* Buffer used for writing (optional) */
+    Py_buffer write_buffer;
+} OverlappedObject;
+
+static void
+overlapped_dealloc(OverlappedObject *self)
+{
+    int err = GetLastError();
+    if (self->pending) {
+        if (check_CancelIoEx())
+            Py_CancelIoEx(self->handle, &self->overlapped);
+        else {
+            PyErr_SetString(PyExc_RuntimeError,
+                            "I/O operations still in flight while destroying "
+                            "Overlapped object, the process may crash");
+            PyErr_WriteUnraisable(NULL);
+        }
+    }
+    CloseHandle(self->overlapped.hEvent);
+    SetLastError(err);
+    if (self->write_buffer.obj)
+        PyBuffer_Release(&self->write_buffer);
+    Py_CLEAR(self->read_buffer);
+    PyObject_Del(self);
+}
+
+static PyObject *
+overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj)
+{
+    int wait;
+    BOOL res;
+    DWORD transferred = 0;
+
+    wait = PyObject_IsTrue(waitobj);
+    if (wait < 0)
+        return NULL;
+    Py_BEGIN_ALLOW_THREADS
+    res = GetOverlappedResult(self->handle, &self->overlapped, &transferred,
+                              wait != 0);
+    Py_END_ALLOW_THREADS
+
+    if (!res) {
+        int err = GetLastError();
+        if (err == ERROR_IO_INCOMPLETE)
+            Py_RETURN_NONE;
+        if (err != ERROR_MORE_DATA) {
+            self->pending = 0;
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
+        }
+    }
+    self->pending = 0;
+    self->completed = 1;
+    if (self->read_buffer) {
+        assert(PyBytes_CheckExact(self->read_buffer));
+        if (_PyBytes_Resize(&self->read_buffer, transferred))
+            return NULL;
+    }
+    return Py_BuildValue("lN", (long) transferred, PyBool_FromLong(res));
+}
+
+static PyObject *
+overlapped_getbuffer(OverlappedObject *self)
+{
+    PyObject *res;
+    if (!self->completed) {
+        PyErr_SetString(PyExc_ValueError,
+                        "can't get read buffer before GetOverlappedResult() "
+                        "signals the operation completed");
+        return NULL;
+    }
+    res = self->read_buffer ? self->read_buffer : Py_None;
+    Py_INCREF(res);
+    return res;
+}
+
+static PyObject *
+overlapped_cancel(OverlappedObject *self)
+{
+    BOOL res = TRUE;
+
+    if (self->pending) {
+        Py_BEGIN_ALLOW_THREADS
+        if (check_CancelIoEx())
+            res = Py_CancelIoEx(self->handle, &self->overlapped);
+        else
+            res = CancelIo(self->handle);
+        Py_END_ALLOW_THREADS
+    }
+
+    /* CancelIoEx returns ERROR_NOT_FOUND if the I/O completed in-between */
+    if (!res && GetLastError() != ERROR_NOT_FOUND)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+    self->pending = 0;
+    Py_RETURN_NONE;
+}
+
+static PyMethodDef overlapped_methods[] = {
+    {"GetOverlappedResult", (PyCFunction) overlapped_GetOverlappedResult,
+                            METH_O, NULL},
+    {"getbuffer", (PyCFunction) overlapped_getbuffer, METH_NOARGS, NULL},
+    {"cancel", (PyCFunction) overlapped_cancel, METH_NOARGS, NULL},
+    {NULL}
+};
+
+static PyMemberDef overlapped_members[] = {
+    {"event", T_HANDLE,
+     offsetof(OverlappedObject, overlapped) + offsetof(OVERLAPPED, hEvent),
+     READONLY, "overlapped event handle"},
+    {NULL}
+};
+
+PyTypeObject OverlappedType = {
+    PyVarObject_HEAD_INIT(NULL, 0)
+    /* tp_name           */ "_multiprocessing.win32.Overlapped",
+    /* tp_basicsize      */ sizeof(OverlappedObject),
+    /* tp_itemsize       */ 0,
+    /* tp_dealloc        */ (destructor) overlapped_dealloc,
+    /* tp_print          */ 0,
+    /* tp_getattr        */ 0,
+    /* tp_setattr        */ 0,
+    /* tp_reserved       */ 0,
+    /* tp_repr           */ 0,
+    /* tp_as_number      */ 0,
+    /* tp_as_sequence    */ 0,
+    /* tp_as_mapping     */ 0,
+    /* tp_hash           */ 0,
+    /* tp_call           */ 0,
+    /* tp_str            */ 0,
+    /* tp_getattro       */ 0,
+    /* tp_setattro       */ 0,
+    /* tp_as_buffer      */ 0,
+    /* tp_flags          */ Py_TPFLAGS_DEFAULT,
+    /* tp_doc            */ "OVERLAPPED structure wrapper",
+    /* tp_traverse       */ 0,
+    /* tp_clear          */ 0,
+    /* tp_richcompare    */ 0,
+    /* tp_weaklistoffset */ 0,
+    /* tp_iter           */ 0,
+    /* tp_iternext       */ 0,
+    /* tp_methods        */ overlapped_methods,
+    /* tp_members        */ overlapped_members,
+    /* tp_getset         */ 0,
+    /* tp_base           */ 0,
+    /* tp_dict           */ 0,
+    /* tp_descr_get      */ 0,
+    /* tp_descr_set      */ 0,
+    /* tp_dictoffset     */ 0,
+    /* tp_init           */ 0,
+    /* tp_alloc          */ 0,
+    /* tp_new            */ 0,
+};
+
+static OverlappedObject *
+new_overlapped(HANDLE handle)
+{
+    OverlappedObject *self;
+
+    self = PyObject_New(OverlappedObject, &OverlappedType);
+    if (!self)
+        return NULL;
+    self->handle = handle;
+    self->read_buffer = NULL;
+    self->pending = 0;
+    self->completed = 0;
+    memset(&self->overlapped, 0, sizeof(OVERLAPPED));
+    memset(&self->write_buffer, 0, sizeof(Py_buffer));
+    /* Manual reset, initially non-signalled */
+    self->overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+    return self;
+}
+
+
+/*
+ * Module functions
+ */
+
 static PyObject *
 win32_CloseHandle(PyObject *self, PyObject *args)
 {
@@ -36,20 +249,44 @@
 }
 
 static PyObject *
-win32_ConnectNamedPipe(PyObject *self, PyObject *args)
+win32_ConnectNamedPipe(PyObject *self, PyObject *args, PyObject *kwds)
 {
     HANDLE hNamedPipe;
-    LPOVERLAPPED lpOverlapped;
+    int use_overlapped = 0;
     BOOL success;
+    OverlappedObject *overlapped = NULL;
+    static char *kwlist[] = {"handle", "overlapped", NULL};
 
-    if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER,
-                          &hNamedPipe, &lpOverlapped))
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     F_HANDLE "|i", kwlist,
+                                     &hNamedPipe, &use_overlapped))
         return NULL;
 
+    if (use_overlapped) {
+        overlapped = new_overlapped(hNamedPipe);
+        if (!overlapped)
+            return NULL;
+    }
+
     Py_BEGIN_ALLOW_THREADS
-    success = ConnectNamedPipe(hNamedPipe, lpOverlapped);
+    success = ConnectNamedPipe(hNamedPipe,
+                               overlapped ? &overlapped->overlapped : NULL);
     Py_END_ALLOW_THREADS
 
+    if (overlapped) {
+        int err = GetLastError();
+        /* Overlapped ConnectNamedPipe never returns a success code */
+        assert(success == 0);
+        if (err == ERROR_IO_PENDING)
+            overlapped->pending = 1;
+        else if (err == ERROR_PIPE_CONNECTED)
+            SetEvent(overlapped->overlapped.hEvent);
+        else {
+            Py_DECREF(overlapped);
+            return PyErr_SetFromWindowsErr(err);
+        }
+        return (PyObject *) overlapped;
+    }
     if (!success)
         return PyErr_SetFromWindowsErr(0);
 
@@ -280,46 +517,109 @@
 }
 
 static PyObject *
-win32_WriteFile(PyObject *self, PyObject *args)
+win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds)
 {
     HANDLE handle;
-    Py_buffer buf;
+    Py_buffer _buf, *buf;
+    PyObject *bufobj;
     int written;
     BOOL ret;
+    int use_overlapped = 0;
+    OverlappedObject *overlapped = NULL;
+    static char *kwlist[] = {"handle", "buffer", "overlapped", NULL};
 
-    if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf))
+    /* First get handle and use_overlapped to know which Py_buffer to use */
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     F_HANDLE "O|i:WriteFile", kwlist,
+                                     &handle, &bufobj, &use_overlapped))
         return NULL;
 
+    if (use_overlapped) {
+        overlapped = new_overlapped(handle);
+        if (!overlapped)
+            return NULL;
+        buf = &overlapped->write_buffer;
+    }
+    else
+        buf = &_buf;
+
+    if (!PyArg_Parse(bufobj, "y*", buf)) {
+        Py_XDECREF(overlapped);
+        return NULL;
+    }
+
     Py_BEGIN_ALLOW_THREADS
-    ret = WriteFile(handle, buf.buf, buf.len, &written, NULL);
+    ret = WriteFile(handle, buf->buf, buf->len, &written,
+                    overlapped ? &overlapped->overlapped : NULL);
     Py_END_ALLOW_THREADS
 
-    PyBuffer_Release(&buf);
+    if (overlapped) {
+        int err = GetLastError();
+        if (!ret) {
+            if (err == ERROR_IO_PENDING)
+                overlapped->pending = 1;
+            else {
+                Py_DECREF(overlapped);
+                return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+            }
+        }
+        return (PyObject *) overlapped;
+    }
+
+    PyBuffer_Release(buf);
     if (!ret)
         return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
     return PyLong_FromLong(written);
 }
 
 static PyObject *
-win32_ReadFile(PyObject *self, PyObject *args)
+win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds)
 {
     HANDLE handle;
     int size;
     DWORD nread;
     PyObject *buf;
     BOOL ret;
+    int use_overlapped = 0;
+    OverlappedObject *overlapped = NULL;
+    static char *kwlist[] = {"handle", "size", "overlapped", NULL};
 
-    if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size))
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     F_HANDLE "i|i:ReadFile", kwlist,
+                                     &handle, &size, &use_overlapped))
         return NULL;
 
     buf = PyBytes_FromStringAndSize(NULL, size);
     if (!buf)
         return NULL;
+    if (use_overlapped) {
+        overlapped = new_overlapped(handle);
+        if (!overlapped) {
+            Py_DECREF(buf);
+            return NULL;
+        }
+        /* Steals reference to buf */
+        overlapped->read_buffer = buf;
+    }
 
     Py_BEGIN_ALLOW_THREADS
-    ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL);
+    ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread,
+                   overlapped ? &overlapped->overlapped : NULL);
     Py_END_ALLOW_THREADS
 
+    if (overlapped) {
+        int err = GetLastError();
+        if (!ret) {
+            if (err == ERROR_IO_PENDING)
+                overlapped->pending = 1;
+            else if (err != ERROR_MORE_DATA) {
+                Py_DECREF(overlapped);
+                return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+            }
+        }
+        return (PyObject *) overlapped;
+    }
+
     if (!ret && GetLastError() != ERROR_MORE_DATA) {
         Py_DECREF(buf);
         return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
@@ -373,19 +673,71 @@
     }
 }
 
+static PyObject *
+win32_WaitForMultipleObjects(PyObject* self, PyObject* args)
+{
+    DWORD result;
+    PyObject *handle_seq;
+    HANDLE handles[MAXIMUM_WAIT_OBJECTS];
+    Py_ssize_t nhandles, i;
+    int wait_flag;
+    int milliseconds = INFINITE;
+
+    if (!PyArg_ParseTuple(args, "Oi|i:WaitForMultipleObjects",
+                                &handle_seq, &wait_flag, &milliseconds))
+        return NULL;
+
+    if (!PySequence_Check(handle_seq)) {
+        PyErr_Format(PyExc_TypeError,
+                     "sequence type expected, got '%s'",
+                     Py_TYPE(handle_seq)->tp_doc);
+        return NULL;
+    }
+    nhandles = PySequence_Length(handle_seq);
+    if (nhandles == -1)
+        return NULL;
+    if (nhandles < 0 || nhandles >= MAXIMUM_WAIT_OBJECTS) {
+        PyErr_Format(PyExc_ValueError,
+                     "need at most %zd handles, got a sequence of length %zd",
+                     MAXIMUM_WAIT_OBJECTS, nhandles);
+        return NULL;
+    }
+    for (i = 0; i < nhandles; i++) {
+        HANDLE h;
+        PyObject *v = PySequence_GetItem(handle_seq, i);
+        if (v == NULL)
+            return NULL;
+        if (!PyArg_Parse(v, F_HANDLE, &h))
+            return NULL;
+        handles[i] = h;
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+    result = WaitForMultipleObjects((DWORD) nhandles, handles,
+                                    (BOOL) wait_flag, (DWORD) milliseconds);
+    Py_END_ALLOW_THREADS
+
+    if (result == WAIT_FAILED)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+
+    return PyLong_FromLong((int) result);
+}
+
+
 static PyMethodDef win32_methods[] = {
     WIN32_FUNCTION(CloseHandle),
     WIN32_FUNCTION(GetLastError),
     WIN32_FUNCTION(OpenProcess),
     WIN32_FUNCTION(ExitProcess),
-    WIN32_FUNCTION(ConnectNamedPipe),
+    WIN32_KWARGS_FUNCTION(ConnectNamedPipe),
     WIN32_FUNCTION(CreateFile),
     WIN32_FUNCTION(CreateNamedPipe),
-    WIN32_FUNCTION(ReadFile),
+    WIN32_KWARGS_FUNCTION(ReadFile),
     WIN32_FUNCTION(PeekNamedPipe),
     WIN32_FUNCTION(SetNamedPipeHandleState),
+    WIN32_FUNCTION(WaitForMultipleObjects),
     WIN32_FUNCTION(WaitNamedPipe),
-    WIN32_FUNCTION(WriteFile),
+    WIN32_KWARGS_FUNCTION(WriteFile),
     WIN32_FUNCTION(closesocket),
     WIN32_FUNCTION(recv),
     WIN32_FUNCTION(send),
@@ -407,12 +759,18 @@
         return NULL;
     Py_INCREF(&Win32Type);
 
+    if (PyType_Ready(&OverlappedType) < 0)
+        return NULL;
+    PyDict_SetItemString(Win32Type.tp_dict, "Overlapped",
+                         (PyObject *) &OverlappedType);
+
     WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
     WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
     WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES);
     WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
     WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
     WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+    WIN32_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
     WIN32_CONSTANT(F_DWORD, GENERIC_READ);
     WIN32_CONSTANT(F_DWORD, GENERIC_WRITE);
     WIN32_CONSTANT(F_DWORD, INFINITE);

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list