[pypy-svn] r28698 - pypy/dist/pypy/module/stackless/test
stephan at codespeak.net
stephan at codespeak.net
Mon Jun 12 12:10:41 CEST 2006
Author: stephan
Date: Mon Jun 12 12:10:39 2006
New Revision: 28698
Added:
pypy/dist/pypy/module/stackless/test/counter.py (contents, props changed)
Modified:
pypy/dist/pypy/module/stackless/test/stackless_.py
Log:
fixed another stupid scheduling bug. I'll create a real app-level stackless module later and rewrite the pseudo tests to real ones.
now working: stack1.py, stack2.py and counter.py
Added: pypy/dist/pypy/module/stackless/test/counter.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/stackless/test/counter.py Mon Jun 12 12:10:39 2006
@@ -0,0 +1,51 @@
+# a little example of tasklets and channels.
+# don't use this in production code, it is really nonsense :-)
+
+# We want to sort some random numbers using tasklets.
+# The idea is to create a bulk of tasklets which call schedule()
+# a number of times. When the count is at zero, the tasklet
+# send its result over a channel.
+# Guess the result!
+
+import random
+import stackless
+if hasattr(stackless, 'coroutine'):
+ import stackless_ as stackless
+
+def print_sched(prev, next):
+ try:
+ print 'before scheduling. prev: %s, next: %s' % (prev, next)
+ except Exception, e:
+ print 'Exception in print_sched', e
+ print '\tprev:', type(prev)
+ print '\tnext:', type(next)
+ print
+
+def print_chan(chan, task, sending, willblock):
+ print 'channel_action:', chan, task, 's:', sending, ' wb:',
+ print willblock
+ print
+
+#stackless.set_schedule_callback(print_sched)
+#stackless.set_channel_callback(print_chan)
+
+
+numbers = range(20)
+random.shuffle(numbers)
+print numbers
+# [16, 13, 12, 5, 6, 4, 7, 1, 9, 17, 15, 14, 10, 8, 0, 3, 11, 18, 2, 19]
+
+def counter(n, ch):
+ for i in xrange(n):
+ stackless.schedule()
+ ch.send(n)
+
+ch=stackless.channel()
+for each in numbers:
+ stackless.tasklet(counter)(each, ch)
+
+stackless.run()
+# now we should have a sorted chain of results in ch
+while ch.balance:
+ print '#',
+ print ch.receive()
Modified: pypy/dist/pypy/module/stackless/test/stackless_.py
==============================================================================
--- pypy/dist/pypy/module/stackless/test/stackless_.py (original)
+++ pypy/dist/pypy/module/stackless/test/stackless_.py Mon Jun 12 12:10:39 2006
@@ -47,9 +47,28 @@
This is a necessary Stackless 3.1 feature.
"""
+def SETNEXT(obj, val):
+ if DEBUG:
+ print 'SETNEXT', obj, val
+ obj.next = val
+
+def SETPREV(obj, val):
+ if DEBUG:
+ print 'SETPREV', obj, val
+ obj.prev = val
+
+def SETNONE(obj):
+ if DEBUG:
+ print 'SETNONE'
+ obj.prev = obj.next = None
+
def SWAPVAL(task1, task2):
- assert task1 is not None
- assert task2 is not None
+ try:
+ assert task1 is not None
+ assert task2 is not None
+ except:
+ print 'AssertionError in SWAPVAL', task1, task2
+ raise
if DEBUG:
print 'SWAPVAL(%s, %s)' % (task1, task2)
print '\t', task1.tempval
@@ -57,7 +76,13 @@
task1.tempval, task2.tempval = task2.tempval, task1.tempval
def SETVAL(task, val):
- assert task is not None
+ try:
+ assert task is not None
+ except:
+ print 'AssertionError in SETVAL'
+ raise
+ if isinstance(val, bomb):
+ print val.type, val.value
if DEBUG:
print 'SETVAL(%s, %s)' % (task, val)
task.tempval = val
@@ -65,7 +90,8 @@
# thread related stuff: assuming NON threaded execution for now
def check_for_deadlock():
- return False
+ return True
+ #return False
last_thread_id = 0
@@ -83,9 +109,8 @@
self.is_current = False
self.is_main = False
self.nesting_level = 0
- self.next = None
+ self.next = self.prev = None
self.paused = False
- self.prev = None
self.recursion_depth = 0
self.restorable = False
self.scheduled = False
@@ -253,7 +278,8 @@
global scheduler
main_coroutine = c = coroutine.getcurrent()
main_tasklet = TaskletProxy(c)
- main_tasklet.next = main_tasklet.prev = main_tasklet
+ SETNEXT(main_tasklet, main_tasklet)
+ SETPREV(main_tasklet, main_tasklet)
main_tasklet.is_main = True
scheduler = Scheduler()
@@ -277,19 +303,22 @@
if DEBUG:
print 'stackless.run()'
me = scheduler.current_remove()
+ print 'me is', me
if me is not main_tasklet:
raise RuntimeError("run() must be run from the main thread's \
main tasklet")
try:
- scheduler.schedule_task(me, scheduler._head)
- except Exception, exp:
+ retval = scheduler.schedule_task(me, scheduler._head)
if DEBUG:
- print 'run: in Excpetion', exp
+ print 'run: returning to main'
+ #scheduler.current_insert(me)
+ return retval
+ except Exception, exp:
+ print 'run: in Excpetion', exp
b = curexc_to_bomb()
- main = main_tasklet
- SETVAL(main, b)
- scheduler.current_insert_after(main)
- scheduler.schedule_task(me, main)
+ SETVAL(me, b)
+ scheduler.current_insert(me)
+ scheduler.schedule_task(me, me)
def getcurrent():
@@ -404,8 +433,7 @@
self.is_current = False
self.is_main = False
self.nesting_level = 0
- self.next = None
- self.prev = None
+ self.next = self.prev = None
self.paused = False
self.recursion_depth = 0
self.restorable = False
@@ -421,8 +449,12 @@
return self
def __str__(self):
- next = (self.next and self.next.thread_id) or None
- prev = (self.prev and self.prev.thread_id) or None
+ next = None
+ if self.next is not None:
+ next = self.next.thread_id
+ prev = None
+ if self.prev is not None:
+ prev = self.prev.thread_id
if self.blocked:
bs = 'b'
else:
@@ -628,7 +660,11 @@
def _channel_remove(self, d):
ret = self.next
- assert isinstance(ret, tasklet)
+ try:
+ assert isinstance(ret, (tasklet, TaskletProxy))
+ except:
+ print 'AssertionError in channel_remove'
+ raise
self.balance -= d
self._rem(ret)
ret.blocked = 0
@@ -649,20 +685,24 @@
if (task.next is not None) or (task.prev is not None):
raise AssertionError('task.next and task.prev must be None')
# insert at end
- task.prev = self.prev
- task.next = self
- self.prev.next = task
- self.prev = task
+ SETPREV(task, self.prev)
+ SETNEXT(task, self)
+ SETNEXT(self.prev, task)
+ SETPREV(self, task)
def _rem(self, task):
if DEBUG:
print '### channel._rem(%s)' % task
- assert task.next is not None
- assert task.prev is not None
+ try:
+ assert task.next is not None
+ assert task.prev is not None
+ except:
+ print 'AssertionError in channel._rem', task, task.next, task.prev
+ raise
#remove at end
- task.next.prev = task.prev
- task.prev.next = task.next
- task.next = task.prev = None
+ SETPREV(task.next, task.prev)
+ SETNEXT(task.prev, task.next)
+ SETNONE(task)
def _notify(self, task, d, cando, res):
global schedlock
@@ -680,7 +720,7 @@
source = scheduler._head
target = self.next
if not source is getcurrent():
- print '!!!!! scheduler._head is not current !!!!!'
+ print '!!!!! scheduler._head is not current !!!!!', source, getcurrent()
interthread = 0 # no interthreading at the moment
if d > 0:
cando = self.balance < 0
@@ -697,7 +737,11 @@
print scheduler
print
print
- assert abs(d) == 1
+ try:
+ assert abs(d) == 1
+ except:
+ print 'AssertionError in channel_action'
+ raise
SETVAL(source, arg)
if not interthread:
@@ -892,17 +936,23 @@
return 'Scheduler: [' + ' -> '.join(parts) + ']'
def _chain_insert(self, task):
- assert task.next is None
- assert task.prev is None
+ try:
+ assert task.next is None
+ assert task.prev is None
+ except:
+ print 'AssertionError in _chain_insert', task, task.prev, task.next
+ raise
if self._head is None:
- task.next = task.prev = task
+ SETNEXT(task, task)
+ SETPREV(task, task)
self._set_head(task)
else:
r = self._head
l = r.prev
- l.next = r.prev = task
- task.prev = l
- task.next = r
+ SETNEXT(l, task)
+ SETPREV(r, task)
+ SETPREV(task, l)
+ SETNEXT(task, r)
def _chain_remove(self):
if self._head is None:
@@ -910,12 +960,12 @@
task = self._head
l = task.prev
r = task.next
- l.next = r
- r.prev = l
+ SETNEXT(l, r)
+ SETPREV(r, l)
self._set_head(r)
if r == task:
self._set_head(None)
- task.prev = task.next = None
+ SETNONE(task)
return task
@@ -940,7 +990,11 @@
while not isinstance(prev, channel):
prev = prev.prev
chan = prev
- assert chan.balance
+ try:
+ assert chan.balance
+ except:
+ print 'AssertionError in channel_remove_slow'
+ raise
if chan.balance > 0:
d = 1
else:
@@ -965,16 +1019,23 @@
return errflag
def schedule_task_block(self, prev):
+ if DEBUG:
+ print 'schedule_task_block(%s)' % prev
next = None
if check_for_deadlock():
- if main_tasklet.next is None:
- if isinstance(prev.tempval, bomb):
- SETVAL(main_tasklet, prev.tempval)
- return self.schedule_task(prev, main_tasklet)
- retval = make_deadlock_bomb()
- SETVAL(prev, retval)
+ try:
+ if main_tasklet.next is None:
+ if isinstance(prev.tempval, bomb):
+ SETVAL(main_tasklet, prev.tempval)
+ return self.schedule_task(prev, main_tasklet)
+ retval = make_deadlock_bomb()
+ SETVAL(prev, retval)
+
+ return self.schedule_task(prev, prev)
+ except Exception, exp:
+ print 'exp in schedule_task_block', exp
+ raise
- return self.schedule_task(prev, prev)
next = prev
return self.schedule_task(prev, next)
More information about the Pypy-commit
mailing list