[pypy-svn] r29939 - in pypy/dist/pypy/module/fcntl: . test

rhymes at codespeak.net rhymes at codespeak.net
Tue Jul 11 11:39:44 CEST 2006


Author: rhymes
Date: Tue Jul 11 11:39:39 2006
New Revision: 29939

Added:
   pypy/dist/pypy/module/fcntl/
   pypy/dist/pypy/module/fcntl/__init__.py
   pypy/dist/pypy/module/fcntl/app_fcntl.py
   pypy/dist/pypy/module/fcntl/interp_fcntl.py
   pypy/dist/pypy/module/fcntl/test/
   pypy/dist/pypy/module/fcntl/test/test_fcntl.py
Log:
add fcntl module. initial stuff

Added: pypy/dist/pypy/module/fcntl/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/__init__.py	Tue Jul 11 11:39:39 2006
@@ -0,0 +1,9 @@
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+    interpleveldefs = {
+    }
+
+    appleveldefs = {
+        '_conv_descriptor': 'app_fcntl._conv_descriptor',
+    }

Added: pypy/dist/pypy/module/fcntl/app_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/app_fcntl.py	Tue Jul 11 11:39:39 2006
@@ -0,0 +1,8 @@
+
+def _conv_descriptor(f):
+    if hasattr(f, "fileno"):
+        return f.fileno()
+    elif isinstance(f, (int, long)):
+        return f
+    else:
+        raise TypeError, "argument must be an int, or have a fileno() method."

Added: pypy/dist/pypy/module/fcntl/interp_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/interp_fcntl.py	Tue Jul 11 11:39:39 2006
@@ -0,0 +1,54 @@
+from pypy.rpython.rctypes.tool import ctypes_platform
+from pypy.rpython.rctypes.tool.libc import libc
+import pypy.rpython.rctypes.implementation # this defines rctypes magic
+from pypy.rpython.rctypes.aerrno import geterrno
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.baseobjspace import W_Root, ObjSpace
+from ctypes import *
+
+class CConfig:
+    _header_ = """
+    #include <fcntl.h>
+    """
+    flock = ctypes_platform.Struct("struct flock",
+        [('l_start', c_longlong), ('l_len', c_longlong),
+        ('l_pid', c_long), ('l_type', c_short),
+        ('l_whence', c_short)])
+    
+# Constants to probe; see fcntl.h and the platform docs for their meaning.
+# Some are Linux-only: each is wrapped in DefinedConstantInteger, presumably
+# so that only those the OS defines end up exposed (TODO confirm).
+constant_names = ['LOCK_SH', 'LOCK_EX', 'LOCK_NB', 'LOCK_UN', 'F_DUPFD',
+    'F_GETFD', 'F_SETFD', 'F_GETFL', 'F_SETFL', 'F_UNLCK', 'FD_CLOEXEC',
+    'LOCK_MAND', 'LOCK_READ', 'LOCK_WRITE', 'LOCK_RW', 'F_GETSIG', 'F_SETSIG', 
+    'F_GETLK64', 'F_SETLK64', 'F_SETLKW64', 'F_GETLK', 'F_SETLK', 'F_SETLKW',
+    'F_GETOWN', 'F_SETOWN', 'F_RDLCK', 'F_WRLCK', 'F_SETLEASE', 'F_GETLEASE',
+    'F_NOTIFY', 'F_EXLCK', 'F_SHLCK', 'DN_ACCESS', 'DN_MODIFY', 'DN_CREATE',
+    'DN_DELETE', 'DN_RENAME', 'DN_ATTRIB', 'DN_MULTISHOT', 'I_NREAD',
+    'I_PUSH', 'I_POP', 'I_LOOK', 'I_FLUSH', 'I_SRDOPT', 'I_GRDOPT', 'I_STR', 
+    'I_SETSIG', 'I_GETSIG', 'I_FIND', 'I_LINK', 'I_UNLINK', 'I_PEEK',
+    'I_FDINSERT', 'I_SENDFD', 'I_RECVFD', 'I_SWROPT', 'I_LIST', 'I_PLINK',
+    'I_PUNLINK', 'I_FLUSHBAND', 'I_CKBAND', 'I_GETBAND', 'I_ATMARK',
+    'I_SETCLTIME', 'I_GETCLTIME', 'I_CANPUT']
+for name in constant_names:
+    setattr(CConfig, name, ctypes_platform.DefinedConstantInteger(name))
+
+class cConfig:
+    pass
+
+cConfig.__dict__.update(ctypes_platform.configure(CConfig))
+cConfig.flock.__name__ = "_flock"
+
+_flock = cConfig.flock
+
+def _get_error_msg():
+    errno = geterrno()
+    return libc.strerror(errno)
+
+def _conv_descriptor(space, f):
+    w_conv_descriptor = _get_module_object(space, "_check_float")
+    space.call_function(w_conv_descriptor, space.wrap(f))
+
+
+
+

Added: pypy/dist/pypy/module/fcntl/test/test_fcntl.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/fcntl/test/test_fcntl.py	Tue Jul 11 11:39:39 2006
@@ -0,0 +1,122 @@
+from py.test import raises, skip
+from pypy.conftest import gettestobjspace
+
+class AppTestFcntl:
+    def setup_class(cls):
+        space = gettestobjspace(usemodules=('fcntl',))
+        cls.space = space
+
+    def test_conv_descriptor(self):
+        import fcntl
+        
+        f = open("/tmp/conv_descr", "w")
+        
+        raises(TypeError, fcntl._conv_descriptor, "foo")
+        raises(TypeError, fcntl._conv_descriptor, 2.0)
+        import cStringIO
+        raises(TypeError, fcntl._conv_descriptor, cStringIO.StringIO())
+        res = fcntl._conv_descriptor(10)
+        res_1 = fcntl._conv_descriptor(f)
+        assert res == 10
+        assert res_1 == f.fileno()
+
+# def test_fcntl():
+#     assert cfcntl.fcntl(f, 1, 0) == 0
+#     assert cfcntl.fcntl(f, 2, "foo") == "foo"
+#     py.test.raises(TypeError, cfcntl.fcntl, "foo")
+#     py.test.raises(IOError, cfcntl.fcntl, -1, 1, 0)
+# 
+#     try:
+#         os.O_LARGEFILE
+#     except AttributeError:
+#         start_len = "ll"
+#     else:
+#         start_len = "qq"
+# 
+#     if sys.platform in ('netbsd1', 'netbsd2', 'netbsd3', 
+#                         'Darwin1.2', 'darwin',
+#                         'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
+#                         'freebsd6', 'freebsd7', 
+#                         'bsdos2', 'bsdos3', 'bsdos4',
+#                         'openbsd', 'openbsd2', 'openbsd3'):
+#         if struct.calcsize('l') == 8:
+#             off_t = 'l'
+#             pid_t = 'i'
+#         else:
+#             off_t = 'lxxxx'
+#             pid_t = 'l'
+# 
+#         format = "%s%s%shh" % (off_t, off_t, pid_t)
+#         lockdata = struct.pack(format, 0, 0, 0, cfcntl.F_WRLCK, 0)
+#     else:
+#         format = "hh%shh" % start_len
+#         lockdata = struct.pack(format, cfcntl.F_WRLCK, 0, 0, 0, 0, 0)
+# 
+#     rv = cfcntl.fcntl(f.fileno(), cfcntl.F_SETLKW, lockdata)
+#     assert rv == lockdata
+#     assert cfcntl.fcntl(f, cfcntl.F_SETLKW, lockdata) == lockdata
+# 
+#     # test duplication of file descriptor
+#     rv = cfcntl.fcntl(f, cfcntl.F_DUPFD)
+#     assert rv > 2 # > (stdin, stdout, stderr) at least
+#     assert cfcntl.fcntl(f, cfcntl.F_DUPFD) > rv
+#     assert cfcntl.fcntl(f, cfcntl.F_DUPFD, 99) == 99
+# 
+#     # test descriptor flags
+#     assert cfcntl.fcntl(f, cfcntl.F_GETFD) == 0
+#     cfcntl.fcntl(f, cfcntl.F_SETFD, 1)
+#     assert cfcntl.fcntl(f, cfcntl.F_GETFD, cfcntl.FD_CLOEXEC) == 1
+# 
+#     # test status flags
+#     assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NONBLOCK) == 0
+#     assert cfcntl.fcntl(f.fileno(), cfcntl.F_SETFL, os.O_NDELAY) == 0
+#     assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NONBLOCK) == 0
+#     assert cfcntl.fcntl(f, cfcntl.F_SETFL, os.O_NDELAY) == 0
+# 
+#     if "linux" in sys.platform:
+#         # test managing signals
+#         assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 0
+#         cfcntl.fcntl(f, cfcntl.F_SETOWN, 20)
+#         assert cfcntl.fcntl(f, cfcntl.F_GETOWN) == 20
+#         assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 0
+#         cfcntl.fcntl(f, cfcntl.F_SETSIG, 20)
+#         assert cfcntl.fcntl(f, cfcntl.F_GETSIG) == 20
+# 
+#         # test leases
+#         assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_UNLCK
+#         cfcntl.fcntl(f, cfcntl.F_SETLEASE, cfcntl.F_WRLCK)
+#         assert cfcntl.fcntl(f, cfcntl.F_GETLEASE) == cfcntl.F_WRLCK
+#     else:
+#         # these tests should fail under BSD
+#         # with "Inappropriate ioctl for device"
+#         py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_GETOWN)
+#         py.test.raises(IOError, cfcntl.fcntl, f, cfcntl.F_SETOWN, 20)
+# 
+# 
+# def test_flock():
+#     if "linux" in sys.platform:
+#         cfcntl.flock(f, cfcntl.LOCK_SH)
+#         # this is an error EWOULDBLOCK, man: The file is locked and the
+#         # LOCK_NB flag was selected.
+#         py.test.raises(IOError, cfcntl.flock, f, cfcntl.LOCK_NB)
+#         py.test.raises(IOError, cfcntl.flock, f, 3)
+#     py.test.raises(TypeError, cfcntl.flock, f, "foo")
+#     cfcntl.flock(f, cfcntl.LOCK_UN)
+# 
+# def test_lockf():
+#     py.test.raises(TypeError, cfcntl.lockf, f, "foo")
+#     py.test.raises(TypeError, cfcntl.lockf, f, cfcntl.LOCK_UN, "foo")
+#     py.test.raises(ValueError, cfcntl.lockf, f, 0)
+# 
+# def test_ioctl():
+#     py.test.raises(TypeError, cfcntl.ioctl, "foo")
+#     py.test.raises(TypeError, cfcntl.ioctl, 0, "foo")
+#     py.test.raises(TypeError, cfcntl.ioctl, 0, termios.TIOCGPGRP, float(0))
+#     py.test.raises(TypeError, cfcntl.ioctl, 0, termios.TIOCGPGRP, 1, "foo")
+# 
+#     buf = array.array('h', [0])
+#     assert cfcntl.ioctl(0, termios.TIOCGPGRP, buf, True) == 0
+#     buf = array.array('c', "a"*1025)
+#     py.test.raises(ValueError, cfcntl.ioctl, 0, termios.TIOCGPGRP, buf, 0)
+#     py.test.raises(ValueError, cfcntl.ioctl, 0, termios.TIOCGPGRP,
+#                    "a"*1025, 0)



More information about the Pypy-commit mailing list