channel (synchronous queue)

Recently (for some) the CSP style of channel has become quite popular in concurrency implementations. This kind of channel allows sends that do not complete until a receiver has actually taken the item. The existing queue.Queue would act like this if it didn't treat a queue size of 0 as infinite capacity.

In particular, I find channels to have value when sending data between threads, where it doesn't make sense to proceed until some current item has been accepted. This is useful when items are not purely CPU bound, and so generators are not appropriate.

I believe this rendezvous behaviour can be added to queue.Queue for the maxsize=0 case, with maxsize=None being the existing "infinite queue" behaviour. Additionally, a close() method, a Closed exception, and other usability features, such as an __iter__ that receives until the channel is closed, could be added. The Stackless class linked below also has some other ideas, motivated by performance, that make a lot of sense.

Existing code using queue.Queue would remain completely unaffected by such additions, provided the default is changed to maxsize=None and the code does not explicitly pass maxsize=0 (currently the default).

Here are a few links for some background and ideas:
http://gevent.org/gevent.queue.html#gevent.queue.Queue
http://www.disinterest.org/resource/stackless/2.6-docs-html/library/stackles...
http://en.wikipedia.org/wiki/Communicating_sequential_processes#Comparison_w...
http://golang.org/doc/go_spec.html#Channel_types
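For reference, this is how the current standard-library queue.Queue behaves with respect to maxsize (no hypothetical API involved): maxsize=0 means unbounded, so put() returns at once even with no consumer, and even maxsize=1 only guarantees that the item was stored, not that anyone took it.

```python
import queue

# maxsize=0 (the default) means "unbounded": put() never blocks.
q = queue.Queue(maxsize=0)
for i in range(1000):
    q.put(i)                 # returns immediately although no consumer exists
unbounded_size = q.qsize()

# maxsize=1: the first put() returns as soon as the item is stored,
# before any consumer has taken it; only a second put() would block.
q1 = queue.Queue(maxsize=1)
q1.put("first")
second_put_blocked = False
try:
    q1.put("second", timeout=0.1)
except queue.Full:
    second_put_blocked = True

print(unbounded_size, second_put_blocked)   # 1000 True
```

A rendezvous channel would differ from both cases: put() would return only after a receiver had actually taken the item.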

On 18 February 2012 15:38, Matt Joiner <anacrolix@gmail.com> wrote:
I don't know if that's exactly what you have in mind, but you can implement a channel very simply with a threading.Barrier object (new in Python 3.2). I'm no specialist in concurrency at all, but it seems that this is what you are describing (what in the Go language is called a "synchronous channel", I think):

    from threading import Barrier

    class Channel:
        def __init__(self):
            self._sync = Barrier(2)
            self._values = [None, None]

        def send(self, value=None):
            # Barrier.wait() returns a distinct index (0 or 1) to each thread
            i = self._sync.wait()
            self._values[i] = value
            self._sync.wait()
            return self._values[1 - i]

        def get(self):
            return self.send()

Then with the following convenience function to start a function in a new thread:

    from threading import Thread

    def go(f, *args, **kwargs):
        thread = Thread(target=f, args=args, kwargs=kwargs)
        thread.start()
        return thread

You can have e.g. the scenario:

    from itertools import count

    ch = Channel()

    def produce(ch):
        for i in count():
            print("sending", i)
            ch.send(i)

    def consume(ch, n):
        for i in range(n):
            print("getting", ch.get())

Giving you this:
-- Arnaud

I'm not sure that your example allows for multiple senders and receivers to block at the same time. I'm also not sure why the senders are receiving values. It's definitely an interesting approach to use a Barrier but incomplete. On Feb 19, 2012 2:57 AM, "Arnaud Delobelle" <arnodel@gmail.com> wrote:
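Why senders receive values: the Barrier-based Channel is symmetric, so each send() swaps values with whichever thread meets it at the barrier, much like an exchanger. The sketch below reuses Arnaud's class (only comments added) with exactly two threads. With more than two concurrent callers, nothing guarantees that the two wait() calls pair up the same threads, which is the limitation raised above.

```python
from threading import Barrier, Thread

class Channel:
    # Arnaud's Barrier-based channel; comments added.
    def __init__(self):
        self._sync = Barrier(2)
        self._values = [None, None]

    def send(self, value=None):
        i = self._sync.wait()          # 0 or 1, distinct for the two threads
        self._values[i] = value
        self._sync.wait()              # both values are now written
        return self._values[1 - i]     # return the *other* thread's value

    def get(self):
        return self.send()

results = {}
ch = Channel()

def side(name, value):
    results[name] = ch.send(value)

threads = [Thread(target=side, args=("a", "from a")),
           Thread(target=side, args=("b", "from b"))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)   # each side received the other side's value
```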

On 18Feb2012 20:01, Antoine Pitrou <solipsis@pitrou.net> wrote:
| On Sat, 18 Feb 2012 23:38:06 +0800
| Matt Joiner <anacrolix@gmail.com> wrote:
| > Recently (for some) the CSP style of channel has become quite popular
| > in concurrency implementations. This kind of channel allows sends that
| > do not complete until a receiver has actually taken the item. The
| > existing queue.Queue would act like this if it didn't treat a queue
| > size of 0 as infinite capacity.
| >
| > In particular, I find channels to have value when sending data between
| > threads, where it doesn't make sense to proceed until some current
| > item has been accepted. This is useful when items are not purely CPU
| > bound, and so generators are not appropriate.
|
| What is the point to process the data in another thread, if you are
| going to block on the result anyway?

Synchronisation. Shrug.

I use synchronous channels myself; they are a fine basic facility. The problem with Queues et al is that they are inherently _asynchronous_ and you have to work hard to wrap locking around them when you want interlocking cogs.

Also, it is perfectly reasonable in many circumstances to use a thread for algorithmic clarity, just like you might use a generator or a coroutine in suitable circumstances. Here one does it not so that some work may proceed in parallel, but to cleanly write two algorithms that pass information between each other but are otherwise as separate as another pair of functions might be. The alternative may be a complicated interwoven event loop.

Cheers,
--
Cameron Simpson <cs@zip.com.au> DoD#743
http://www.cskk.ezoshosting.com/cs/

C makes it easy for you to shoot yourself in the foot. C++ makes that harder, but when you do, it blows away your whole leg. - Bjarne Stroustrup

On 18.02.2012 16:38, Matt Joiner wrote:
That is the most common cause of deadlock in number-crunching code using MPI:

Process A sends a message to Process B, waits for B to receive.
Process B sends a message to Process A, waits for A to receive.
... and now we just wait ...

I am really glad the queues in Python do not do this.

Sturla

Yes, channels can allow for this, but as with locks, directionality and ordering matter. Typically messages will only run in a particular direction. Nor will all channels be synchronous (they're a tool, not a panacea); they might be intermixed with infinite asynchronous queues, as is commonplace at the moment. On Feb 19, 2012 8:19 AM, "Sturla Molden" <sturla@molden.no> wrote:
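The deadlock Sturla describes, and Matt's point that ordering matters, can be reproduced with threading.Barrier(2) used as one rendezvous point per direction. This is a simplification assumed for brevity: each barrier stands for one synchronous send, and no value is carried. When both sides send first, each waits at a barrier the other never reaches and the timeout breaks it; when both agree on one order (A to B, then B to A), both rendezvous succeed.

```python
import threading

def run(order_a, order_b, timeout=0.5):
    """Each side waits at the named rendezvous points in the given order.
    Returns {side: 'ok' or 'deadlock (timed out)'}."""
    points = {"a_to_b": threading.Barrier(2), "b_to_a": threading.Barrier(2)}
    outcome = {}

    def side(name, order):
        try:
            for p in order:
                points[p].wait(timeout)   # meet the other side here
            outcome[name] = "ok"
        except threading.BrokenBarrierError:
            outcome[name] = "deadlock (timed out)"

    ta = threading.Thread(target=side, args=("A", order_a))
    tb = threading.Thread(target=side, args=("B", order_b))
    ta.start()
    tb.start()
    ta.join()
    tb.join()
    return outcome

# Both send first: A waits at a_to_b, B waits at b_to_a, nobody meets.
crossed = run(["a_to_b", "b_to_a"], ["b_to_a", "a_to_b"])
# Agreed ordering: both handle a_to_b first, then b_to_a.
ordered = run(["a_to_b", "b_to_a"], ["a_to_b", "b_to_a"])
print(crossed)
print(ordered)
```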

On 19.02.2012 01:39, Matt Joiner wrote:
Actually, it was only a synchronous MPI_Recv that did this in MPI; a synchronous MPI_Send would have been even worse. Which is why MPI got the asynchronous method MPI_Irecv... Sounds like you just want a barrier or a condition primitive. E.g. have the sender call .wait() on a condition and let the receiver call .notify() on the condition. Sturla

On 19Feb2012 01:59, Sturla Molden <sturla@molden.no> wrote:
| On 19.02.2012 01:39, Matt Joiner wrote:
| >
| > Yes, channels can allow for this, but as with locks directionality and
| > ordering matter. Typically messages will only run in a particular
| > direction.
| >
|
| Actually, it was only a synchronous MPI_Recv that did this in MPI, a
| synchronous MPI_Send would have been even worse. Which is why MPI got
| the asynchronous method MPI_Irecv...
|
| Sounds like you just want a barrier or a condition primitive. E.g. have
| the sender call .wait() on a condition and let the receiver call
| .notify() the condition.

A condition is essentially a boolean (with waiting). A channel is a value-passing mechanism.

Sometimes you really do want a zero-storage Queue, i.e. a channel. Saying "but you could put a value in a shared variable and just use a condition" removes the abstraction/metaphor. If I was thinking that way more than once in some code I'd write a small class to do that. And it would be a channel!

Seriously, a channel is semantically equivalent to a zero-storage Queue, which is a mode not provided by the current Queue implementation.
--
Cameron Simpson <cs@zip.com.au> DoD#743
http://www.cskk.ezoshosting.com/cs/

No good deed shall go unpunished! - David Wood <davewood@teleport.com>
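Cameron's "small class" built from a shared variable and a condition might look like the following sketch, which also adds the close() method, Closed exception and __iter__ proposed at the start of the thread. The names Channel, Closed, send, recv and close are hypothetical, not an existing standard-library API. send() returns only after a receiver has taken the value; senders are serialized by a lock, while any number of receivers may wait on the shared Condition.

```python
import threading

class Closed(Exception):
    """Raised when sending on, or receiving from, a closed channel."""

class Channel:
    """Zero-storage (rendezvous) channel.  Hypothetical API, for illustration."""

    def __init__(self):
        self._cond = threading.Condition()
        self._send_lock = threading.Lock()   # one sender uses the slot at a time
        self._has_item = False
        self._item = None
        self._taken = False
        self._closed = False

    def send(self, value):
        with self._send_lock:
            with self._cond:
                if self._closed:
                    raise Closed
                self._item, self._has_item, self._taken = value, True, False
                self._cond.notify_all()            # wake waiting receivers
                while not self._taken and not self._closed:
                    self._cond.wait()
                if not self._taken:                # closed before pickup: withdraw
                    self._item, self._has_item = None, False
                    raise Closed

    def recv(self):
        with self._cond:
            while not self._has_item:
                if self._closed:
                    raise Closed
                self._cond.wait()
            value = self._item
            self._item, self._has_item, self._taken = None, False, True
            self._cond.notify_all()                # wake the blocked sender
            return value

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def __iter__(self):
        # Receive until the channel is closed.
        while True:
            try:
                yield self.recv()
            except Closed:
                return


results = []
ch = Channel()

def consumer():
    for v in ch:                 # stops when the channel is closed
        results.append(v)

t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    ch.send(i)                   # returns only after the consumer took i
ch.close()
t.join()
print(results)   # [0, 1, 2, 3, 4]
```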

How hard would it be to add Channel to the stdlib? Perhaps even in the threading module, which already has a bunch of different primitives like Lock, RLock, Condition, Event, Semaphore, Barrier. On Sun, Feb 19, 2012 at 2:05 AM, Cameron Simpson <cs@zip.com.au> wrote:
-- --Guido van Rossum (python.org/~guido)

On 19.02.2012 17:01, Antoine Pitrou wrote:
Here is a skeleton (replace self._data with some IPC mechanism for multiprocessing). I'll post a barrier for multiprocessing asap, I happen to have one ;-)

Sturla

    from threading import Lock, Event

    class Channel(object):

        def __init__(self):
            self._writelock = Lock()    # one sender at a time
            self._readlock = Lock()     # one receiver at a time
            self._new_data = Event()    # set: a message is waiting
            self._recv_data = Event()   # set: the message has been taken
            self._data = None

        def put(self, msg):
            with self._writelock:
                self._data = msg
                self._new_data.set()
                self._recv_data.wait()  # block until a receiver has taken msg
                self._recv_data.clear()

        def get(self):
            with self._readlock:
                self._new_data.wait()
                msg = self._data
                self._data = None
                self._new_data.clear()
                self._recv_data.set()
                return msg

    if __name__ == "__main__":

        from threading import Thread
        from sys import stdout

        def thread2(channel):
            for i in range(1000):
                msg = channel.get()
                stdout.flush()
                print("Thread 2 received '%s'\n" % msg, end="")
                stdout.flush()

        def thread1(channel):
            for i in range(1000):
                stdout.flush()
                print("Thread 1 preparing to send 'message %d'\n" % i, end="")
                stdout.flush()
                channel.put(("message %d" % i,))
                stdout.flush()
                print("Thread 1 finished sending 'message %d'\n" % i, end="")
                stdout.flush()

        channel = Channel()
        t2 = Thread(target=thread2, args=(channel,))
        t2.start()
        thread1(channel)
        t2.join()

On 19.02.2012 17:58, Sturla Molden wrote:
    import multiprocessing as mp

    class Barrier(object):

        def __init__(self, numproc):
            # one Event per (sender, receiver) pair
            self._events = [mp.Event() for n in range(numproc**2)]
            self._numproc = numproc

        def wait(self, rank):
            # loop ceil(log2(numproc)) times; int.bit_length() avoids
            # floating-point rounding in log(numproc)/log(2)
            for k in range((self._numproc - 1).bit_length()):
                # send event to process (rank + 2**k) % numproc
                receiver = (rank + 2**k) % self._numproc
                evt = self._events[rank * self._numproc + receiver]
                evt.set()
                # wait for event from process (rank - 2**k) % numproc
                sender = (rank - 2**k) % self._numproc
                evt = self._events[sender * self._numproc + rank]
                evt.wait()
                evt.clear()
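The pattern in this Barrier (signal rank + 2**k, wait for rank - 2**k, for ceil(log2(n)) rounds) is usually called a dissemination barrier, and it can be tried out with threads. The sketch below is an adaptation, not Sturla's code: it swaps in threading.Semaphore for Event, on the assumption that a counting semaphore cannot lose a signal when a fast thread enters the next wait() before its partner has cleared the previous one, which set()/clear() on an Event can.

```python
import threading

class DisseminationBarrier:
    """Reusable barrier for n threads identified by rank 0..n-1.
    Round k: signal rank (r + 2**k) % n, then wait for (r - 2**k) % n."""

    def __init__(self, n):
        self.n = n
        self.rounds = (n - 1).bit_length()    # == ceil(log2(n)) for n >= 1
        # one counting semaphore per (sender, receiver) pair
        self._sema = [threading.Semaphore(0) for _ in range(n * n)]

    def wait(self, rank):
        n = self.n
        for k in range(self.rounds):
            receiver = (rank + 2**k) % n
            self._sema[rank * n + receiver].release()   # signal partner
            sender = (rank - 2**k) % n
            self._sema[sender * n + rank].acquire()     # wait for partner


n, episodes = 6, 20
barrier = DisseminationBarrier(n)
arrived = [0] * episodes
lock = threading.Lock()
violations = []

def worker(rank):
    for e in range(episodes):
        with lock:
            arrived[e] += 1
        barrier.wait(rank)
        with lock:
            if arrived[e] != n:          # someone passed before all arrived
                violations.append((rank, e))

threads = [threading.Thread(target=worker, args=(r,)) for r in range(n)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(barrier.rounds, violations)   # 3 []
```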

On 19/02/2012 5:05pm, Sturla Molden wrote:
I presume rank is the index of the process? Sounds very MPIish.

One problem is that multiprocessing's Event uses 5 semaphores. (Condition uses 4, and Lock, RLock and Semaphore use 1 each.) So your Barrier will use 5*numproc**2 semaphores. This is likely to be a problem for those Unixes (such as oldish versions of FreeBSD) which allow a very limited number of semaphores.

It would probably be better to use something with an API that is a closer match to threading.Barrier. The code below gets closer in API but does not implement reset() (which I think is pretty pointless anyway), and wait() returns None instead of an index. It is not properly tested though.

    import multiprocessing as mp

    class BrokenBarrierError(Exception):
        pass

    class Barrier(object):

        def __init__(self, size):
            assert size > 0
            self.size = size
            self._lock = mp.Lock()
            self._entry_sema = mp.Semaphore(size-1)
            self._exit_sema = mp.Semaphore(0)
            self._broken_sema = mp.BoundedSemaphore(1)

        def wait(self, timeout=None):
            if self.broken:
                raise BrokenBarrierError
            try:
                if self._entry_sema.acquire(timeout=0):
                    # not the last to arrive: wait to be released
                    if not self._exit_sema.acquire(timeout=timeout):
                        self.abort()
                else:
                    # last to arrive: release the others, then reopen the entry
                    for i in range(self.size-1):
                        self._exit_sema.release()
                    for i in range(self.size-1):
                        self._entry_sema.release()
            except BaseException:
                self.abort()
                raise
            if self.broken:
                raise BrokenBarrierError

        def abort(self):
            with self._lock:
                self._broken_sema.acquire(timeout=5)
                for i in range(self.size):
                    self._entry_sema.release()
                    self._exit_sema.release()

        def reset(self):
            raise NotImplementedError

        @property
        def broken(self):
            with self._lock:
                if not self._broken_sema.acquire(timeout=0):
                    return True
                self._broken_sema.release()
                return False

    ##

    import time, random

    def child(b, l):
        for i in range(5):
            time.sleep(random.random()*5)
            with l:
                print(i, "entering barrier:", mp.current_process().name)
            b.wait()
            with l:
                print('\t', i, "exiting barrier:", mp.current_process().name)

    if __name__ == '__main__':
        b = Barrier(5)
        l = mp.Lock()
        for i in range(5):
            mp.Process(target=child, args=(b, l)).start()
        time.sleep(10)
        print("ABORTING")
        b.abort()

On Sun, 19 Feb 2012 17:58:53 +0100 Sturla Molden <sturla@molden.no> wrote:
This raises the question: what does it achieve? You know that the data has been "received" on the other side (i.e. get() has been called), but this doesn't tell you that anything was done with the data, so: why is this a useful way to synchronize? Regards Antoine.

On 19.02.2012 18:27, Sturla Molden wrote:
Which is to say, I just wanted to prove how ridiculously simple Matt Joiner's complaint about a "channel" was. The multiprocessing barrier on the other hand is quite useful. (Though the butterfly method is not the most efficient implementation of a barrier.) Sturla

On Sun, Feb 19, 2012 at 9:36 AM, Sturla Molden <sturla@molden.no> wrote:
I may be taking this out of context, but I have a really hard time understanding what you were trying to say. What does it mean for a complaint to be simple? Did you leave out a word in haste? (I know that happens a lot to me. :-)
The multiprocessing barrier on the other hand is quite useful. (Though the butterfly method is not the most efficient implementation of a barrier.)
Glad to see some real code. It's probably time to move the code samples to the bug tracker where they can be reviewed and have a chance of getting incorporated into the next release. -- --Guido van Rossum (python.org/~guido)

On 19.02.2012 18:44, Guido van Rossum wrote:
Sorry for the rude language. I meant that I think it is a problem that does not belong in the standard library, but perhaps in a cookbook. It is ~20 lines of trivial code with objects already in the standard library. Well, one could say the same thing about a queue too (it's just a deque and a lock), but it is very useful and commonly used, so there is a difference. Sturla
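The remark that a queue is "just deque and a lock" is close: in practice the lock is a Condition, so that get() can block until an item arrives. A minimal unbounded sketch follows; the class name DequeQueue is hypothetical, and this is not the actual queue.Queue source, which also supports maxsize, timeouts and task_done().

```python
import collections
import threading

class DequeQueue:
    """Unbounded FIFO queue: a deque guarded by a Condition."""

    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()           # wake one waiting get()

    def get(self):
        with self._cond:
            while not self._items:        # loop guards against spurious wakeups
                self._cond.wait()
            return self._items.popleft()


q = DequeQueue()
out = []

def consumer():
    for _ in range(3):
        out.append(q.get())

t = threading.Thread(target=consumer)
t.start()
for x in "abc":
    q.put(x)
t.join()
print(out)   # ['a', 'b', 'c']
```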

On 19 February 2012 18:01, Sturla Molden <sturla@molden.no> wrote:
FWIW, I wouldn't have got this code right if I'd tried to write it. I'd have missed a lock or something. So it's possible that having it in the standard library avoids people like me writing buggy implementations. On the other hand, I can't imagine ever needing to use a channel object like this, so it would probably be worth having some real-world use cases to justify it. Paul

On Sun, Feb 19, 2012 at 10:08 AM, Paul Moore <p.f.moore@gmail.com> wrote:
It would also encourage using it as an interface between libraries with different authors, which would not happen if it was just a recipe -- every author would implement their own version of the recipe, and they would not be API-compatible even if they did the same thing. Many of the existing primitives in threading.py are very simple combinations of the basic Lock; but that doesn't make it less valuable to have them. Also, writing a performant Channel implementation for multiprocessing would hardly be a trivial job; it seems primitives don't make it into multiprocessing without first existing in threading.py. So all this suggests to me that there is no great harm in adding threading.Channel and it might open up some interesting new approaches to synchronization. That said, it certainly isn't a panacea; e.g. some Go examples written using Channels are better done with coroutines instead of threads in Python. (IIUC Go intentionally blurs the difference, but that's not given to Python.)
I think Matt Joiner's original post hinted at some. Matt, could you elaborate? We may be only an inch away from getting this into the stdlib... -- --Guido van Rossum (python.org/~guido)
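To illustrate the point about coroutines: the well-known Go prime sieve chains goroutines with channels, but in Python the same pipeline is naturally written with generators, each stage pulling values from the previous one, with no threads or locks. This is an illustrative translation, not code from the thread.

```python
import itertools

def generate():
    """Stage 1: yield 2, 3, 4, ... (the Go version sends these on a channel)."""
    yield from itertools.count(2)

def filter_multiples(source, prime):
    """Pass through the values from source that are not divisible by prime."""
    for n in source:
        if n % prime != 0:
            yield n

def primes():
    source = generate()
    while True:
        prime = next(source)
        yield prime
        source = filter_multiples(source, prime)   # chain a new filter stage

first_ten = list(itertools.islice(primes(), 10))
print(first_ten)   # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```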

On 19.02.2012 20:04, Guido van Rossum wrote:
Also, writing a performant Channel implementation for multiprocessing would hardly be a trivial job;
I think this should work :-)

Sturla

    from multiprocessing import Lock, Event, Pipe

    class Channel(object):

        def __init__(self):
            self._writelock = Lock()
            self._readlock = Lock()
            self._new_data = Event()
            self._recv_data = Event()
            # one-way pipe: _conn1 can only receive, _conn2 can only send
            self._conn1, self._conn2 = Pipe(False)

        def put(self, msg):
            with self._writelock:
                self._conn2.send(msg)
                self._new_data.set()
                self._recv_data.wait()
                self._recv_data.clear()

        def get(self):
            with self._readlock:
                self._new_data.wait()
                msg = self._conn1.recv()
                self._new_data.clear()
                self._recv_data.set()
                return msg

    ## -------------

    def proc2(channel):
        from sys import stdout
        for i in range(1000):
            msg = channel.get()
            stdout.flush()
            print("Process 2 received '%s'\n" % msg, end="")
            stdout.flush()

    def proc1(channel):
        from sys import stdout
        for i in range(1000):
            stdout.flush()
            print("Process 1 preparing to send 'message %d'\n" % i, end="")
            stdout.flush()
            channel.put(("message %d" % i,))
            stdout.flush()
            print("Process 1 finished sending 'message %d'\n" % i, end="")
            stdout.flush()

    if __name__ == "__main__":
        from multiprocessing import Process
        channel = Channel()
        p2 = Process(target=proc2, args=(channel,))
        p2.start()
        proc1(channel)
        p2.join()

If someone wants to write a PEP for Go'ish "channels" (or whatever) or put it on the bug tracker, here is the example implementation (with a lock around stdout in the example code, stupid me...). It would still need a timeout argument. A unittest could check the output of the test code, as it is known in advance. I don't really see the usefulness of a "channel" primitive, but for those who do, here is my contribution. I have only tested on Win64. Sturla
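One way to add the missing timeout argument is sketched below for threads. The assumptions are ours, not Sturla's: a put() that times out withdraws its message so no receiver gets it later and returns False, and a get() that times out raises TimeoutError. Guarding the single slot with one Condition, instead of two Events, makes the withdraw-or-deliver decision atomic.

```python
import threading
import time

class Channel:
    """Rendezvous channel with timeouts (a sketch; names mirror Sturla's)."""

    def __init__(self):
        self._writelock = threading.Lock()   # one sender at a time
        self._cond = threading.Condition()   # guards the slot below
        self._full = False                   # a message is waiting
        self._taken = False                  # current message was received
        self._msg = None

    def put(self, msg, timeout=None):
        """Return True once a receiver has taken msg.  On timeout, withdraw
        msg (it is never delivered) and return False."""
        deadline = None if timeout is None else time.monotonic() + timeout
        if not self._writelock.acquire(timeout=-1 if timeout is None else timeout):
            return False
        try:
            with self._cond:
                self._msg, self._full, self._taken = msg, True, False
                self._cond.notify_all()
                while not self._taken:
                    if deadline is None:
                        self._cond.wait()
                        continue
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        self._msg, self._full = None, False   # withdraw
                        return False
                    self._cond.wait(remaining)
                return True
        finally:
            self._writelock.release()

    def get(self, timeout=None):
        """Return the next message, or raise TimeoutError."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._full, timeout):
                raise TimeoutError("no sender within timeout")
            msg = self._msg
            self._msg, self._full, self._taken = None, False, True
            self._cond.notify_all()              # wake the waiting sender
            return msg


ch = Channel()
first = ch.put("lonely", timeout=0.2)      # no receiver: withdrawn
timed_out = False
try:
    ch.get(timeout=0.2)                    # no sender: times out
except TimeoutError:
    timed_out = True

got = []
t = threading.Thread(target=lambda: got.append(ch.get(timeout=2)))
t.start()
ok = ch.put("hello", timeout=2)
t.join()
print(first, timed_out, ok, got)   # False True True ['hello']
```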

On 19.02.2012 20:04, Guido van Rossum wrote:
One thing I could think of is "atomic messaging" with multiple producers or consumers talking on the same channel. E.g. while process A sends a message to process B, process C cannot write and process D cannot read. So you always get a 1-to-1 conversation. But I am not sure why (or if) Go has this mechanism.

On the other hand, if we put in N**2 pipes (or channels), we could achieve the same atomicity of transaction by having an index for sender and receiver of a message. This is what MPI does in the functions MPI_Send and MPI_Recv. But then I will be scolded for using too many semaphores on FreeBSD again :-(

But there are some other useful mechanisms from MPI (and ØMQ) to consider as well, for example message broadcasting, message scatter and gather, and reductions. The latter is a reduce operation (e.g. add or multiply) on messages coming in from multiple processes. OpenMP also has reductions in the API.

So there is a lot to be considered in the area of concurrency if we want to put more classes in threading and multiprocessing. But now I'll stop before someone tells me to take this to the concurrency list :-)

Sturla
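The indexed point-to-point scheme (one channel per sender/receiver pair, as with MPI_Send/MPI_Recv) plus a reduction can be sketched as a toy. Assumptions: ranks are threads rather than processes, the names Comm, send, recv and reduce are hypothetical, and each pair uses a buffering queue.Queue, so sends do not block the way an MPI synchronous send might.

```python
import operator
import queue
import threading

class Comm:
    """Toy communicator for n ranks with one FIFO per (sender, receiver) pair."""

    def __init__(self, n):
        self.n = n
        self._boxes = {(i, j): queue.Queue() for i in range(n) for j in range(n)}

    def send(self, src, dst, data):
        self._boxes[src, dst].put(data)

    def recv(self, src, dst):
        # Only messages from src reach dst through this box, so pairs never mix.
        return self._boxes[src, dst].get()

    def reduce(self, rank, value, op=operator.add, root=0):
        """Combine every rank's value with op; the result is returned at root."""
        if rank != root:
            self.send(rank, root, value)
            return None
        result = value
        for src in range(self.n):
            if src != root:
                result = op(result, self.recv(src, root))
        return result


comm = Comm(4)
results = {}

def worker(rank):
    results[rank] = comm.reduce(rank, rank + 1)   # ranks contribute 1, 2, 3, 4

threads = [threading.Thread(target=worker, args=(r,)) for r in range(comm.n)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results[0])   # 10
```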

On Feb 19, 2012, at 3:29 PM, Sturla Molden wrote:
On the other hand, if we put in N**2 pipes (or channels), we could achieve the same atomicity of transaction by having an index for sender and receiver of a message. This is what MPI does in the functions MPI_Send and MPI_Recv. But then I will be scolded for using too many semaphores on FreeBSD again :-(
I like this a lot. Below is some toy code I use in my parallel algorithms class (I removed the global communications broadcast, scatter, gather, reduce, and I removed logging, network topology constraints, and checks).

    import os        # Unix only: relies on os.fork()
    import pickle

    class PSim(object):

        def __init__(self, p):
            """ forks p-1 processes and creates p*p pipes """
            self.nprocs = p
            self.pipes = {}
            for i in range(p):
                for j in range(p):
                    self.pipes[i, j] = os.pipe()
            self.rank = 0
            for i in range(1, p):
                if not os.fork():
                    self.rank = i
                    break   # a child must not fork further processes

        def send(self, j, data):
            s = pickle.dumps(data)
            fd = self.pipes[self.rank, j][1]
            os.write(fd, str(len(s)).zfill(10).encode())   # 10-byte length header
            os.write(fd, s)

        def _read(self, fd, size):
            # os.read() may return fewer bytes than requested
            buf = b''
            while len(buf) < size:
                chunk = os.read(fd, size - len(buf))
                if not chunk:
                    raise EOFError
                buf += chunk
            return buf

        def recv(self, j):
            fd = self.pipes[j, self.rank][0]
            size = int(self._read(fd, 10))
            return pickle.loads(self._read(fd, size))

    if __name__ == '__main__':
        comm = PSim(2)
        if comm.rank == 0:
            comm.send(1, 'hello world')
        else:
            print(comm.recv(0))

It would be very useful to have something like these channels built in. Notice that using OS pipes has the problem of an OS-dependent buffer size: send is non-blocking for small data sizes but becomes blocking for large ones. Using OS mkfifo or a multiprocessing Queue is better, but the OS limits the number of files one program can have open.
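The OS-dependent pipe size mentioned above can be measured directly: a write to a pipe returns immediately while the kernel buffer has room and blocks once it is full. This sketch assumes a POSIX system and Python 3.5+ for os.set_blocking(); it puts the write end in non-blocking mode and counts how many bytes fit before a write would block. The number printed varies by OS and configuration.

```python
import os

def pipe_capacity(chunk=1024):
    """Count the bytes a pipe accepts before a write would block (POSIX)."""
    r, w = os.pipe()
    os.set_blocking(w, False)         # raise instead of blocking when full
    total = 0
    try:
        while True:
            total += os.write(w, b"x" * chunk)
    except BlockingIOError:
        pass                          # buffer full: a blocking write would now wait
    finally:
        os.close(r)
        os.close(w)
    return total

capacity = pipe_capacity()
print(capacity)   # OS-dependent
```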

On 20.02.2012 00:38, Massimo Di Pierro wrote:
Most MPI implementations use shared memory on localhost. In theory one could implement a queue (deque and lock) using a shared memory region (a file on /tmp or Windows equivalent). It would be extremely fast and could contain any number of "pipes" of arbitrary size. Sturla

I've created http://bugs.python.org/issue14059 for the multiprocessing.Barrier. I suggest a new thread be started to continue discussion on that. On Mon, Feb 20, 2012 at 8:44 AM, Massimo Di Pierro <massimo.dipierro@gmail.com> wrote:

I've created http://bugs.python.org/issue14060 for the possibility of a channel implementation. On Mon, Feb 20, 2012 at 1:44 AM, Guido van Rossum <guido@python.org> wrote:

On 18 February 2012 15:38, Matt Joiner <anacrolix@gmail.com> wrote:
I don't know if that's exactly what you have in mind, but you can implement a channel very simply with a threading.Barrier object (new in Python 3.2). I'm no specialist of concurrency at all, but it seems that this is what you are describing (what in the go language is called a "synchronous channel" I think): from threading import Barrier class Channel: def __init__(self): self._sync = Barrier(2) self._values = [None, None] def send(self, value=None): i = self._sync.wait() self._values[i] = value self._sync.wait() return self._values[1 - i] def get(self): return self.send() Then with the following convenience function to start a function in a new thread: from threading import Thread def go(f, *args, **kwargs): thread = Thread(target=f, args=args, kwargs=kwargs) thread.start() return thread You can have e.g. the scenario: ch = Channel() def produce(ch): for i in count(): print("sending", i) ch.send(i) def consume(ch, n): for i in range(n): print("getting", ch.get()) Giving you this:
-- Arnaud

I'm not sure that your example allows for multiple senders and receivers to block at the same time. I'm also not sure why the senders are receiving values. It's definitely an interesting approach to use a Barrier but incomplete. On Feb 19, 2012 2:57 AM, "Arnaud Delobelle" <arnodel@gmail.com> wrote:

On 18Feb2012 20:01, Antoine Pitrou <solipsis@pitrou.net> wrote: | On Sat, 18 Feb 2012 23:38:06 +0800 | Matt Joiner <anacrolix@gmail.com> wrote: | > Recently (for some) the CSP style of channel has become quite popular | > in concurrency implementations. This kind of channel allows sends that | > do not complete until a receiver has actually taken the item. The | > existing queue.Queue would act like this if it didn't treat a queue | > size of 0 as infinite capacity. | > | > In particular, I find channels to have value when sending data between | > threads, where it doesn't make sense to proceed until some current | > item has been accepted. This is useful when items are not purely CPU | > bound, and so generators are not appropriate. | | What is the point to process the data in another thread, if you are | going to block on the result anyway? Synchronisation. Shrug. I use synchronous channels myself; they are a fine basic facility. The problem with Queues et al is that they are inherently _asynchronous_ and you have to work hard to wrap locking around it when you want interlocking cogs. Also, it is perfectly reasonable in many circumstances to use a thread for algorithmic clarity, just like you might use a generator or a coroutine in suitable circumstances. Here one does it not so that some work may process in parallel but to cleanly write two algorithms that pass information between each other but are otherwise as separate as an aother pair of functions might be. The alternative may be a complicated interwoven event loop. Cheers, -- Cameron Simpson <cs@zip.com.au> DoD#743 http://www.cskk.ezoshosting.com/cs/ C makes it easy for you to shoot yourself in the foot. C++ makes that harder, but when you do, it blows away your whole leg. - Bjarne Stroustrup

Den 18.02.2012 16:38, skrev Matt Joiner:
That is the most common cause of deadlock in number crunching code using MPI. Process A sends message to Process B, waits for B to receive Process B sends message to Process A, waits for A to receive ... and now we just wait ... I am really glad the queues on Python do not do this. Sturla

Yes, channels can allow for this, but as with locks directionality and ordering matter. Typically messages will only run in a particular direction. Nor will all channels be synchronous (they're a tool, not a panacea), they might be intermixed with infinite asynchronous queues as is commonplace at the moment. On Feb 19, 2012 8:19 AM, "Sturla Molden" <sturla@molden.no> wrote:

Den 19.02.2012 01:39, skrev Matt Joiner:
Actually, it was only a synchronous MPI_Recv that did this in MPI, a synchronous MPI_Send would have been even worse. Which is why MPI got the asynchronous method MPI_Irecv... Sounds like you just want a barrier or a condition primitive. E.g. have the sender call .wait() on a condition and let the receiver call .notify() the condition. Sturla

On 19Feb2012 01:59, Sturla Molden <sturla@molden.no> wrote: | Den 19.02.2012 01:39, skrev Matt Joiner: | > | > Yes, channels can allow for this, but as with locks directionality and | > ordering matter. Typically messages will only run in a particular | > direction. | > | | Actually, it was only a synchronous MPI_Recv that did this in MPI, a | synchronous | MPI_Send would have been even worse. Which is why MPI got the | asynchronous method MPI_Irecv... | | Sounds like you just want a barrier or a condition primitive. E.g. have | the sender | call .wait() on a condition and let the receiver call .notify() the | condition. A condition is essentially a boolean (with waiting). A channel is a value passing mechanism. Sometimes you really do want a zero-storage Queue i.e. a channel. Saying "but you could put a value in a shared variable and just use a condition" removes the abstraction/metaphor. If I was thinking that way more than once in some code I'd write a small class to do that. And it would be a channel! Seriously, a channel is semanticly equivalent to a zero-storage Queue, which is a mode not provided by the current Queue implementation. -- Cameron Simpson <cs@zip.com.au> DoD#743 http://www.cskk.ezoshosting.com/cs/ No good deed shall go unpunished! - David Wood <davewood@teleport.com>

How hard would it be to add Channel to the stdlib? Perhaps even in the threading module, which already has a bunch of different primitives like Lock, RLock, Condition, Event, Semaphore, Barrier. On Sun, Feb 19, 2012 at 2:05 AM, Cameron Simpson <cs@zip.com.au> wrote:
-- --Guido van Rossum (python.org/~guido)

Den 19.02.2012 17:01, skrev Antoine Pitrou:
Here is a sceleton (replace self._data with some IPC mechanism for multiprocessing). I'll post a barrier for multiprocessing asap, I happen to have one ;-) Sturla from threading import Lock, Event class Channel(object): def __init__(self): self._writelock = Lock() self._readlock = Lock() self._new_data = Event() self._recv_data = Event() self._data = None def put(self, msg): with self._writelock: self._data = msg self._new_data.set() self._recv_data.wait() self._recv_data.clear() def get(self): with self._readlock: self._new_data.wait() msg = self._data self._data = None self._new_data.clear() self._recv_data.set() return msg if __name__ == "__main__": from threading import Thread from sys import stdout def thread2(channel): for i in range(1000): msg = channel.get() stdout.flush() print "Thread 2 received '%s'\n" % msg, stdout.flush() def thread1(channel): for i in range(1000): stdout.flush() print "Thread 1 preparing to send 'message %d'\n" % i, stdout.flush() msg = channel.put(("message %d" % i,)) stdout.flush() print "Thread 1 finished sending 'message %d'\n" % i, stdout.flush() channel = Channel() t2 = Thread(target=thread2, args=(channel,)) t2.start() thread1(channel) t2.join()

Den 19.02.2012 17:58, skrev Sturla Molden:
from multiprocessing import Event from math import ceil, log class Barrier(object): def __init__(self, numproc): self._events = [mp.Event() for n in range(numproc**2)] self._numproc = numproc def wait(self, rank): # loop log2(numproc) times, rounding up for k in range(int(ceil(log(self._numproc)/log(2)))): # send event to process # (rank + 2**k) % numproc receiver = (rank + 2**k) % self._numproc evt = self._events[rank * self._numproc + receiver] evt.set() # wait for event from process # (rank - 2**k) % numproc sender = (rank - 2**k) % self._numproc evt = self._events[sender * self._numproc + rank] evt.wait() evt.clear()

On 19/02/2012 5:05pm, Sturla Molden wrote:
I presume rank is the index of the process? Sounds very MPIish. One problem with multiprocessing's Event uses 5 semaphores. (Condition uses 4 and Lock, RLock, Semaphore use 1). So your Barrier will use 5*numproc semaphores. This is likely to be a problem for those Unixes (such as oldish versions of FreeBSD) which allow a very limited number of semaphores. It would probably better to use something which has an API which is a closer match to threading.Barrier. The code below gets closer in API but does not implement reset() (which I think is pretty pointless anyway), and wait() returns None instead of an index. It is not properly tested though. import multiprocessing as mp class BrokenBarrierError(Exception): pass class Barrier(object): def __init__(self, size): assert size > 0 self.size = size self._lock = mp.Lock() self._entry_sema = mp.Semaphore(size-1) self._exit_sema = mp.Semaphore(0) self._broken_sema = mp.BoundedSemaphore(1) def wait(self, timeout=None): if self.broken: raise BrokenBarrierError try: if self._entry_sema.acquire(timeout=0): if not self._exit_sema.acquire(timeout=timeout): self.abort() else: for i in range(self.size-1): self._exit_sema.release() for i in range(self.size-1): self._entry_sema.release() except: self.abort() raise if self.broken: raise BrokenBarrierError def abort(self): with self._lock: self._broken_sema.acquire(timeout=5) for i in range(self.size): self._entry_sema.release() self._exit_sema.release() def reset(self): raise NotImplementedError @property def broken(self): with self._lock: if not self._broken_sema.acquire(timeout=0): return True self._broken_sema.release() return False ## import time, random def child(b,l): for i in range(5): time.sleep(random.random()*5) with l: print i, "entering barrier:", mp.current_process().name b.wait() with l: print '\t', i, "exiting barrier:", mp.current_process().name if __name__ == '__main__': b = Barrier(5) l = mp.Lock() for i in range(5): mp.Process(target=child, args=(b,l)).start() 
time.sleep(10) print("ABORTING") b.abort()

On Sun, 19 Feb 2012 17:58:53 +0100 Sturla Molden <sturla@molden.no> wrote:
This begs the question: what does it achieve? You know that the data has been "received" on the other side (i.e. get() has been called), but this doesn't tell you anything was done with the data, so: why is this an useful way to synchronize? Regards Antoine.

Den 19.02.2012 18:27, skrev Sturla Molden:
Which is to say, I just wanted to prove how ridiculously simple Matt Joiner's complaint about a "channel" was. The multiprocessing barrier on the other hand is quite useful. (Though the butterfly method is not the most efficient implementation of a barrier.) Sturla

On Sun, Feb 19, 2012 at 9:36 AM, Sturla Molden <sturla@molden.no> wrote:
I may be taking this out of context, but I have a really hard time understanding what you were trying to say. What does it mean for a complaint to be simple? Did you leave out a word in haste? (I know that happens a lot to me. :-)
The multiprocessing barrier on the other hand is quite useful. (Though the butterfly method is not the most efficient implementation of a barrier.)
Glad to see some real code. It's probably time to move the code samples to the bug tracker where they can be reviewed and have a chance of getting incorporated into the next release. -- --Guido van Rossum (python.org/~guido)

Den 19.02.2012 18:44, skrev Guido van Rossum:
Sorry for the rude language. I ment I think it is a problem that does not belong in the standard library, but perhaps in a cookbook. It is ~20 lines of trivial code with objects already in the standard library. Well, one could say the same thing about a queue too (it's just deque and a lock), but it is very useful and commonly used, so there is a difference. Sturla

On 19 February 2012 18:01, Sturla Molden <sturla@molden.no> wrote:
FWIW, I wouldn't have got this code right if I'd tried to write it. I'd have missed a lock or something. So it's possible that having it in the standard library avoids people like me writing buggy implementations. On the other hand, I can't imagine ever needing to use a channel object like this, so it would probably be worth having some real-world use cases to justify it. Paul

On Sun, Feb 19, 2012 at 10:08 AM, Paul Moore <p.f.moore@gmail.com> wrote:
It would also encourage using it as an interface between libraries with different authors, which would not happen if it was just a recipe -- every author would implement their own version of the recipe, and they would not be API-compatible even if they did the same thing. Many of the existing primitives in threading.py are very simple combinations of the basic Lock; but that doesn't make it less valuable to have them. Also, writing a performant Channel implementation for multiprocessing would hardly be a trivial job; it seems primitives don't make it into multiprocessing without first existing in threading.py. So all this suggests to me that there is no great harm in adding threading.Channel and it might open up some interesting new approaches to synchronization. That said, it certainly isn't a panacea; e.g. some Go examples written using Channels are better done with coroutines instead of threads in Python. (IIUC Go intentionally blurs the difference, but that's not given to Python.)
I think Matt Joiner's original post hinted at some. Matt, could you elaborate? We may be only an inch away from getting this into the stdlib... -- --Guido van Rossum (python.org/~guido)
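As a rough illustration of what a thread-level channel could look like, here is a sketch of a rendezvous (unbuffered) channel built only from threading primitives. The class name `Channel`, the `send`/`recv` methods and the timeout semantics (raising the built-in TimeoutError and retracting an item nobody took) are assumptions made for this example, not an existing or proposed stdlib API.

```python
import threading
import time


class Channel:
    """Rendezvous channel for threads: send() returns only after a receiver
    has taken the item. Hypothetical sketch, not a stdlib API."""

    def __init__(self):
        self._cond = threading.Condition()
        self._send_lock = threading.Lock()  # one sender uses the hand-off slot at a time
        self._full = False                  # True while an item sits in the slot
        self._item = None

    def send(self, item, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        acquired = (self._send_lock.acquire() if timeout is None
                    else self._send_lock.acquire(timeout=max(0.0, timeout)))
        if not acquired:
            raise TimeoutError("another sender is still waiting")
        try:
            with self._cond:
                self._item, self._full = item, True
                self._cond.notify_all()  # wake any waiting receivers
                remaining = (None if deadline is None
                             else max(0.0, deadline - time.monotonic()))
                if not self._cond.wait_for(lambda: not self._full, remaining):
                    # Nobody took the item in time: retract it under the same
                    # lock, so no receiver can see a half-cancelled send.
                    self._item, self._full = None, False
                    raise TimeoutError("no receiver took the item")
        finally:
            self._send_lock.release()

    def recv(self, timeout=None):
        with self._cond:
            if not self._cond.wait_for(lambda: self._full, timeout):
                raise TimeoutError("no sender")
            item, self._item, self._full = self._item, None, False
            self._cond.notify_all()  # wake the sender blocked in send()
            return item
```

Because the retraction happens while holding the condition's lock, a timed-out send() was either taken by exactly one recv() or not delivered at all. That is one possible answer to the timeout question, not the only one.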

On 19.02.2012 20:04, Guido van Rossum wrote:
Also, writing a performant Channel implementation for multiprocessing would hardly be a trivial job;
I think this should work :-)

Sturla

from multiprocessing import Lock, Event, Pipe

class Channel(object):
    def __init__(self):
        self._writelock = Lock()       # serializes senders
        self._readlock = Lock()        # serializes receivers
        self._new_data = Event()       # set by put(): a message is in the pipe
        self._recv_data = Event()      # set by get(): the message was taken
        self._conn1, self._conn2 = Pipe(False)  # conn1 receives, conn2 sends

    def put(self, msg):
        # Blocks until a get() has taken msg off the pipe.
        with self._writelock:
            self._conn2.send(msg)
            self._new_data.set()
            self._recv_data.wait()
            self._recv_data.clear()

    def get(self):
        with self._readlock:
            self._new_data.wait()
            msg = self._conn1.recv()
            self._new_data.clear()
            self._recv_data.set()
            return msg

## -------------

def proc2(channel):
    for i in range(1000):
        msg = channel.get()
        print("Process 2 received '%s'" % msg, flush=True)

def proc1(channel):
    for i in range(1000):
        print("Process 1 preparing to send 'message %d'" % i, flush=True)
        channel.put(("message %d" % i,))
        print("Process 1 finished sending 'message %d'" % i, flush=True)

if __name__ == "__main__":
    from multiprocessing import Process
    channel = Channel()
    p2 = Process(target=proc2, args=(channel,))
    p2.start()
    proc1(channel)
    p2.join()

If someone wants to write a PEP for Go-ish "channels" (or whatever) or put it on the bug tracker, here is the example implementation (with a lock around stdout in the example code, stupid me...). It would still need a timeout argument. A unittest could check the output of the test code, as it is known in advance. I don't really see the usefulness of a "channel" primitive, but for those who do, here is my contribution. I have only tested it on Win64. Sturla

On 19.02.2012 20:04, Guido van Rossum wrote:
One thing I could think of is "atomic messaging" with multiple producers or consumers talking on the same channel. E.g. while process A sends a message to process B, process C cannot write and process D cannot read. So you always get a one-to-one conversation. But I am not sure why (or if) Go has this mechanism. On the other hand, if we put in N**2 pipes (or channels), we could achieve the same atomicity of transaction by having an index for the sender and receiver of a message. This is what MPI does in the functions MPI_Send and MPI_Recv. But then I will be scolded for using too many semaphores on FreeBSD again :-( But there are some other useful mechanisms from MPI (and ØMQ) to consider as well. For example message broadcasting, message scatter and gather, and reductions. The latter is a reduce operation (e.g. add or multiply) on messages coming in from multiple processes. OpenMP also has reductions in its API. So there is a lot to be considered in the area of concurrency if we want to put more classes in threading and multiprocessing. But now I'll stop before someone tells me to take this to the concurrency list :-) Sturla
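A thread-based sketch of the N**2 idea: one queue per ordered (source, destination) pair, so each message is addressed by sender and receiver, plus a naive reduction that collects at a root rank. `Comm`, `send`, `recv` and `reduce` are hypothetical names chosen for this example, not an MPI binding. Note that queue.Queue is buffered, so `send` here returns without waiting for the receiver, unlike a rendezvous channel.

```python
import operator
import queue
import threading
from functools import reduce as fold


class Comm:
    """N**2 point-to-point queues indexed by (source, destination). Hypothetical."""

    def __init__(self, nprocs):
        self.nprocs = nprocs
        self._queues = {(i, j): queue.Queue()
                        for i in range(nprocs) for j in range(nprocs)}

    def send(self, src, dest, data):
        self._queues[src, dest].put(data)

    def recv(self, src, dest, timeout=None):
        # Only messages from `src` to `dest` arrive on this queue, so
        # conversations between other pairs never interleave with this one.
        return self._queues[src, dest].get(timeout=timeout)

    def reduce(self, rank, value, op=operator.add, root=0):
        """Every rank contributes `value`; only `root` gets the combined result."""
        if rank != root:
            self.send(rank, root, value)
            return None
        others = [self.recv(src, root) for src in range(self.nprocs) if src != root]
        return fold(op, others, value)
```

The separate queue per pair is what yields the one-to-one conversations described above; the price is N**2 queues, and in a multiprocessing version each of them would need its own OS-level resources.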

On Feb 19, 2012, at 3:29 PM, Sturla Molden wrote:
On the other hand, if we put in N**2 pipes (or channels), we could achieve the same atomicity of transaction by having an index for the sender and receiver of a message. This is what MPI does in the functions MPI_Send and MPI_Recv. But then I will be scolded for using too many semaphores on FreeBSD again :-(
I like this a lot. Below is some toy code I use in my parallel algorithms class (I removed the global communications broadcast, scatter, gather and reduce, and I removed logging, network topology constraints, and checks).

import os
import pickle

class PSim(object):
    def __init__(self, p):
        """Forks p-1 processes and creates p*p pipes."""
        self.nprocs = p
        self.pipes = {}
        for i in range(p):
            for j in range(p):
                # pipes[i, j] = (read_fd, write_fd) carries messages from rank i to rank j
                self.pipes[i, j] = os.pipe()
        self.rank = 0
        for i in range(1, p):
            if not os.fork():
                self.rank = i
                break  # a child must not go on forking processes of its own

    def send(self, j, data):
        s = pickle.dumps(data)
        fd = self.pipes[self.rank, j][1]
        # 10-digit zero-padded length header, then the pickled payload
        os.write(fd, str(len(s)).zfill(10).encode("ascii"))
        os.write(fd, s)

    def _read_exactly(self, fd, n):
        # os.read() on a pipe may return fewer bytes than requested
        buf = b""
        while len(buf) < n:
            chunk = os.read(fd, n - len(buf))
            if not chunk:
                raise EOFError("pipe closed mid-message")
            buf += chunk
        return buf

    def recv(self, j):
        fd = self.pipes[j, self.rank][0]
        size = int(self._read_exactly(fd, 10))
        return pickle.loads(self._read_exactly(fd, size))

if __name__ == '__main__':
    comm = PSim(2)
    if comm.rank == 0:
        comm.send(1, 'hello world')
    else:
        print(comm.recv(0))

It would be very useful to have something like these channels built in. Notice that using OS pipes has the problem of an OS-dependent buffer size: send is non-blocking for small data sizes but becomes blocking for large ones. Using OS mkfifo or a multiprocessing Queue is better, but the OS limits the number of files one program can have open.
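The OS-dependent pipe size mentioned above can be measured directly. A minimal POSIX-only sketch (it assumes os.set_blocking works on pipe descriptors, which holds on Unix): make the write end non-blocking and write fixed-size chunks until the kernel refuses with BlockingIOError. The function name `pipe_capacity` is made up for this example, and the result is only accurate to within one chunk.

```python
import os


def pipe_capacity(chunk=1024):
    """Return how many bytes a fresh OS pipe accepts before a write would block.

    POSIX-only sketch: the write end is made non-blocking so that a full
    pipe raises BlockingIOError instead of hanging the caller.
    """
    r, w = os.pipe()
    try:
        os.set_blocking(w, False)
        total = 0
        while True:
            try:
                total += os.write(w, b"x" * chunk)
            except BlockingIOError:
                return total
    finally:
        os.close(r)
        os.close(w)


if __name__ == "__main__":
    print(pipe_capacity(), "bytes")
```

On Linux the default pipe capacity is 65536 bytes, so a pickled message larger than that makes the payload write in `send` block until the receiver starts reading; other systems use different sizes.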

On 20.02.2012 00:38, Massimo Di Pierro wrote:
Most MPI implementations use shared memory on localhost. In theory one could implement a queue (deque and lock) using a shared memory region (a file in /tmp or the Windows equivalent). It would be extremely fast and could contain any number of "pipes" of arbitrary size. Sturla

I've created http://bugs.python.org/issue14059 for the multiprocessing.Barrier. I suggest a new thread be started to continue discussion on that. On Mon, Feb 20, 2012 at 8:44 AM, Massimo Di Pierro <massimo.dipierro@gmail.com> wrote:

I've created http://bugs.python.org/issue14060 for the possibility of a channel implementation. On Mon, Feb 20, 2012 at 1:44 AM, Guido van Rossum <guido@python.org> wrote:
participants (9)
- Antoine Pitrou
- Arnaud Delobelle
- Cameron Simpson
- Guido van Rossum
- Massimo Di Pierro
- Matt Joiner
- Paul Moore
- shibturn
- Sturla Molden