[pypy-svn] r28698 - pypy/dist/pypy/module/stackless/test

stephan at codespeak.net stephan at codespeak.net
Mon Jun 12 12:10:41 CEST 2006


Author: stephan
Date: Mon Jun 12 12:10:39 2006
New Revision: 28698

Added:
   pypy/dist/pypy/module/stackless/test/counter.py   (contents, props changed)
Modified:
   pypy/dist/pypy/module/stackless/test/stackless_.py
Log:
Fixed another stupid scheduling bug. I'll create a real app-level stackless module later and rewrite the pseudo tests as real ones.
Now working: stack1.py, stack2.py and counter.py


Added: pypy/dist/pypy/module/stackless/test/counter.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/stackless/test/counter.py	Mon Jun 12 12:10:39 2006
@@ -0,0 +1,51 @@
+# a little example of tasklets and channels.
+# don't use this in production code, it is really nonsense :-)
+
+# We want to sort some random numbers using tasklets.
+# The idea is to create a bunch of tasklets, each of which calls
+# schedule() a number of times. When its count reaches zero, the
+# tasklet sends its result over a channel.
+# Guess the result!
+
+import random
+import stackless
+if hasattr(stackless, 'coroutine'):
+    import stackless_ as stackless
+
+def print_sched(prev, next):
+    try:
+        print 'before scheduling. prev: %s, next: %s' % (prev, next)
+    except Exception, e:
+        print 'Exception in print_sched', e
+        print '\tprev:', type(prev)
+        print '\tnext:', type(next)
+    print
+
+def print_chan(chan, task, sending, willblock):
+    print 'channel_action:', chan, task, 's:', sending, ' wb:',
+    print willblock
+    print
+
+#stackless.set_schedule_callback(print_sched)
+#stackless.set_channel_callback(print_chan)
+
+
+numbers = range(20)
+random.shuffle(numbers)
+print numbers
+# [16, 13, 12, 5, 6, 4, 7, 1, 9, 17, 15, 14, 10, 8, 0, 3, 11, 18, 2, 19]
+
+def counter(n, ch):
+    for i in xrange(n):
+        stackless.schedule()
+    ch.send(n)
+	
+ch=stackless.channel()
+for each in numbers:
+    stackless.tasklet(counter)(each, ch)
+
+stackless.run()
+# now we should have a sorted chain of results in ch
+while ch.balance:
+    print '#',
+    print ch.receive()

Modified: pypy/dist/pypy/module/stackless/test/stackless_.py
==============================================================================
--- pypy/dist/pypy/module/stackless/test/stackless_.py	(original)
+++ pypy/dist/pypy/module/stackless/test/stackless_.py	Mon Jun 12 12:10:39 2006
@@ -47,9 +47,28 @@
 This is a necessary Stackless 3.1 feature.
 """
 
+def SETNEXT(obj, val):
+    if DEBUG:
+        print 'SETNEXT', obj, val
+    obj.next = val
+
+def SETPREV(obj, val):
+    if DEBUG:
+        print 'SETPREV', obj, val
+    obj.prev = val
+
+def SETNONE(obj):
+    if DEBUG:
+        print 'SETNONE'
+    obj.prev = obj.next = None
+
 def SWAPVAL(task1, task2):
-    assert task1 is not None
-    assert task2 is not None
+    try:
+        assert task1 is not None
+        assert task2 is not None
+    except:
+        print 'AssertionError in SWAPVAL', task1, task2
+        raise
     if DEBUG:
         print 'SWAPVAL(%s, %s)' % (task1, task2)
         print '\t', task1.tempval
@@ -57,7 +76,13 @@
     task1.tempval, task2.tempval = task2.tempval, task1.tempval
 
 def SETVAL(task, val):
-    assert task is not None
+    try:
+        assert task is not None
+    except:
+        print 'AssertionError in SETVAL'
+        raise
+    if isinstance(val, bomb):
+        print val.type, val.value
     if DEBUG:
         print 'SETVAL(%s, %s)' % (task, val)
     task.tempval = val
@@ -65,7 +90,8 @@
 # thread related stuff: assuming NON threaded execution for now
 
 def check_for_deadlock():
-    return False
+    return True
+    #return False
 
 last_thread_id = 0
 
@@ -83,9 +109,8 @@
         self.is_current = False
         self.is_main = False
         self.nesting_level = 0
-        self.next = None
+        self.next = self.prev = None
         self.paused = False
-        self.prev = None
         self.recursion_depth = 0
         self.restorable = False
         self.scheduled = False
@@ -253,7 +278,8 @@
     global scheduler 
     main_coroutine = c = coroutine.getcurrent()
     main_tasklet = TaskletProxy(c)
-    main_tasklet.next = main_tasklet.prev = main_tasklet
+    SETNEXT(main_tasklet, main_tasklet)
+    SETPREV(main_tasklet, main_tasklet)
     main_tasklet.is_main = True
     scheduler = Scheduler()
 
@@ -277,19 +303,22 @@
     if DEBUG:
         print 'stackless.run()'
     me = scheduler.current_remove()
+    print 'me is', me
     if me is not main_tasklet:
         raise RuntimeError("run() must be run from the main thread's \
                              main tasklet")
     try:
-        scheduler.schedule_task(me, scheduler._head)
-    except Exception, exp:
+        retval = scheduler.schedule_task(me, scheduler._head)
         if DEBUG:
-            print 'run: in Excpetion', exp
+            print 'run: returning to main'
+        #scheduler.current_insert(me)
+        return retval
+    except Exception, exp:
+        print 'run: in Excpetion', exp
         b = curexc_to_bomb()
-        main = main_tasklet
-        SETVAL(main, b)
-        scheduler.current_insert_after(main)
-        scheduler.schedule_task(me, main)
+        SETVAL(me, b)
+        scheduler.current_insert(me)
+        scheduler.schedule_task(me, me)
 
 
 def getcurrent():
@@ -404,8 +433,7 @@
         self.is_current = False
         self.is_main = False
         self.nesting_level = 0
-        self.next = None
-        self.prev = None
+        self.next = self.prev = None
         self.paused = False
         self.recursion_depth = 0
         self.restorable = False
@@ -421,8 +449,12 @@
         return self
 
     def __str__(self):
-        next = (self.next and self.next.thread_id) or None
-        prev = (self.prev and self.prev.thread_id) or None
+        next = None
+        if self.next is not None:
+            next = self.next.thread_id
+        prev = None
+        if self.prev is not None:
+            prev = self.prev.thread_id
         if self.blocked:
             bs = 'b'
         else:
@@ -628,7 +660,11 @@
 
     def _channel_remove(self, d):
         ret = self.next
-        assert isinstance(ret, tasklet)
+        try:
+            assert isinstance(ret, (tasklet, TaskletProxy))
+        except:
+            print 'AssertionError in channel_remove'
+            raise
         self.balance -= d
         self._rem(ret)
         ret.blocked = 0
@@ -649,20 +685,24 @@
         if (task.next is not None) or (task.prev is not None):
             raise AssertionError('task.next and task.prev must be None')
         # insert at end
-        task.prev = self.prev
-        task.next = self
-        self.prev.next = task
-        self.prev = task
+        SETPREV(task, self.prev)
+        SETNEXT(task, self)
+        SETNEXT(self.prev, task)
+        SETPREV(self, task)
 
     def _rem(self, task):
         if DEBUG:
             print '### channel._rem(%s)' % task
-        assert task.next is not None
-        assert task.prev is not None
+        try:
+            assert task.next is not None
+            assert task.prev is not None
+        except:
+            print 'AssertionError in channel._rem', task, task.next, task.prev
+            raise
         #remove at end
-        task.next.prev = task.prev
-        task.prev.next = task.next
-        task.next = task.prev = None
+        SETPREV(task.next, task.prev)
+        SETNEXT(task.prev, task.next)
+        SETNONE(task)
 
     def _notify(self, task, d, cando, res):
         global schedlock
@@ -680,7 +720,7 @@
             source = scheduler._head
             target = self.next
             if not source is getcurrent():
-                print '!!!!! scheduler._head is not current !!!!!'
+                print '!!!!! scheduler._head is not current !!!!!', source, getcurrent()
             interthread = 0 # no interthreading at the moment
             if d > 0:
                 cando = self.balance < 0
@@ -697,7 +737,11 @@
                 print scheduler
                 print
                 print
-            assert abs(d) == 1
+            try:
+                assert abs(d) == 1
+            except:
+                print 'AssertionError in channel_action'
+                raise
 
             SETVAL(source, arg)
             if not interthread:
@@ -892,17 +936,23 @@
         return 'Scheduler: [' + ' -> '.join(parts) + ']'
 
     def _chain_insert(self, task):
-        assert task.next is None
-        assert task.prev is None
+        try:
+            assert task.next is None
+            assert task.prev is None
+        except:
+            print 'AssertionError in _chain_insert', task, task.prev, task.next
+            raise
         if self._head is None:
-            task.next = task.prev = task
+            SETNEXT(task, task)
+            SETPREV(task, task)
             self._set_head(task)
         else:
             r = self._head
             l = r.prev
-            l.next = r.prev = task
-            task.prev = l
-            task.next = r
+            SETNEXT(l, task)
+            SETPREV(r, task)
+            SETPREV(task, l)
+            SETNEXT(task, r)
 
     def _chain_remove(self):
         if self._head is None: 
@@ -910,12 +960,12 @@
         task = self._head
         l = task.prev
         r = task.next
-        l.next = r 
-        r.prev = l
+        SETNEXT(l, r)
+        SETPREV(r, l)
         self._set_head(r)
         if r == task:
             self._set_head(None)
-        task.prev = task.next = None
+        SETNONE(task)
 
         return task
 
@@ -940,7 +990,11 @@
         while not isinstance(prev, channel):
             prev = prev.prev
         chan = prev
-        assert chan.balance
+        try:
+            assert chan.balance
+        except:
+            print 'AssertionError in channel_remove_slow'
+            raise
         if chan.balance > 0:
             d = 1
         else:
@@ -965,16 +1019,23 @@
                 return errflag
 
     def schedule_task_block(self, prev):
+        if DEBUG:
+            print 'schedule_task_block(%s)' % prev
         next = None
         if check_for_deadlock():
-            if main_tasklet.next is None:
-                if isinstance(prev.tempval, bomb):
-                    SETVAL(main_tasklet, prev.tempval)
-                return self.schedule_task(prev, main_tasklet)
-            retval = make_deadlock_bomb()
-            SETVAL(prev, retval)
+            try:
+                if main_tasklet.next is None:
+                    if isinstance(prev.tempval, bomb):
+                        SETVAL(main_tasklet, prev.tempval)
+                    return self.schedule_task(prev, main_tasklet)
+                retval = make_deadlock_bomb()
+                SETVAL(prev, retval)
+
+                return self.schedule_task(prev, prev)
+            except Exception, exp:
+                print 'exp in schedule_task_block', exp
+                raise
 
-            return self.schedule_task(prev, prev)
 
         next = prev
         return self.schedule_task(prev, next)



More information about the Pypy-commit mailing list