[Python-ideas] sharedmutable locking solution
Aaron Brady
castironpi at comcast.net
Sun Jan 13 02:20:49 CET 2008
A practical concurrency solution is attached and reproduced below. It includes
a worked example. A brainstorm document is attached for reference.
Excerpt:
1. Put a single thread in charge
2. Lock mutation and/or all operations
3. @mutation decorator
4. List-only solution
5. @onfulfill decorator
6. Ideal solution (implemented)
Timing:           Important    Unimportant
Return value:
  Ignore          (i.)         (iii.)
  Use             (i.)         (ii.)
sharedmutable.py
---
from __future__ import with_statement
import threading
import thread
from Queue import Queue
class Ref(object):
    '''Mutable single-slot holder.

    Use it to adjust a value to which no direct reference is
    available: share the Ref instance and assign to its 'val'
    attribute instead of rebinding a name.

    val -- the initial value to hold (default None).
    '''

    def __init__(self, val=None):
        self.val = val

    def __repr__(self):
        # Display exactly as the held value would display.
        return repr(self.val)
class LockerItem(object):
    '''Abstract base for the callables a Locker keeps in its queue.

    Subclasses override __call__, which receives a single event-like
    argument ('farevent').
    '''

    def __call__(self, farevent):
        # NotImplemented is a comparison sentinel, not an exception;
        # raising it produces a TypeError.  NotImplementedError is the
        # exception meant for "subclass must override this".
        raise NotImplementedError
class LockerItemBlock(LockerItem):
    '''Turn-taking Locker item.

    Calling the item clears its parameter and then sets the item's own
    event.  The parameter must have a 'clear' method; the Locker also
    calls 'wait' on that same object.  Use 'wait' on the item to block
    until the call has set the event.
    '''

    def __init__(self):
        self.event = threading.Event()

    def __call__(self, farevent):
        # Clear the far event first, then wake the waiter, so that a
        # set() issued by the woken thread cannot be wiped out by this
        # clear().
        farevent.clear()
        self.event.set()

    def wait(self, timeout=None):
        '''Block until the item has been called, or until 'timeout'
        seconds have passed (None means wait indefinitely).

        Returns whatever threading.Event.wait returns, so that callers
        passing a timeout can tell whether the event was actually set
        on interpreters where Event.wait reports it.'''
        return self.event.wait(timeout)
class LockerItemFaF(LockerItem):
    '''Fire-and-Forget Locker item.

    Stores a callable with its positional and keyword arguments.
    Calling the item just calls that callable and keeps the result in
    'ret' (None until then).  It exists so that plain operations can
    stand in the same queue as LockerItemBlock instances.
    '''

    def __init__(self, func, *ar, **kwar):
        self.func = func
        self.ar = ar
        self.kwar = kwar
        self.ret = None

    def __call__(self, _):
        # The event argument is accepted for interface compatibility
        # and deliberately left untouched.
        outcome = self.func(*self.ar, **self.kwar)
        self.ret = outcome
class Locker(object):
    '''Synchronization class: one dispatcher thread grants turns in
    the order requests were received.

    Operations can honor:
      i.   Fire, block for completion (timing important: do here.)
      ii.  Fire, launch new thread with result upon completion
           (timing unimportant, return important: do something with
           result later, in turn.)
      iii. Fire, forget, continue (timing & return ignored: just do.)

        Timing:          Important    Unimportant
        Return value:
          Ignore         (i.)         (iii.)
          Use            (i.)         (ii.)

    Corresponding instance functions are:
      i.   Locker.op( operation & args )
             -or-
           Locker.acq()
           ...code block...
           Locker.rel()
             -or-
           with Locker:
               ...code block...
      ii.  Decorator or otherwise:
           Locker.onfulfill( operation & args )( oncompletioncallable )
      iii. Fire-and-forget (faf) + options:
           Locker.faf( operation & args )

    To accomplish this, a Locker instance has a Queue of callables,
    and -produces- LockerItemFaF (fire-and-forget) and LockerItemBlock
    instances, which are enqueued and granted in order received.
    acq() and op() "get the caller in line" for ownership; faf() gets
    something else in line: in particular, a LockerItemFaF instance.

    An item is called when "it's its turn" in the Queue, with the
    'noget' event as the parameter; '_mainth', started in
    initialization, waits on 'noget' after each 'get' operation.
    'noget' being set means "nobody owns the Locker".
    LockerItemBlock.__call__ clears it (acq() enqueues LockerItemBlock
    instances), instance.rel() sets it again, and
    LockerItemFaF.__call__ does not touch it.  An exception raised by
    a queued item is printed and the dispatcher carries on.

    Usage:

    "with 'instance': (suite)" is equivalent to:

        instance.acq()
        (suite)
        instance.rel()

    (suite) may not call acq() (nor op(), which calls acq()).

    'op' can be used for simple-statement suites.

        instance.op( listA.append, None )
        instance.op( listA.reverse )

    Decorator 'instance.onfulfill' spawns a new thread, whence, when
    it's its turn, it calls its decoratee with the result of the
    operation as the only parameter.

        @instance.onfulfill( dequeA.popleft )
        def onfulfill1( result ):
            #do something with the popped value
            print '%s was popped\n'% result,

    Use @ThreadUtils.ThFork for something complex.

        @ThreadUtils.ThFork
        def fork1():
            with instance:
                #something complex
                print listA
    '''

    def __init__(self):
        self.queue = Queue()
        self.noget = threading.Event()
        # Start out "unowned".  If the event started cleared, a
        # fire-and-forget item served before any acq()/rel() pair
        # would leave the dispatcher waiting on 'noget' forever.
        self.noget.set()
        self.thread = th = threading.Thread(target=self._mainth)
        th.setDaemon(True)
        th.start()

    def _mainth(self):
        '''Dispatcher loop: take the next item, call it with 'noget',
        then wait until the Locker is unowned again.'''
        import traceback
        while 1:
            item = self.queue.get()
            try:
                item(self.noget)
            except Exception:
                # A failing queued operation must not kill the
                # dispatcher: every later acq() would block forever.
                traceback.print_exc()
            self.noget.wait()

    def acq(self):
        '''Get in line and block until it is the caller's turn.
        Returns the LockerItemBlock that was enqueued.  Must be paired
        with rel().'''
        item = LockerItemBlock()
        self.queue.put(item)
        item.wait()
        return item

    def rel(self):
        '''Give up ownership so the dispatcher can serve the next
        item.'''
        self.noget.set()

    def op(self, func, *ar, **kwar):
        '''Acquire, return func( *ar, **kwar ), release.  The Locker is
        released even when func raises; the exception propagates to
        the caller.'''
        self.acq()
        try:
            return func(*ar, **kwar)
        finally:
            self.rel()

    def __enter__(self):
        return self.acq()

    def __exit__(self, *excs):
        # Returns None, so an exception raised in the suite propagates.
        self.rel()

    def onfulfill(self, func, *ar, **kwar):
        '''decorator launches the decoratee in separate
        thread upon completion of func.

        The new thread runs self.op( func, *ar, **kwar ) and passes the
        result to the decoratee as its only argument.  The decoratee
        itself is returned, so the decorated name stays bound to the
        function.'''
        def prefulfill(decoratee):
            def callee():
                decoratee(self.op(func, *ar, **kwar))
            threading.Thread(target=callee).start()
            return decoratee
        return prefulfill

    def faf(self, func, *ar, **kwar):
        '''fire and forget: enqueue func( *ar, **kwar ) without
        blocking.  Returns the LockerItemFaF; its 'ret' attribute
        holds the result once the item has been served.'''
        return self.fafroot(False, None, func, *ar, **kwar)

    def fafoption(self, func, *ar, **kwar):
        '''Like faf(), but the enqueue itself is a blocking put.'''
        return self.fafroot(True, None, func, *ar, **kwar)

    def faftimeout(self, timeout, func, *ar, **kwar):
        '''Like faf(), but the enqueue may block for up to 'timeout'
        seconds.'''
        # Queue.put ignores 'timeout' unless 'block' is true, so a
        # timed put has to be a blocking one.
        return self.fafroot(True, timeout, func, *ar, **kwar)

    def fafroot(self, block, timeout, func, *ar, **kwar):
        '''Common implementation of the faf variants; 'block' and
        'timeout' are handed to Queue.put unchanged.'''
        item = LockerItemFaF(func, *ar, **kwar)
        self.queue.put(item, block, timeout)
        return item
if __name__ == '__main__':
    # Demo and stress test for Locker.
    #
    # Thing to notice: fulfill1 prints a >0 value when thTry producers
    # are adding integers in the closed interval [300,600].  By design
    # there is a small chance of assertion failure, unobserved yet.
    #
    # Messages are emitted with sys.stdout.write, one call per
    # message, which keeps the demo's syntax valid on Python 2 and
    # Python 3 alike.
    import random
    import sys
    import time

    counter = 0

    def simplecounter():
        '''Bump the global counter and return it after a short random
        pause.  The value is read back only after the pause, so calls
        that are not serialized can return duplicates.'''
        global counter
        counter += 1
        time.sleep(random.uniform(0, 0.01))
        return counter

    listA = []
    lockerA = Locker()

    def popfirst():
        '''Drop the first element of listA, if there is one.'''
        # thInterfere may already have emptied the list; popping an
        # empty list would raise inside the Locker's dispatcher thread.
        if listA:
            listA.pop(0)

    def thTry():
        '''Producer: append increasing counter values to listA and
        exercise op(), faf() and onfulfill().'''
        while 1:
            with lockerA:
                ret = simplecounter()
                listA.append(ret)
                sys.stdout.write('%s ' % ret)
                # this assertion fails outside of locker with.
                assert all(a < b for a, b in zip(listA, listA[1:]))
            if random.random() < 0.8 or len(listA) > 10:
                # fire-and-forget example. 80% chance of removing an
                # element, and 100% if listA has 'a lot' of elements.
                lockerA.faf(popfirst)
            # return is important on this one; must block for.
            # NOTE(review): op() hands back the reversed iterator and
            # list() consumes it after the Locker has been released --
            # presumably the "by design" window mentioned at the top;
            # confirm.
            ret = list(lockerA.op(reversed, listA))
            assert all(a > b for a, b in zip(ret, ret[1:]))
            if random.random() < .05:
                # return is important, but timing is not.
                @lockerA.onfulfill(set, listA)
                def fulfill1(result):
                    count = sum(1 for si in result if 300 <= si <= 600)
                    sys.stdout.write(
                        "\n\t%i counts in [300,600]\n" % count)

    def thInterfere():
        '''Twice a second, remove the first even value from listA.'''
        while 1:
            with lockerA:
                # remove a multiple of 2 from somewhere.
                ret = None
                for i, value in enumerate(listA):
                    if value % 2 == 0:
                        ret = listA.pop(i)
                        break
                if ret is not None:
                    assert ret % 2 == 0
                    sys.stdout.write('\n\tremoved %i\n' % ret)
            time.sleep(0.5)

    def thMon():
        '''Monitor: print listA twice a second without taking the
        Locker.'''
        while 1:
            sys.stdout.write('\n\t%s\n' % listA)
            time.sleep(0.5)

    thread.start_new_thread(thMon, ())
    thread.start_new_thread(thInterfere, ())
    for _ in range(10):
        # start a bunch of producer threads.
        thread.start_new_thread(thTry, ())
        time.sleep(0.1)
    # and wait.
    time.sleep(1000)
-------------- next part --------------
A non-text attachment was scrubbed...
Name: sharedmutable.zip
Type: application/octet-stream
Size: 3833 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20080112/60509819/attachment.obj>
More information about the Python-ideas
mailing list