
Practical concurrency solution attached and produced below. Includes worked example. Brainstorm attached for referencing. Excerpt: 1. Put a single thread in charge 2. Lock mutation and/or all operations 3. @mutation decorator 4. List-only solution 5. @onfulfill decorator 6. Ideal solution (implemented) Timing: Important Unimportant Return value: Ignore (i.) (ii.) Use (i.) (iii.) sharedmutable.py --- from __future__ import with_statement import threading import thread from Queue import Queue class Ref: '''To adjust a value a reference to which is not available.''' def __init__( self, val= None ): self.val= val def __repr__( self ): return repr( self.val ) class LockerItem( object ): def __call__( self, farevent ): raise NotImplemented class LockerItemBlock( LockerItem ): '''Turn-taking Locker item. Call frees its event, frees and waits on its parameter (must have 'clear' and 'wait' methods. Use 'wait' to block for Call to set event, and block calling event.''' def __init__( self ): self.event= threading.Event() def __call__( self, farevent ): farevent.clear() self.event.set() def wait( self, timeout= None ): self.event.wait( timeout ) class LockerItemFaF( LockerItem ): '''Fire-and-Forget Locker item; calling just calls func. Use to polymorphisize LockerItemBlock.''' def __init__( self, func, *ar, **kwar ): self.func, self.ar, self.kwar= func, ar, kwar self.ret= None def __call__( self, _ ): self.ret= self.func( *self.ar, **self.kwar ) class Locker( object ): '''Synchronization class. Operations can honor: i. Fire, block for completion (timing important: do here.) ii. Fire, launch new thread with result upon completion (timing unimportant, return important: do something with result later, in turn.) iii. Fire, forget, continue (timing & return ignored: just do.) Timing: Important Unimportant Return value: Ignore (i.) (ii.) Use (i.) (iii.) Corresponding instance functions are: i. Locker.op( operation & args ) -or- Locker.acq() ...code block... Locker.rel() -or- with Locker: ...code block... ii. Decorator or otherwise: Locker.onfulfill( operation & args ) ( oncompletioncallable ) iii. Fire-and-forget (faf) + options: Locker.faf( operation & args ) To accomplish this, a Locker instance has a Queue of callables, and -produces- LockerItemFaF (fire-and-forget) and LockerItemBlock instances, which are enqueued and granted in order received. acq() and op() "get the caller in line" for ownership; faf() gets something else in line: in particular, a LockerItemFaF instance. A LockerItem subclass instance is called when "it's its turn" in Queue, with 'noget' as the parameter; '_mainth', started in initialization, blocks on 'noget' after each 'get' operation. instance.rel() clears 'noget', and LockerItemFaF.__call__ does not set it. (LockerItemBlock.__call__ does; acq() enqueues LockerItemBlock instances.) Usage: "with 'instance': (suite)" is equivalent to: instance.acq() (suite) instance.rel() (suite) may not call acq(). 'op' can be used for simple-statement suites. instance.op( listA.append, None ) instance.op( listA.reverse ) Decorator 'instance.onfulfill' spawns a new thread, whence, when it's its turn, it calls its decoratee with the result of the operation as the only parameter. @instance.onfulfull( dequeA.popleft ) def onfulfill1( result ): #do something with the popped value print '%s was popped\n'% result, Use @ThreadUtils.ThFork for something complex. @ThreadUtils.ThFork def fork1(): with instance: #something complex print listA ''' def __init__( self ): self.queue= Queue() self.noget= threading.Event() self.thread= th= threading.Thread( target= self._mainth ) th.setDaemon( True ) th.start() def _mainth( self ): self.noget.clear() while 1: item= self.queue.get() item( self.noget ) self.noget.wait() def acq( self ): item= LockerItemBlock() self.queue.put( item ) item.wait() return item def rel( self ): self.noget.set() def op( self, func, *ar, **kwar ): self.acq() ret= func( *ar, **kwar ) self.rel() return ret def __enter__( self ): return self.acq() def __exit__( self, *excs ): self.rel() def onfulfill( self, func, *ar, **kwar ): '''decorator launches the decoratee in separate thread upon completion of func.''' locfunc= Ref() def callee(): result= self.op( func, *ar, **kwar ) locfunc.val( result ) def prefulfill( func ): locfunc.val= func th= threading.Thread( target= callee ) th.start() return prefulfill def faf( self, func, *ar, **kwar ): '''fire and forget''' return self.fafroot( False, None, func, *ar, **kwar ) def fafoption( self, func, *ar, **kwar ): return self.fafroot( True, None, func, *ar, **kwar ) def faftimeout( self, timeout, func, *ar, **kwar ): return self.fafroot( False, timeout, func, *ar, **kwar ) def fafroot( self, block, timeout, func, *ar, **kwar ): item= LockerItemFaF( func, *ar, **kwar ) self.queue.put( item, block, timeout ) return item if __name__== '__main__': '''Thing to notice: fulfill1 prints a >0 value when thTry producers are adding integers in the closed interval [300,600]. By design there is a small chance of assertion failure, unobserved yet.''' import time import random counter= 0 def simplecounter(): global counter ret= counter counter+= 1 time.sleep( random.uniform( 0, 0.01 ) ) return counter listA= [] lockerA= Locker() def thTry(): while 1: with lockerA: ret= simplecounter() listA.append( ret ) print ret, '''this assertion fails outside of locker with.''' assert all( [ listA[i]< listA[i+1] for i in range( len( listA )- 1 ) ] ) if random.random()< 0.8 or len( listA )> 10: '''fire-and-forget example. 80% chance of removing an element (hence may fail), and 100% if listA has 'a lot' of elements.''' lockerA.faf( listA.pop, 0 ) '''return is important on this one; must block for.''' ret= list( lockerA.op( reversed, listA ) ) if len( ret )> 1: assert all( [ ret[i]> ret[i+1] for i in range( len( ret )- 1 ) ] ) if random.random()< .05: '''return is important, but timing is not.''' @lockerA.onfulfill( set, listA ) def fulfill1( result ): count= 0 for si in result: if 300<= si<= 600: count+= 1 print "\n\t%i counts in [300,600]\n"% count, def thInterfere(): while 1: with lockerA: '''remove a multiple of 2 from somewhere.''' ret= None for i in range( len( listA ) ): if listA[ i ]% 2== 0: ret= listA.pop( i ) break if ret is not None: assert ret% 2== 0 print '\n\tremoved %i\n'% ret, time.sleep( 0.5 ) def thMon(): while 1: print '\n\t%s\n'% listA, time.sleep( 0.5 ) thread.start_new_thread( thMon, () ) thread.start_new_thread( thInterfere, () ) for _ in range( 10 ): '''start a bunch of producer threads.''' thread.start_new_thread( thTry, () ) time.sleep( 0.1 ) '''and wait.''' time.sleep( 1000 )
participants (1)
-
Aaron Brady