[pypy-svn] r35896 - pypy/dist/pypy/lib/app_test
stephan at codespeak.net
stephan at codespeak.net
Tue Dec 19 17:43:20 CET 2006
Author: stephan
Date: Tue Dec 19 17:43:19 2006
New Revision: 35896
Modified:
pypy/dist/pypy/lib/app_test/test_stackless.py
Log:
Added a couple of tests to test_stackless that pass when running on CStackless but are skipped when running on PyPy (features not yet implemented).
Modified: pypy/dist/pypy/lib/app_test/test_stackless.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/test_stackless.py (original)
+++ pypy/dist/pypy/lib/app_test/test_stackless.py Tue Dec 19 17:43:19 2006
@@ -7,10 +7,13 @@
from py.test import skip
try:
import stackless
+ stackless_c = True
if 'coroutine' in dir(stackless):
+ stackless_c = False
raise ImportError("We are running pypy-c")
withinit = False
except ImportError:
+ stackless_c = False
try:
from pypy.lib import stackless_new as stackless
except ImportError, e:
@@ -18,6 +21,11 @@
#from pypy.lib import stackless
withinit = True
+def pypy_skip(txt):
+ "don't skip, if we are running with CStackless"
+ if not stackless_c:
+ skip(txt)
+
class Test_Stackless:
def setup_method(self, method):
@@ -328,3 +336,128 @@
('schedule', 1), ('schedule', 2),
('schedule', 1), ('schedule', 2),]
+ def test_channel_callback(self):
+ pypy_skip('not yet implemented in pypy')
+ res = []
+ cb = []
+ def callback_function(chan, task, sending, willblock):
+ cb.append((chan, task, sending, willblock))
+ stackless.set_channel_callback(callback_function)
+ def f(chan):
+ chan.send('hello')
+ val = chan.receive()
+ res.append(val)
+
+ chan = stackless.channel()
+ task = stackless.tasklet(f)(chan)
+ val = chan.receive()
+ res.append(val)
+ chan.send('world')
+ assert res == ['hello','world']
+ maintask = stackless.getmain()
+ assert cb == [
+ (chan, maintask, 0, 1),
+ (chan, task, 1, 0),
+ (chan, maintask, 1, 1),
+ (chan, task, 0, 0)
+ ]
+
+ def test_schedule_callback(self):
+ pypy_skip('not yet implemented in pypy')
+ res = []
+ cb = []
+ def schedule_cb(prev, next):
+ cb.append((prev, next))
+
+ stackless.set_schedule_callback(schedule_cb)
+ def f(i):
+ res.append('A_%s' % i)
+ stackless.schedule()
+ res.append('B_%s' % i)
+
+ t1 = stackless.tasklet(f)(1)
+ t2 = stackless.tasklet(f)(2)
+ maintask = stackless.getmain()
+ stackless.run()
+ assert res == ['A_1', 'A_2', 'B_1', 'B_2']
+ assert cb == [
+ (maintask, t1),
+ (t1, t2),
+ (t2, t1),
+ (t1, t2),
+ (t2, maintask)
+ ]
+
+ def test_bomb(self):
+ pypy_skip('not yet implemented in pypy')
+ try:
+ 1/0
+ except:
+ import sys
+ b = stackless.bomb(*sys.exc_info())
+ assert b.type is ZeroDivisionError
+ print type(b.value)
+ assert str(b.value) == 'integer division or modulo by zero'
+ assert b.traceback is not None
+
+ def test_send_exception(self):
+ pypy_skip('not yet implemented in pypy')
+ def exp_sender(chan):
+ chan.send_exception(Exception, 'test')
+
+ def exp_recv(chan):
+ try:
+ val = chan.receive()
+ except Exception, exp:
+ assert exp.__class__ is Exception
+ assert str(exp) == 'test'
+
+ chan = stackless.channel()
+ t1 = stackless.tasklet(exp_recv)(chan)
+ t2 = stackless.tasklet(exp_sender)(chan)
+ stackless.run()
+
+ def test_send_sequence(self):
+ pypy_skip('not yet implemented in pypy')
+ res = []
+ lst = [1,2,3,4,5,6,None]
+ iterable = iter(lst)
+ chan = stackless.channel()
+ def f(chan):
+ r = chan.receive()
+ while r:
+ res.append(r)
+ r = chan.receive()
+
+ t = stackless.tasklet(f)(chan)
+ chan.send_sequence(iterable)
+ assert res == [1,2,3,4,5,6]
+
+ def test_getruncount(self):
+ pypy_skip('not yet implemented in pypy')
+ assert stackless.getruncount() == 1
+ def with_schedule():
+ assert stackless.getruncount() == 2
+
+ t1 = stackless.tasklet(with_schedule)()
+ assert stackless.getruncount() == 2
+ stackless.schedule()
+ def with_run():
+ assert stackless.getruncount() == 1
+
+ t2 = stackless.tasklet(with_run)()
+ stackless.run()
+
+ def test_schedule_return(self):
+ pypy_skip('not yet implemented in pypy')
+ def f():pass
+ t1= stackless.tasklet(f)()
+ r = stackless.schedule()
+ assert r is stackless.getmain()
+ t2 = stackless.tasklet(f)()
+ r = stackless.schedule('test')
+ assert r == 'test'
+
+
+
+
More information about the Pypy-commit mailing list