[pypy-svn] r38746 - in pypy/dist/pypy/module/select: . test
arigo at codespeak.net
arigo at codespeak.net
Tue Feb 13 20:20:43 CET 2007
Author: arigo
Date: Tue Feb 13 20:20:41 2007
New Revision: 38746
Added:
pypy/dist/pypy/module/select/test/ (props changed)
pypy/dist/pypy/module/select/test/test_select.py (contents, props changed)
Modified:
pypy/dist/pypy/module/select/app_select.py
Log:
Add tests to the select module implementation. Fix a bug that
caused "busy-loops" in typical applications.
Modified: pypy/dist/pypy/module/select/app_select.py
==============================================================================
--- pypy/dist/pypy/module/select/app_select.py (original)
+++ pypy/dist/pypy/module/select/app_select.py Tue Feb 13 20:20:41 2007
@@ -36,7 +36,7 @@
*** IMPORTANT NOTICE ***
On Windows, only sockets are supported; on Unix, all file descriptors.
"""
- from select import poll, POLLIN, POLLOUT, POLLPRI
+ from select import poll, POLLIN, POLLOUT, POLLPRI, POLLERR, POLLHUP
fddict = {}
polldict = {}
fd = 0
@@ -63,9 +63,9 @@
else:
ret = dict(p.poll())
- iretd = [ f for f in iwtd if ret.get(fddict[id(f)], 0) & POLLIN]
+ iretd = [ f for f in iwtd if ret.get(fddict[id(f)], 0) & (POLLIN|POLLHUP)]
oretd = [ f for f in owtd if ret.get(fddict[id(f)], 0) & POLLOUT]
- eretd = [ f for f in ewtd if ret.get(fddict[id(f)], 0) & POLLPRI]
+ eretd = [ f for f in ewtd if ret.get(fddict[id(f)], 0) & (POLLERR|POLLPRI)]
return iretd, oretd, eretd
Added: pypy/dist/pypy/module/select/test/test_select.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/select/test/test_select.py Tue Feb 13 20:20:41 2007
@@ -0,0 +1,119 @@
+import py, sys
+from pypy.conftest import gettestobjspace
+
+class AppTestSelect:
+ def setup_class(cls):
+ if sys.platform == 'win':
+ py.test.skip("select() doesn't work with pipes, "
+ "we would need tests using sockets")
+ space = gettestobjspace(usemodules=('select',))
+ cls.space = space
+
+ def test_sleep(self):
+ import time, select
+ start = time.time()
+ iwtd, owtd, ewtd = select.select([], [], [], 0.3)
+ end = time.time()
+ assert iwtd == owtd == ewtd == []
+ assert end - start > 0.25
+
+ def test_readable(self):
+ import os, select
+ readend, writeend = os.pipe()
+ try:
+ iwtd, owtd, ewtd = select.select([readend], [], [], 0)
+ assert iwtd == owtd == ewtd == []
+ os.write(writeend, 'X')
+ iwtd, owtd, ewtd = select.select([readend], [], [])
+ assert iwtd == [readend]
+ assert owtd == ewtd == []
+ finally:
+ os.close(writeend)
+ os.close(readend)
+
+ def test_write_read(self):
+ import os, select
+ readend, writeend = os.pipe()
+ try:
+ total_out = 0
+ while True:
+ iwtd, owtd, ewtd = select.select([], [writeend], [], 0)
+ assert iwtd == ewtd == []
+ if owtd == []:
+ break
+ assert owtd == [writeend]
+ total_out += os.write(writeend, 'x' * 512)
+ total_in = 0
+ while True:
+ iwtd, owtd, ewtd = select.select([readend], [], [], 0)
+ assert owtd == ewtd == []
+ if iwtd == []:
+ break
+ assert iwtd == [readend]
+ data = os.read(readend, 4096)
+ assert len(data) > 0
+ assert data == 'x' * len(data)
+ total_in += len(data)
+ assert total_in == total_out
+ finally:
+ os.close(writeend)
+ os.close(readend)
+
+ def test_close(self):
+ import os, select
+ readend, writeend = os.pipe()
+ try:
+ try:
+ total_out = os.write(writeend, 'x' * 512)
+ finally:
+ os.close(writeend)
+ assert 1 <= total_out <= 512
+ total_in = 0
+ while True:
+ iwtd, owtd, ewtd = select.select([readend], [], [])
+ assert iwtd == [readend]
+ assert owtd == ewtd == []
+ data = os.read(readend, 4096)
+ if len(data) == 0:
+ break
+ assert data == 'x' * len(data)
+ total_in += len(data)
+ assert total_in == total_out
+ finally:
+ os.close(readend)
+
+ def test_read_many(self):
+ import os, select
+ readends = []
+ writeends = []
+ try:
+ for i in range(10):
+ fd1, fd2 = os.pipe()
+ readends.append(fd1)
+ writeends.append(fd2)
+ iwtd, owtd, ewtd = select.select(readends, [], [], 0)
+ assert iwtd == owtd == ewtd == []
+
+ for i in range(50):
+ n = (i*3) % 10
+ os.write(writeends[n], 'X')
+ iwtd, owtd, ewtd = select.select(readends, [], [])
+ assert iwtd == [readends[n]]
+ assert owtd == ewtd == []
+ data = os.read(readends[n], 1)
+ assert data == 'X'
+
+ finally:
+ for fd in readends + writeends:
+ os.close(fd)
+
+ def test_read_end_closed(self):
+ import os, select
+ readend, writeend = os.pipe()
+ os.close(readend)
+ try:
+ iwtd, owtd, ewtd = select.select([], [writeend], [])
+ assert owtd == [writeend]
+ assert iwtd == ewtd == []
+ finally:
+ os.close(writeend)
More information about the Pypy-commit
mailing list