[Python-checkins] cpython: Issue #11743: Rewrite multiprocessing connection classes in pure Python.

antoine.pitrou python-checkins at python.org
Mon May 9 17:10:41 CEST 2011


http://hg.python.org/cpython/rev/1ac03e071d65
changeset:   69980:1ac03e071d65
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Mon May 09 17:04:27 2011 +0200
summary:
  Issue #11743: Rewrite multiprocessing connection classes in pure Python.

files:
  Lib/multiprocessing/connection.py            |  315 +++++-
  Lib/multiprocessing/forking.py               |    5 +-
  Lib/multiprocessing/reduction.py             |   13 +-
  Lib/test/test_multiprocessing.py             |   12 +-
  Misc/NEWS                                    |    2 +
  Modules/_multiprocessing/connection.h        |  527 ----------
  Modules/_multiprocessing/multiprocessing.c   |   23 -
  Modules/_multiprocessing/multiprocessing.h   |   24 +-
  Modules/_multiprocessing/pipe_connection.c   |  149 --
  Modules/_multiprocessing/socket_connection.c |  202 ---
  Modules/_multiprocessing/win32_functions.c   |  166 +++
  PC/VC6/_multiprocessing.dsp                  |    8 -
  PC/VS8.0/_multiprocessing.vcproj             |   12 -
  PCbuild/_multiprocessing.vcproj              |   12 -
  setup.py                                     |    3 -
  15 files changed, 490 insertions(+), 983 deletions(-)


diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -34,19 +34,27 @@
 
 __all__ = [ 'Client', 'Listener', 'Pipe' ]
 
+import io
 import os
 import sys
+import pickle
+import select
 import socket
+import struct
 import errno
 import time
 import tempfile
 import itertools
 
 import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
 from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+try:
+    from _multiprocessing import win32
+except ImportError:
+    if sys.platform == 'win32':
+        raise
+    win32 = None
 
 #
 #
@@ -111,6 +119,281 @@
         raise ValueError('address type of %r unrecognized' % address)
 
 #
+# Connection classes
+#
+
+class _ConnectionBase:
+    _handle = None
+
+    def __init__(self, handle, readable=True, writable=True):
+        handle = handle.__index__()
+        if handle < 0:
+            raise ValueError("invalid handle")
+        if not readable and not writable:
+            raise ValueError(
+                "at least one of `readable` and `writable` must be True")
+        self._handle = handle
+        self._readable = readable
+        self._writable = writable
+
+    def __del__(self):
+        if self._handle is not None:
+            self._close()
+
+    def _check_closed(self):
+        if self._handle is None:
+            raise IOError("handle is closed")
+
+    def _check_readable(self):
+        if not self._readable:
+            raise IOError("connection is write-only")
+
+    def _check_writable(self):
+        if not self._writable:
+            raise IOError("connection is read-only")
+
+    def _bad_message_length(self):
+        if self._writable:
+            self._readable = False
+        else:
+            self.close()
+        raise IOError("bad message length")
+
+    @property
+    def closed(self):
+        """True if the connection is closed"""
+        return self._handle is None
+
+    @property
+    def readable(self):
+        """True if the connection is readable"""
+        return self._readable
+
+    @property
+    def writable(self):
+        """True if the connection is writable"""
+        return self._writable
+
+    def fileno(self):
+        """File descriptor or handle of the connection"""
+        self._check_closed()
+        return self._handle
+
+    def close(self):
+        """Close the connection"""
+        if self._handle is not None:
+            try:
+                self._close()
+            finally:
+                self._handle = None
+
+    def send_bytes(self, buf, offset=0, size=None):
+        """Send the bytes data from a bytes-like object"""
+        self._check_closed()
+        self._check_writable()
+        m = memoryview(buf)
+        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+        if m.itemsize > 1:
+            m = memoryview(bytes(m))
+        n = len(m)
+        if offset < 0:
+            raise ValueError("offset is negative")
+        if n < offset:
+            raise ValueError("buffer length < offset")
+        if size is None:
+            size = n - offset
+        elif size < 0:
+            raise ValueError("size is negative")
+        elif offset + size > n:
+            raise ValueError("buffer length < offset + size")
+        self._send_bytes(m[offset:offset + size])
+
+    def send(self, obj):
+        """Send a (picklable) object"""
+        self._check_closed()
+        self._check_writable()
+        buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+        self._send_bytes(memoryview(buf))
+
+    def recv_bytes(self, maxlength=None):
+        """
+        Receive bytes data as a bytes object.
+        """
+        self._check_closed()
+        self._check_readable()
+        if maxlength is not None and maxlength < 0:
+            raise ValueError("negative maxlength")
+        buf = self._recv_bytes(maxlength)
+        if buf is None:
+            self._bad_message_length()
+        return buf.getvalue()
+
+    def recv_bytes_into(self, buf, offset=0):
+        """
+        Receive bytes data into a writeable buffer-like object.
+        Return the number of bytes read.
+        """
+        self._check_closed()
+        self._check_readable()
+        with memoryview(buf) as m:
+            # Get bytesize of arbitrary buffer
+            itemsize = m.itemsize
+            bytesize = itemsize * len(m)
+            if offset < 0:
+                raise ValueError("negative offset")
+            elif offset > bytesize:
+                raise ValueError("offset too large")
+            result = self._recv_bytes()
+            size = result.tell()
+            if bytesize < offset + size:
+                raise BufferTooShort(result.getvalue())
+            # Message can fit in dest
+            result.seek(0)
+            result.readinto(m[offset // itemsize :
+                              (offset + size) // itemsize])
+            return size
+
+    def recv(self):
+        """Receive a (picklable) object"""
+        self._check_closed()
+        self._check_readable()
+        buf = self._recv_bytes()
+        return pickle.loads(buf.getbuffer())
+
+    def poll(self, timeout=0.0):
+        """Whether there is any input available to be read"""
+        self._check_closed()
+        self._check_readable()
+        if timeout < 0.0:
+            timeout = None
+        return self._poll(timeout)
+
+
+if win32:
+
+    class PipeConnection(_ConnectionBase):
+        """
+        Connection class based on a Windows named pipe.
+        """
+
+        def _close(self):
+            win32.CloseHandle(self._handle)
+
+        def _send_bytes(self, buf):
+            nwritten = win32.WriteFile(self._handle, buf)
+            assert nwritten == len(buf)
+
+        def _recv_bytes(self, maxsize=None):
+            buf = io.BytesIO()
+            bufsize = 512
+            if maxsize is not None:
+                bufsize = min(bufsize, maxsize)
+            try:
+                firstchunk, complete = win32.ReadFile(self._handle, bufsize)
+            except IOError as e:
+                if e.errno == win32.ERROR_BROKEN_PIPE:
+                    raise EOFError
+                raise
+            lenfirstchunk = len(firstchunk)
+            buf.write(firstchunk)
+            if complete:
+                return buf
+            navail, nleft = win32.PeekNamedPipe(self._handle)
+            if maxsize is not None and lenfirstchunk + nleft > maxsize:
+                return None
+            lastchunk, complete = win32.ReadFile(self._handle, nleft)
+            assert complete
+            buf.write(lastchunk)
+            return buf
+
+        def _poll(self, timeout):
+            navail, nleft = win32.PeekNamedPipe(self._handle)
+            if navail > 0:
+                return True
+            elif timeout == 0.0:
+                return False
+            # Setup a polling loop (translated straight from old
+            # pipe_connection.c)
+            if timeout < 0.0:
+                deadline = None
+            else:
+                deadline = time.time() + timeout
+            delay = 0.001
+            max_delay = 0.02
+            while True:
+                time.sleep(delay)
+                navail, nleft = win32.PeekNamedPipe(self._handle)
+                if navail > 0:
+                    return True
+                if deadline and time.time() > deadline:
+                    return False
+                if delay < max_delay:
+                    delay += 0.001
+
+
+class Connection(_ConnectionBase):
+    """
+    Connection class based on an arbitrary file descriptor (Unix only), or
+    a socket handle (Windows).
+    """
+
+    if win32:
+        def _close(self):
+            win32.closesocket(self._handle)
+        _write = win32.send
+        _read = win32.recv
+    else:
+        def _close(self):
+            os.close(self._handle)
+        _write = os.write
+        _read = os.read
+
+    def _send(self, buf, write=_write):
+        remaining = len(buf)
+        while True:
+            n = write(self._handle, buf)
+            remaining -= n
+            if remaining == 0:
+                break
+            buf = buf[n:]
+
+    def _recv(self, size, read=_read):
+        buf = io.BytesIO()
+        remaining = size
+        while remaining > 0:
+            chunk = read(self._handle, remaining)
+            n = len(chunk)
+            if n == 0:
+                if remaining == size:
+                    raise EOFError
+                else:
+                    raise IOError("got end of file during message")
+            buf.write(chunk)
+            remaining -= n
+        return buf
+
+    def _send_bytes(self, buf):
+        # For wire compatibility with 3.2 and lower
+        n = len(buf)
+        self._send(struct.pack("=i", len(buf)))
+        # The condition is necessary to avoid "broken pipe" errors
+        # when sending a 0-length buffer if the other end closed the pipe.
+        if n > 0:
+            self._send(buf)
+
+    def _recv_bytes(self, maxsize=None):
+        buf = self._recv(4)
+        size, = struct.unpack("=i", buf.getvalue())
+        if maxsize is not None and size > maxsize:
+            return None
+        return self._recv(size)
+
+    def _poll(self, timeout):
+        r = select.select([self._handle], [], [], timeout)[0]
+        return bool(r)
+
+
+#
 # Public functions
 #
 
@@ -186,21 +469,19 @@
         '''
         if duplex:
             s1, s2 = socket.socketpair()
-            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
-            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
+            c1 = Connection(os.dup(s1.fileno()))
+            c2 = Connection(os.dup(s2.fileno()))
             s1.close()
             s2.close()
         else:
             fd1, fd2 = os.pipe()
-            c1 = _multiprocessing.Connection(fd1, writable=False)
-            c2 = _multiprocessing.Connection(fd2, readable=False)
+            c1 = Connection(fd1, writable=False)
+            c2 = Connection(fd2, readable=False)
 
         return c1, c2
 
 else:
 
-    from _multiprocessing import win32
-
     def Pipe(duplex=True):
         '''
         Returns pair of connection objects at either end of a pipe
@@ -234,8 +515,8 @@
             if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                 raise
 
-        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
-        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+        c1 = PipeConnection(h1, writable=duplex)
+        c2 = PipeConnection(h2, readable=duplex)
 
         return c1, c2
 
@@ -266,7 +547,7 @@
     def accept(self):
         s, self._last_accepted = self._socket.accept()
         fd = duplicate(s.fileno())
-        conn = _multiprocessing.Connection(fd)
+        conn = Connection(fd)
         s.close()
         return conn
 
@@ -298,7 +579,7 @@
             raise
 
         fd = duplicate(s.fileno())
-    conn = _multiprocessing.Connection(fd)
+    conn = Connection(fd)
     return conn
 
 #
@@ -345,7 +626,7 @@
             except WindowsError as e:
                 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                     raise
-            return _multiprocessing.PipeConnection(handle)
+            return PipeConnection(handle)
 
         @staticmethod
         def _finalize_pipe_listener(queue, address):
@@ -377,7 +658,7 @@
         win32.SetNamedPipeHandleState(
             h, win32.PIPE_READMODE_MESSAGE, None, None
             )
-        return _multiprocessing.PipeConnection(h)
+        return PipeConnection(h)
 
 #
 # Authentication stuff
@@ -451,3 +732,7 @@
     global xmlrpclib
     import xmlrpc.client as xmlrpclib
     return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+
+# Late import because of circular import
+from multiprocessing.forking import duplicate, close
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -183,7 +183,7 @@
     import time
 
     from pickle import dump, load, HIGHEST_PROTOCOL
-    from _multiprocessing import win32, Connection, PipeConnection
+    from _multiprocessing import win32
     from .util import Finalize
 
     def dump(obj, file, protocol=None):
@@ -411,6 +411,9 @@
     # Make (Pipe)Connection picklable
     #
 
+    # Late import because of circular import
+    from .connection import Connection, PipeConnection
+
     def reduce_connection(conn):
         if not Popen.thread_is_spawning():
             raise RuntimeError(
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -44,7 +44,7 @@
 from multiprocessing import current_process
 from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
 from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.connection import Client, Listener, Connection
 
 
 #
@@ -159,7 +159,7 @@
     return new_handle
 
 #
-# Register `_multiprocessing.Connection` with `ForkingPickler`
+# Register `Connection` with `ForkingPickler`
 #
 
 def reduce_connection(conn):
@@ -168,11 +168,11 @@
 
 def rebuild_connection(reduced_handle, readable, writable):
     handle = rebuild_handle(reduced_handle)
-    return _multiprocessing.Connection(
+    return Connection(
         handle, readable=readable, writable=writable
         )
 
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ForkingPickler.register(Connection, reduce_connection)
 
 #
 # Register `socket.socket` with `ForkingPickler`
@@ -201,6 +201,7 @@
 #
 
 if sys.platform == 'win32':
+    from multiprocessing.connection import PipeConnection
 
     def reduce_pipe_connection(conn):
         rh = reduce_handle(conn.fileno())
@@ -208,8 +209,8 @@
 
     def rebuild_pipe_connection(reduced_handle, readable, writable):
         handle = rebuild_handle(reduced_handle)
-        return _multiprocessing.PipeConnection(
+        return PipeConnection(
             handle, readable=readable, writable=writable
             )
 
-    ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+    ForkingPickler.register(PipeConnection, reduce_pipe_connection)
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1915,9 +1915,15 @@
 
     @unittest.skipIf(WIN32, "skipped on Windows")
     def test_invalid_handles(self):
-        conn = _multiprocessing.Connection(44977608)
-        self.assertRaises(IOError, conn.poll)
-        self.assertRaises(IOError, _multiprocessing.Connection, -1)
+        conn = multiprocessing.connection.Connection(44977608)
+        try:
+            self.assertRaises((ValueError, IOError), conn.poll)
+        finally:
+            # Hack private attribute _handle to avoid printing an error
+            # in conn.__del__
+            conn._handle = None
+        self.assertRaises((ValueError, IOError),
+                          multiprocessing.connection.Connection, -1)
 
 #
 # Functions used to create test cases from the base ones in this module
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -140,6 +140,8 @@
 Library
 -------
 
+- Issue #11743: Rewrite multiprocessing connection classes in pure Python.
+
 - Issue #11164: Stop trying to use _xmlplus in the xml module.
 
 - Issue #11888: Add log2 function to math module. Patch written by Mark
diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h
deleted file mode 100644
--- a/Modules/_multiprocessing/connection.h
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Definition of a `Connection` type.
- * Used by `socket_connection.c` and `pipe_connection.c`.
- *
- * connection.h
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#ifndef CONNECTION_H
-#define CONNECTION_H
-
-/*
- * Read/write flags
- */
-
-#define READABLE 1
-#define WRITABLE 2
-
-#define CHECK_READABLE(self) \
-    if (!(self->flags & READABLE)) { \
-    PyErr_SetString(PyExc_IOError, "connection is write-only"); \
-    return NULL; \
-    }
-
-#define CHECK_WRITABLE(self) \
-    if (!(self->flags & WRITABLE)) { \
-    PyErr_SetString(PyExc_IOError, "connection is read-only"); \
-    return NULL; \
-    }
-
-/*
- * Allocation and deallocation
- */
-
-static PyObject *
-connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
-{
-    ConnectionObject *self;
-    HANDLE handle;
-    BOOL readable = TRUE, writable = TRUE;
-
-    static char *kwlist[] = {"handle", "readable", "writable", NULL};
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
-                                     &handle, &readable, &writable))
-        return NULL;
-
-    if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
-        PyErr_Format(PyExc_IOError, "invalid handle %zd",
-                     (Py_ssize_t)handle);
-        return NULL;
-    }
-
-    if (!readable && !writable) {
-        PyErr_SetString(PyExc_ValueError,
-                        "either readable or writable must be true");
-        return NULL;
-    }
-
-    self = PyObject_New(ConnectionObject, type);
-    if (self == NULL)
-        return NULL;
-
-    self->weakreflist = NULL;
-    self->handle = handle;
-    self->flags = 0;
-
-    if (readable)
-        self->flags |= READABLE;
-    if (writable)
-        self->flags |= WRITABLE;
-    assert(self->flags >= 1 && self->flags <= 3);
-
-    return (PyObject*)self;
-}
-
-static void
-connection_dealloc(ConnectionObject* self)
-{
-    if (self->weakreflist != NULL)
-        PyObject_ClearWeakRefs((PyObject*)self);
-
-    if (self->handle != INVALID_HANDLE_VALUE) {
-        Py_BEGIN_ALLOW_THREADS
-        CLOSE(self->handle);
-        Py_END_ALLOW_THREADS
-    }
-    PyObject_Del(self);
-}
-
-/*
- * Functions for transferring buffers
- */
-
-static PyObject *
-connection_sendbytes(ConnectionObject *self, PyObject *args)
-{
-    Py_buffer pbuffer;
-    char *buffer;
-    Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
-    int res;
-
-    if (!PyArg_ParseTuple(args, F_RBUFFER "*|" F_PY_SSIZE_T F_PY_SSIZE_T,
-                          &pbuffer, &offset, &size))
-        return NULL;
-    buffer = pbuffer.buf;
-    length = pbuffer.len;
-
-    CHECK_WRITABLE(self); /* XXX release buffer in case of failure */
-
-    if (offset < 0) {
-        PyBuffer_Release(&pbuffer);
-        PyErr_SetString(PyExc_ValueError, "offset is negative");
-        return NULL;
-    }
-    if (length < offset) {
-        PyBuffer_Release(&pbuffer);
-        PyErr_SetString(PyExc_ValueError, "buffer length < offset");
-        return NULL;
-    }
-
-    if (size == PY_SSIZE_T_MIN) {
-        size = length - offset;
-    } else {
-        if (size < 0) {
-            PyBuffer_Release(&pbuffer);
-            PyErr_SetString(PyExc_ValueError, "size is negative");
-            return NULL;
-        }
-        if (offset + size > length) {
-            PyBuffer_Release(&pbuffer);
-            PyErr_SetString(PyExc_ValueError,
-                            "buffer length < offset + size");
-            return NULL;
-        }
-    }
-
-    res = conn_send_string(self, buffer + offset, size);
-
-    PyBuffer_Release(&pbuffer);
-    if (res < 0) {
-        if (PyErr_Occurred())
-            return NULL;
-        else
-            return mp_SetError(PyExc_IOError, res);
-    }
-
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-connection_recvbytes(ConnectionObject *self, PyObject *args)
-{
-    char *freeme = NULL;
-    Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
-    PyObject *result = NULL;
-
-    if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
-        return NULL;
-
-    CHECK_READABLE(self);
-
-    if (maxlength < 0) {
-        PyErr_SetString(PyExc_ValueError, "maxlength < 0");
-        return NULL;
-    }
-
-    res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
-                           &freeme, maxlength);
-
-    if (res < 0) {
-        if (res == MP_BAD_MESSAGE_LENGTH) {
-            if ((self->flags & WRITABLE) == 0) {
-                Py_BEGIN_ALLOW_THREADS
-                CLOSE(self->handle);
-                Py_END_ALLOW_THREADS
-                self->handle = INVALID_HANDLE_VALUE;
-            } else {
-                self->flags = WRITABLE;
-            }
-        }
-        mp_SetError(PyExc_IOError, res);
-    } else {
-        if (freeme == NULL) {
-            result = PyBytes_FromStringAndSize(self->buffer, res);
-        } else {
-            result = PyBytes_FromStringAndSize(freeme, res);
-            PyMem_Free(freeme);
-        }
-    }
-
-    return result;
-}
-
-static PyObject *
-connection_recvbytes_into(ConnectionObject *self, PyObject *args)
-{
-    char *freeme = NULL, *buffer = NULL;
-    Py_ssize_t res, length, offset = 0;
-    PyObject *result = NULL;
-    Py_buffer pbuf;
-
-    CHECK_READABLE(self);
-
-    if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T,
-                          &pbuf, &offset))
-        return NULL;
-
-    buffer = pbuf.buf;
-    length = pbuf.len;
-
-    if (offset < 0) {
-        PyErr_SetString(PyExc_ValueError, "negative offset");
-        goto _error;
-    }
-
-    if (offset > length) {
-        PyErr_SetString(PyExc_ValueError, "offset too large");
-        goto _error;
-    }
-
-    res = conn_recv_string(self, buffer+offset, length-offset,
-                           &freeme, PY_SSIZE_T_MAX);
-
-    if (res < 0) {
-        if (res == MP_BAD_MESSAGE_LENGTH) {
-            if ((self->flags & WRITABLE) == 0) {
-                Py_BEGIN_ALLOW_THREADS
-                CLOSE(self->handle);
-                Py_END_ALLOW_THREADS
-                self->handle = INVALID_HANDLE_VALUE;
-            } else {
-                self->flags = WRITABLE;
-            }
-        }
-        mp_SetError(PyExc_IOError, res);
-    } else {
-        if (freeme == NULL) {
-            result = PyInt_FromSsize_t(res);
-        } else {
-            result = PyObject_CallFunction(BufferTooShort,
-                                           F_RBUFFER "#",
-                                           freeme, res);
-            PyMem_Free(freeme);
-            if (result) {
-                PyErr_SetObject(BufferTooShort, result);
-                Py_DECREF(result);
-            }
-            goto _error;
-        }
-    }
-
-_cleanup:
-    PyBuffer_Release(&pbuf);
-    return result;
-
-_error:
-    result = NULL;
-    goto _cleanup;
-}
-
-/*
- * Functions for transferring objects
- */
-
-static PyObject *
-connection_send_obj(ConnectionObject *self, PyObject *obj)
-{
-    char *buffer;
-    int res;
-    Py_ssize_t length;
-    PyObject *pickled_string = NULL;
-
-    CHECK_WRITABLE(self);
-
-    pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
-                                                  pickle_protocol, NULL);
-    if (!pickled_string)
-        goto failure;
-
-    if (PyBytes_AsStringAndSize(pickled_string, &buffer, &length) < 0)
-        goto failure;
-
-    res = conn_send_string(self, buffer, (int)length);
-
-    if (res < 0) {
-        mp_SetError(PyExc_IOError, res);
-        goto failure;
-    }
-
-    Py_XDECREF(pickled_string);
-    Py_RETURN_NONE;
-
-  failure:
-    Py_XDECREF(pickled_string);
-    return NULL;
-}
-
-static PyObject *
-connection_recv_obj(ConnectionObject *self)
-{
-    char *freeme = NULL;
-    Py_ssize_t res;
-    PyObject *temp = NULL, *result = NULL;
-
-    CHECK_READABLE(self);
-
-    res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
-                           &freeme, PY_SSIZE_T_MAX);
-
-    if (res < 0) {
-        if (res == MP_BAD_MESSAGE_LENGTH) {
-            if ((self->flags & WRITABLE) == 0) {
-                Py_BEGIN_ALLOW_THREADS
-                CLOSE(self->handle);
-                Py_END_ALLOW_THREADS
-                self->handle = INVALID_HANDLE_VALUE;
-            } else {
-                self->flags = WRITABLE;
-            }
-        }
-        mp_SetError(PyExc_IOError, res);
-    } else {
-        if (freeme == NULL) {
-            temp = PyBytes_FromStringAndSize(self->buffer, res);
-        } else {
-            temp = PyBytes_FromStringAndSize(freeme, res);
-            PyMem_Free(freeme);
-        }
-    }
-
-    if (temp)
-        result = PyObject_CallFunctionObjArgs(pickle_loads,
-                                              temp, NULL);
-    Py_XDECREF(temp);
-    return result;
-}
-
-/*
- * Other functions
- */
-
-static PyObject *
-connection_poll(ConnectionObject *self, PyObject *args)
-{
-    PyObject *timeout_obj = NULL;
-    double timeout = 0.0;
-    int res;
-
-    CHECK_READABLE(self);
-
-    if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
-        return NULL;
-
-    if (timeout_obj == NULL) {
-        timeout = 0.0;
-    } else if (timeout_obj == Py_None) {
-        timeout = -1.0;                                 /* block forever */
-    } else {
-        timeout = PyFloat_AsDouble(timeout_obj);
-        if (PyErr_Occurred())
-            return NULL;
-        if (timeout < 0.0)
-            timeout = 0.0;
-    }
-
-    Py_BEGIN_ALLOW_THREADS
-    res = conn_poll(self, timeout, _save);
-    Py_END_ALLOW_THREADS
-
-    switch (res) {
-    case TRUE:
-        Py_RETURN_TRUE;
-    case FALSE:
-        Py_RETURN_FALSE;
-    default:
-        return mp_SetError(PyExc_IOError, res);
-    }
-}
-
-static PyObject *
-connection_fileno(ConnectionObject* self)
-{
-    if (self->handle == INVALID_HANDLE_VALUE) {
-        PyErr_SetString(PyExc_IOError, "handle is invalid");
-        return NULL;
-    }
-    return PyInt_FromLong((long)self->handle);
-}
-
-static PyObject *
-connection_close(ConnectionObject *self)
-{
-    if (self->handle != INVALID_HANDLE_VALUE) {
-        Py_BEGIN_ALLOW_THREADS
-        CLOSE(self->handle);
-        Py_END_ALLOW_THREADS
-        self->handle = INVALID_HANDLE_VALUE;
-    }
-
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-connection_repr(ConnectionObject *self)
-{
-    static char *conn_type[] = {"read-only", "write-only", "read-write"};
-
-    assert(self->flags >= 1 && self->flags <= 3);
-    return FROM_FORMAT("<%s %s, handle %zd>",
-                       conn_type[self->flags - 1],
-                       CONNECTION_NAME, (Py_ssize_t)self->handle);
-}
-
-/*
- * Getters and setters
- */
-
-static PyObject *
-connection_closed(ConnectionObject *self, void *closure)
-{
-    return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
-}
-
-static PyObject *
-connection_readable(ConnectionObject *self, void *closure)
-{
-    return PyBool_FromLong((long)(self->flags & READABLE));
-}
-
-static PyObject *
-connection_writable(ConnectionObject *self, void *closure)
-{
-    return PyBool_FromLong((long)(self->flags & WRITABLE));
-}
-
-/*
- * Tables
- */
-
-static PyMethodDef connection_methods[] = {
-    {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
-     "send the byte data from a readable buffer-like object"},
-    {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
-     "receive byte data as a string"},
-    {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
-     "receive byte data into a writeable buffer-like object\n"
-     "returns the number of bytes read"},
-
-    {"send", (PyCFunction)connection_send_obj, METH_O,
-     "send a (picklable) object"},
-    {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
-     "receive a (picklable) object"},
-
-    {"poll", (PyCFunction)connection_poll, METH_VARARGS,
-     "whether there is any input available to be read"},
-    {"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
-     "file descriptor or handle of the connection"},
-    {"close", (PyCFunction)connection_close, METH_NOARGS,
-     "close the connection"},
-
-    {NULL}  /* Sentinel */
-};
-
-static PyGetSetDef connection_getset[] = {
-    {"closed", (getter)connection_closed, NULL,
-     "True if the connection is closed", NULL},
-    {"readable", (getter)connection_readable, NULL,
-     "True if the connection is readable", NULL},
-    {"writable", (getter)connection_writable, NULL,
-     "True if the connection is writable", NULL},
-    {NULL}
-};
-
-/*
- * Connection type
- */
-
-PyDoc_STRVAR(connection_doc,
-             "Connection type whose constructor signature is\n\n"
-             "    Connection(handle, readable=True, writable=True).\n\n"
-             "The constructor does *not* duplicate the handle.");
-
-PyTypeObject CONNECTION_TYPE = {
-    PyVarObject_HEAD_INIT(NULL, 0)
-    /* tp_name           */ "_multiprocessing." CONNECTION_NAME,
-    /* tp_basicsize      */ sizeof(ConnectionObject),
-    /* tp_itemsize       */ 0,
-    /* tp_dealloc        */ (destructor)connection_dealloc,
-    /* tp_print          */ 0,
-    /* tp_getattr        */ 0,
-    /* tp_setattr        */ 0,
-    /* tp_reserved       */ 0,
-    /* tp_repr           */ (reprfunc)connection_repr,
-    /* tp_as_number      */ 0,
-    /* tp_as_sequence    */ 0,
-    /* tp_as_mapping     */ 0,
-    /* tp_hash           */ 0,
-    /* tp_call           */ 0,
-    /* tp_str            */ 0,
-    /* tp_getattro       */ 0,
-    /* tp_setattro       */ 0,
-    /* tp_as_buffer      */ 0,
-    /* tp_flags          */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
-                            Py_TPFLAGS_HAVE_WEAKREFS,
-    /* tp_doc            */ connection_doc,
-    /* tp_traverse       */ 0,
-    /* tp_clear          */ 0,
-    /* tp_richcompare    */ 0,
-    /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
-    /* tp_iter           */ 0,
-    /* tp_iternext       */ 0,
-    /* tp_methods        */ connection_methods,
-    /* tp_members        */ 0,
-    /* tp_getset         */ connection_getset,
-    /* tp_base           */ 0,
-    /* tp_dict           */ 0,
-    /* tp_descr_get      */ 0,
-    /* tp_descr_set      */ 0,
-    /* tp_dictoffset     */ 0,
-    /* tp_init           */ 0,
-    /* tp_alloc          */ 0,
-    /* tp_new            */ connection_new,
-};
-
-#endif /* CONNECTION_H */
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -49,16 +49,6 @@
     case MP_MEMORY_ERROR:
         PyErr_NoMemory();
         break;
-    case MP_END_OF_FILE:
-        PyErr_SetNone(PyExc_EOFError);
-        break;
-    case MP_EARLY_END_OF_FILE:
-        PyErr_SetString(PyExc_IOError,
-                        "got end of file during message");
-        break;
-    case MP_BAD_MESSAGE_LENGTH:
-        PyErr_SetString(PyExc_IOError, "bad message length");
-        break;
     case MP_EXCEPTION_HAS_BEEN_SET:
         break;
     default:
@@ -257,12 +247,6 @@
     BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
     Py_XDECREF(temp);
 
-    /* Add connection type to module */
-    if (PyType_Ready(&ConnectionType) < 0)
-        return NULL;
-    Py_INCREF(&ConnectionType);
-    PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
-
 #if defined(MS_WINDOWS) ||                                              \
   (defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED))
     /* Add SemLock type to module */
@@ -286,13 +270,6 @@
 #endif
 
 #ifdef MS_WINDOWS
-    /* Add PipeConnection to module */
-    if (PyType_Ready(&PipeConnectionType) < 0)
-        return NULL;
-    Py_INCREF(&PipeConnectionType);
-    PyModule_AddObject(module, "PipeConnection",
-                       (PyObject*)&PipeConnectionType);
-
     /* Initialize win32 class and add to multiprocessing */
     temp = create_win32_namespace();
     if (!temp)
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -118,11 +118,8 @@
 #define MP_SUCCESS (0)
 #define MP_STANDARD_ERROR (-1)
 #define MP_MEMORY_ERROR (-1001)
-#define MP_END_OF_FILE (-1002)
-#define MP_EARLY_END_OF_FILE (-1003)
-#define MP_BAD_MESSAGE_LENGTH (-1004)
-#define MP_SOCKET_ERROR (-1005)
-#define MP_EXCEPTION_HAS_BEEN_SET (-1006)
+#define MP_SOCKET_ERROR (-1002)
+#define MP_EXCEPTION_HAS_BEEN_SET (-1003)
 
 PyObject *mp_SetError(PyObject *Type, int num);
 
@@ -135,7 +132,6 @@
 extern PyObject *pickle_protocol;
 extern PyObject *BufferTooShort;
 extern PyTypeObject SemLockType;
-extern PyTypeObject ConnectionType;
 extern PyTypeObject PipeConnectionType;
 extern HANDLE sigint_event;
 
@@ -162,25 +158,9 @@
 #endif
 
 /*
- * Connection definition
- */
-
-#define CONNECTION_BUFFER_SIZE 1024
-
-typedef struct {
-    PyObject_HEAD
-    HANDLE handle;
-    int flags;
-    PyObject *weakreflist;
-    char buffer[CONNECTION_BUFFER_SIZE];
-} ConnectionObject;
-
-/*
  * Miscellaneous
  */
 
-#define MAX_MESSAGE_LENGTH 0x7fffffff
-
 #ifndef MIN
 #  define MIN(x, y) ((x) < (y) ? x : y)
 #  define MAX(x, y) ((x) > (y) ? x : y)
diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c
deleted file mode 100644
--- a/Modules/_multiprocessing/pipe_connection.c
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * A type which wraps a pipe handle in message oriented mode
- *
- * pipe_connection.c
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#include "multiprocessing.h"
-
-#define CLOSE(h) CloseHandle(h)
-
-/*
- * Send string to the pipe; assumes in message oriented mode
- */
-
-static Py_ssize_t
-conn_send_string(ConnectionObject *conn, char *string, size_t length)
-{
-    DWORD amount_written;
-    BOOL ret;
-
-    Py_BEGIN_ALLOW_THREADS
-    ret = WriteFile(conn->handle, string, length, &amount_written, NULL);
-    Py_END_ALLOW_THREADS
-
-    if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) {
-        PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
-        return MP_STANDARD_ERROR;
-    }
-
-    return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
-}
-
-/*
- * Attempts to read into buffer, or if buffer too small into *newbuffer.
- *
- * Returns number of bytes read.  Assumes in message oriented mode.
- */
-
-static Py_ssize_t
-conn_recv_string(ConnectionObject *conn, char *buffer,
-                 size_t buflength, char **newbuffer, size_t maxlength)
-{
-    DWORD left, length, full_length, err;
-    BOOL ret;
-    *newbuffer = NULL;
-
-    Py_BEGIN_ALLOW_THREADS
-    ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
-                  &length, NULL);
-    Py_END_ALLOW_THREADS
-    if (ret)
-        return length;
-
-    err = GetLastError();
-    if (err != ERROR_MORE_DATA) {
-        if (err == ERROR_BROKEN_PIPE)
-            return MP_END_OF_FILE;
-        return MP_STANDARD_ERROR;
-    }
-
-    if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
-        return MP_STANDARD_ERROR;
-
-    full_length = length + left;
-    if (full_length > maxlength)
-        return MP_BAD_MESSAGE_LENGTH;
-
-    *newbuffer = PyMem_Malloc(full_length);
-    if (*newbuffer == NULL)
-        return MP_MEMORY_ERROR;
-
-    memcpy(*newbuffer, buffer, length);
-
-    Py_BEGIN_ALLOW_THREADS
-    ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
-    Py_END_ALLOW_THREADS
-    if (ret) {
-        assert(length == left);
-        return full_length;
-    } else {
-        PyMem_Free(*newbuffer);
-        return MP_STANDARD_ERROR;
-    }
-}
-
-/*
- * Check whether any data is available for reading
- */
-
-static int
-conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
-{
-    DWORD bytes, deadline, delay;
-    int difference, res;
-    BOOL block = FALSE;
-
-    if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
-        return MP_STANDARD_ERROR;
-
-    if (timeout == 0.0)
-        return bytes > 0;
-
-    if (timeout < 0.0)
-        block = TRUE;
-    else
-        /* XXX does not check for overflow */
-        deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
-
-    Sleep(0);
-
-    for (delay = 1 ; ; delay += 1) {
-        if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
-            return MP_STANDARD_ERROR;
-        else if (bytes > 0)
-            return TRUE;
-
-        if (!block) {
-            difference = deadline - GetTickCount();
-            if (difference < 0)
-                return FALSE;
-            if ((int)delay > difference)
-                delay = difference;
-        }
-
-        if (delay > 20)
-            delay = 20;
-
-        Sleep(delay);
-
-        /* check for signals */
-        Py_BLOCK_THREADS
-        res = PyErr_CheckSignals();
-        Py_UNBLOCK_THREADS
-
-        if (res)
-            return MP_EXCEPTION_HAS_BEEN_SET;
-    }
-}
-
-/*
- * "connection.h" defines the PipeConnection type using the definitions above
- */
-
-#define CONNECTION_NAME "PipeConnection"
-#define CONNECTION_TYPE PipeConnectionType
-
-#include "connection.h"
diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c
deleted file mode 100644
--- a/Modules/_multiprocessing/socket_connection.c
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * A type which wraps a socket
- *
- * socket_connection.c
- *
- * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
- */
-
-#include "multiprocessing.h"
-
-#ifdef MS_WINDOWS
-#  define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
-#  define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
-#  define CLOSE(h) closesocket((SOCKET)h)
-#else
-#  define WRITE(h, buffer, length) write(h, buffer, length)
-#  define READ(h, buffer, length) read(h, buffer, length)
-#  define CLOSE(h) close(h)
-#endif
-
-/*
- * Send string to file descriptor
- */
-
-static Py_ssize_t
-_conn_sendall(HANDLE h, char *string, size_t length)
-{
-    char *p = string;
-    Py_ssize_t res;
-
-    while (length > 0) {
-        res = WRITE(h, p, length);
-        if (res < 0)
-            return MP_SOCKET_ERROR;
-        length -= res;
-        p += res;
-    }
-
-    return MP_SUCCESS;
-}
-
-/*
- * Receive string of exact length from file descriptor
- */
-
-static Py_ssize_t
-_conn_recvall(HANDLE h, char *buffer, size_t length)
-{
-    size_t remaining = length;
-    Py_ssize_t temp;
-    char *p = buffer;
-
-    while (remaining > 0) {
-        temp = READ(h, p, remaining);
-        if (temp <= 0) {
-            if (temp == 0)
-                return remaining == length ?
-                    MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
-            else
-                return temp;
-        }
-        remaining -= temp;
-        p += temp;
-    }
-
-    return MP_SUCCESS;
-}
-
-/*
- * Send a string prepended by the string length in network byte order
- */
-
-static Py_ssize_t
-conn_send_string(ConnectionObject *conn, char *string, size_t length)
-{
-    Py_ssize_t res;
-    /* The "header" of the message is a 32 bit unsigned number (in
-       network order) which specifies the length of the "body".  If
-       the message is shorter than about 16kb then it is quicker to
-       combine the "header" and the "body" of the message and send
-       them at once. */
-    if (length < (16*1024)) {
-        char *message;
-
-        message = PyMem_Malloc(length+4);
-        if (message == NULL)
-            return MP_MEMORY_ERROR;
-
-        *(UINT32*)message = htonl((UINT32)length);
-        memcpy(message+4, string, length);
-        Py_BEGIN_ALLOW_THREADS
-        res = _conn_sendall(conn->handle, message, length+4);
-        Py_END_ALLOW_THREADS
-        PyMem_Free(message);
-    } else {
-        UINT32 lenbuff;
-
-        if (length > MAX_MESSAGE_LENGTH)
-            return MP_BAD_MESSAGE_LENGTH;
-
-        lenbuff = htonl((UINT32)length);
-        Py_BEGIN_ALLOW_THREADS
-        res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
-            _conn_sendall(conn->handle, string, length);
-        Py_END_ALLOW_THREADS
-    }
-    return res;
-}
-
-/*
- * Attempts to read into buffer, or failing that into *newbuffer
- *
- * Returns number of bytes read.
- */
-
-static Py_ssize_t
-conn_recv_string(ConnectionObject *conn, char *buffer,
-                 size_t buflength, char **newbuffer, size_t maxlength)
-{
-    int res;
-    UINT32 ulength;
-
-    *newbuffer = NULL;
-
-    Py_BEGIN_ALLOW_THREADS
-    res = _conn_recvall(conn->handle, (char*)&ulength, 4);
-    Py_END_ALLOW_THREADS
-    if (res < 0)
-        return res;
-
-    ulength = ntohl(ulength);
-    if (ulength > maxlength)
-        return MP_BAD_MESSAGE_LENGTH;
-
-    if (ulength <= buflength) {
-        Py_BEGIN_ALLOW_THREADS
-        res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
-        Py_END_ALLOW_THREADS
-        return res < 0 ? res : ulength;
-    } else {
-        *newbuffer = PyMem_Malloc((size_t)ulength);
-        if (*newbuffer == NULL)
-            return MP_MEMORY_ERROR;
-        Py_BEGIN_ALLOW_THREADS
-        res = _conn_recvall(conn->handle, *newbuffer, (size_t)ulength);
-        Py_END_ALLOW_THREADS
-        return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength;
-    }
-}
-
-/*
- * Check whether any data is available for reading -- neg timeout blocks
- */
-
-static int
-conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
-{
-    int res;
-    fd_set rfds;
-
-    /*
-     * Verify the handle, issue 3321. Not required for windows.
-     */
-    #ifndef MS_WINDOWS
-        if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) {
-            Py_BLOCK_THREADS
-            PyErr_SetString(PyExc_IOError, "handle out of range in select()");
-            Py_UNBLOCK_THREADS
-            return MP_EXCEPTION_HAS_BEEN_SET;
-        }
-    #endif
-
-    FD_ZERO(&rfds);
-    FD_SET((SOCKET)conn->handle, &rfds);
-
-    if (timeout < 0.0) {
-        res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
-    } else {
-        struct timeval tv;
-        tv.tv_sec = (long)timeout;
-        tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
-        res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
-    }
-
-    if (res < 0) {
-        return MP_SOCKET_ERROR;
-    } else if (FD_ISSET(conn->handle, &rfds)) {
-        return TRUE;
-    } else {
-        assert(res == 0);
-        return FALSE;
-    }
-}
-
-/*
- * "connection.h" defines the Connection type using defs above
- */
-
-#define CONNECTION_NAME "Connection"
-#define CONNECTION_TYPE ConnectionType
-
-#include "connection.h"
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
--- a/Modules/_multiprocessing/win32_functions.c
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -215,6 +215,164 @@
     Py_RETURN_NONE;
 }
 
+static PyObject *
+win32_closesocket(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    int ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE ":closesocket" , &handle))
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = closesocket((SOCKET) handle);
+    Py_END_ALLOW_THREADS
+
+    if (ret)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_recv(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    int size, nread;
+    PyObject *buf;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "i:recv" , &handle, &size))
+        return NULL;
+
+    buf = PyBytes_FromStringAndSize(NULL, size);
+    if (!buf)
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    nread = recv((SOCKET) handle, PyBytes_AS_STRING(buf), size, 0);
+    Py_END_ALLOW_THREADS
+
+    if (nread < 0) {
+        Py_DECREF(buf);
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+    }
+    _PyBytes_Resize(&buf, nread);
+    return buf;
+}
+
+static PyObject *
+win32_send(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    Py_buffer buf;
+    int ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "y*:send" , &handle, &buf))
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = send((SOCKET) handle, buf.buf, buf.len, 0);
+    Py_END_ALLOW_THREADS
+
+    PyBuffer_Release(&buf);
+    if (ret < 0)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, WSAGetLastError());
+    return PyLong_FromLong(ret);
+}
+
+static PyObject *
+win32_WriteFile(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    Py_buffer buf;
+    int written;
+    BOOL ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "y*:WriteFile" , &handle, &buf))
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = WriteFile(handle, buf.buf, buf.len, &written, NULL);
+    Py_END_ALLOW_THREADS
+
+    PyBuffer_Release(&buf);
+    if (!ret)
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+    return PyLong_FromLong(written);
+}
+
+static PyObject *
+win32_ReadFile(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    int size;
+    DWORD nread;
+    PyObject *buf;
+    BOOL ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "i:ReadFile" , &handle, &size))
+        return NULL;
+
+    buf = PyBytes_FromStringAndSize(NULL, size);
+    if (!buf)
+        return NULL;
+
+    Py_BEGIN_ALLOW_THREADS
+    ret = ReadFile(handle, PyBytes_AS_STRING(buf), size, &nread, NULL);
+    Py_END_ALLOW_THREADS
+
+    if (!ret && GetLastError() != ERROR_MORE_DATA) {
+        Py_DECREF(buf);
+        return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+    }
+    if (_PyBytes_Resize(&buf, nread))
+        return NULL;
+    return Py_BuildValue("NN", buf, PyBool_FromLong(ret));
+}
+
+static PyObject *
+win32_PeekNamedPipe(PyObject *self, PyObject *args)
+{
+    HANDLE handle;
+    int size = 0;
+    PyObject *buf = NULL;
+    DWORD nread, navail, nleft;
+    BOOL ret;
+
+    if (!PyArg_ParseTuple(args, F_HANDLE "|i:PeekNamedPipe" , &handle, &size))
+        return NULL;
+
+    if (size < 0) {
+        PyErr_SetString(PyExc_ValueError, "negative size");
+        return NULL;
+    }
+
+    if (size) {
+        buf = PyBytes_FromStringAndSize(NULL, size);
+        if (!buf)
+            return NULL;
+        Py_BEGIN_ALLOW_THREADS
+        ret = PeekNamedPipe(handle, PyBytes_AS_STRING(buf), size, &nread,
+                            &navail, &nleft);
+        Py_END_ALLOW_THREADS
+        if (!ret) {
+            Py_DECREF(buf);
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+        }
+        if (_PyBytes_Resize(&buf, nread))
+            return NULL;
+        return Py_BuildValue("Nii", buf, navail, nleft);
+    }
+    else {
+        Py_BEGIN_ALLOW_THREADS
+        ret = PeekNamedPipe(handle, NULL, 0, NULL, &navail, &nleft);
+        Py_END_ALLOW_THREADS
+        if (!ret) {
+            return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
+        }
+        return Py_BuildValue("ii", navail, nleft);
+    }
+}
+
 static PyMethodDef win32_methods[] = {
     WIN32_FUNCTION(CloseHandle),
     WIN32_FUNCTION(GetLastError),
@@ -223,8 +381,14 @@
     WIN32_FUNCTION(ConnectNamedPipe),
     WIN32_FUNCTION(CreateFile),
     WIN32_FUNCTION(CreateNamedPipe),
+    WIN32_FUNCTION(ReadFile),
+    WIN32_FUNCTION(PeekNamedPipe),
     WIN32_FUNCTION(SetNamedPipeHandleState),
     WIN32_FUNCTION(WaitNamedPipe),
+    WIN32_FUNCTION(WriteFile),
+    WIN32_FUNCTION(closesocket),
+    WIN32_FUNCTION(recv),
+    WIN32_FUNCTION(send),
     {NULL}
 };
 
@@ -244,6 +408,8 @@
     Py_INCREF(&Win32Type);
 
     WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
+    WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
+    WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES);
     WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
     WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
     WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
diff --git a/PC/VC6/_multiprocessing.dsp b/PC/VC6/_multiprocessing.dsp
--- a/PC/VC6/_multiprocessing.dsp
+++ b/PC/VC6/_multiprocessing.dsp
@@ -97,18 +97,10 @@
 # End Source File
 # Begin Source File
 
-SOURCE=..\..\Modules\_multiprocessing\pipe_connection.c
-# End Source File
-# Begin Source File
-
 SOURCE=..\..\Modules\_multiprocessing\semaphore.c
 # End Source File
 # Begin Source File
 
-SOURCE=..\..\Modules\_multiprocessing\socket_connection.c
-# End Source File
-# Begin Source File
-
 SOURCE=..\..\Modules\_multiprocessing\win32_functions.c
 # End Source File
 # End Target
diff --git a/PC/VS8.0/_multiprocessing.vcproj b/PC/VS8.0/_multiprocessing.vcproj
--- a/PC/VS8.0/_multiprocessing.vcproj
+++ b/PC/VS8.0/_multiprocessing.vcproj
@@ -522,10 +522,6 @@
 				RelativePath="..\..\Modules\_multiprocessing\multiprocessing.h"
 				>
 			</File>
-			<File
-				RelativePath="..\..\Modules\_multiprocessing\connection.h"
-				>
-			</File>
 		</Filter>
 		<Filter
 			Name="Source Files"
@@ -535,18 +531,10 @@
 				>
 			</File>
 			<File
-				RelativePath="..\..\Modules\_multiprocessing\pipe_connection.c"
-				>
-			</File>
-			<File
 				RelativePath="..\..\Modules\_multiprocessing\semaphore.c"
 				>
 			</File>
 			<File
-				RelativePath="..\..\Modules\_multiprocessing\socket_connection.c"
-				>
-			</File>
-			<File
 				RelativePath="..\..\Modules\_multiprocessing\win32_functions.c"
 				>
 			</File>
diff --git a/PCbuild/_multiprocessing.vcproj b/PCbuild/_multiprocessing.vcproj
--- a/PCbuild/_multiprocessing.vcproj
+++ b/PCbuild/_multiprocessing.vcproj
@@ -522,10 +522,6 @@
 				RelativePath="..\Modules\_multiprocessing\multiprocessing.h"
 				>
 			</File>
-			<File
-				RelativePath="..\Modules\_multiprocessing\connection.h"
-				>
-			</File>
 		</Filter>
 		<Filter
 			Name="Source Files"
@@ -535,18 +531,10 @@
 				>
 			</File>
 			<File
-				RelativePath="..\Modules\_multiprocessing\pipe_connection.c"
-				>
-			</File>
-			<File
 				RelativePath="..\Modules\_multiprocessing\semaphore.c"
 				>
 			</File>
 			<File
-				RelativePath="..\Modules\_multiprocessing\socket_connection.c"
-				>
-			</File>
-			<File
 				RelativePath="..\Modules\_multiprocessing\win32_functions.c"
 				>
 			</File>
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -1353,14 +1353,11 @@
         if platform == 'win32':
             multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c',
                                      '_multiprocessing/semaphore.c',
-                                     '_multiprocessing/pipe_connection.c',
-                                     '_multiprocessing/socket_connection.c',
                                      '_multiprocessing/win32_functions.c'
                                    ]
 
         else:
             multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c',
-                                     '_multiprocessing/socket_connection.c'
                                    ]
             if (sysconfig.get_config_var('HAVE_SEM_OPEN') and not
                 sysconfig.get_config_var('POSIX_SEMAPHORES_NOT_ENABLED')):

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list