[pypy-svn] r29360 - pypy/dist/pypy/lib/test2

auc at codespeak.net auc at codespeak.net
Mon Jun 26 17:22:45 CEST 2006


Author: auc
Date: Mon Jun 26 17:22:43 2006
New Revision: 29360

Added:
   pypy/dist/pypy/lib/test2/test_stackless.py
Log:
three basic tests for stackless lib
* the simplest pass
* test with nested tasklets fails
* adaptation of wait_two fails
(maybe this is a wrong usage ... I only lightly skimmed over : http://www.stackless.com/wiki/Samples)


Added: pypy/dist/pypy/lib/test2/test_stackless.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/test2/test_stackless.py	Mon Jun 26 17:22:43 2006
@@ -0,0 +1,75 @@
+from pypy.conftest import gettestobjspace
+ 
+class AppTest_Stackless(object):
+    """App-level tests of the stackless lib (tasklets and channels)."""
+
+    def setup_class(cls):
+        cls.space = gettestobjspace('std', usemodules=("_stackless",))
+
+    def test_simple_pipe(self):
+        """A single tasklet forwards one value from channel X to channel Y."""
+        from stackless import run, tasklet, channel
+
+        def pipe(X_in, X_out):
+            foo = X_in.receive()
+            X_out.send(foo)
+
+        X, Y = channel(), channel()
+        tasklet(pipe)(X, Y)
+        run()
+        X.send(42)
+        assert Y.receive() == 42
+
+    def notest_nested_pipe(self):
+        """
+        Pipe through a tasklet that itself spawns an inner pipe tasklet.
+        XXX: fails complaining that some coroutine hasn't the
+             block_trap attribute/slot
+        """
+        from stackless import run, tasklet, channel
+        run()
+
+        def pipe(X, Y):
+            foo = X.receive()
+            Y.send(foo)
+
+        def nest(X, Y):
+            X2, Y2 = channel(), channel()
+            tasklet(pipe)(X2, Y2)
+            X2.send(X.receive())
+            Y.send(Y2.receive())
+
+        X, Y = channel(), channel()
+        tasklet(nest)(X, Y)
+        X.send(42)
+        assert Y.receive() == 42
+
+    def notest_wait_two(self):
+        """
+        A tasklets/channels adaptation of the test_wait_two from the
+        logic object space.
+        XXX: still disabled; the 'no Receive attribute' failure was a
+             typo (fixed: receive). With run() on top as currently,
+             the reported failure was a coroutine lacking the
+             block_trap slot -- to be re-checked.
+        """
+        from stackless import run, tasklet, channel
+        run()
+
+        def sleep(X, Barrier):
+            Barrier.send((X, X.receive()))
+
+        def wait_two(X, Y, Ret_chan):
+            Barrier = channel()
+            tasklet(sleep)(X, Barrier)
+            tasklet(sleep)(Y, Barrier)
+            ret = Barrier.receive()
+            if ret[0] == X:
+                return Ret_chan.send((1, ret[1]))
+            return Ret_chan.send((2, ret[1]))
+
+        X, Y, Ret_chan = channel(), channel(), channel()
+        tasklet(wait_two)(X, Y, Ret_chan)
+        Y.send(42)  # Y is fed first, hence the expected tag 2
+        X.send(42)
+        assert Ret_chan.receive() == (2, 42)



More information about the Pypy-commit mailing list