[pypy-commit] pypy more-rposix: os.access(), os.fchdir()

amauryfa noreply at buildbot.pypy.org
Fri Nov 7 22:37:15 CET 2014


Author: Amaury Forgeot d'Arc <amauryfa at gmail.com>
Branch: more-rposix
Changeset: r74388:ae00e9e0b19b
Date: 2014-11-07 21:41 +0100
http://bitbucket.org/pypy/pypy/changeset/ae00e9e0b19b/

Log:	os.access(), os.fchdir()

diff --git a/rpython/rlib/rposix.py b/rpython/rlib/rposix.py
--- a/rpython/rlib/rposix.py
+++ b/rpython/rlib/rposix.py
@@ -398,7 +398,7 @@
                    macro=True)
 
 @replace_os_function('lseek')
-def lseek_llimpl(fd, pos, how):
+def lseek(fd, pos, how):
     validate_fd(fd)
     if SEEK_SET is not None:
         if how == 0:
@@ -426,11 +426,36 @@
 
 @replace_os_function('fdatasync')
 def fdatasync(fd):
-    rposix.validate_fd(fd)
+    validate_fd(fd)
     handle_posix_error('fdatasync', c_fdatasync(fd))
 
 #___________________________________________________________________
 
+c_fchdir = external('fchdir', [rffi.INT], rffi.INT)
+c_access = external(UNDERSCORE_ON_WIN32 + 'access',
+                    [rffi.CCHARP, rffi.INT], rffi.INT)
+c_waccess = external(UNDERSCORE_ON_WIN32 + 'waccess',
+                     [rffi.CWCHARP, rffi.INT], rffi.INT)
+
+ at replace_os_function('fchdir')
+def fchdir(fd):
+    validate_fd(fd)
+    handle_posix_error('fchdir', c_fchdir(fd))
+
+ at replace_os_function('access')
+ at specialize.argtype(0)
+def access(path, mode):
+    if _WIN32:
+        # All files are executable on Windows
+        mode = mode & ~os.X_OK
+    if _prefer_unicode(path):
+        error = c_waccess(_as_unicode0(path), mode)
+    else:
+        error = c_access(_as_bytes0(path), mode)
+    return error == 0
+
+#___________________________________________________________________
+
 c_execv = external('execv', [rffi.CCHARP, rffi.CCHARPP], rffi.INT)
 c_execve = external('execve',
                     [rffi.CCHARP, rffi.CCHARPP, rffi.CCHARPP], rffi.INT)
diff --git a/rpython/rtyper/module/ll_os.py b/rpython/rtyper/module/ll_os.py
--- a/rpython/rtyper/module/ll_os.py
+++ b/rpython/rtyper/module/ll_os.py
@@ -173,39 +173,6 @@
                 separate_module_sources = ["\n".join(defs)]
             ))
 
-    @registering_if(os, 'fchdir')
-    def register_os_fchdir(self):
-        os_fchdir = self.llexternal('fchdir', [rffi.INT], rffi.INT)
-
-        def fchdir_llimpl(fd):
-            rposix.validate_fd(fd)
-            res = rffi.cast(rffi.SIGNED, os_fchdir(rffi.cast(rffi.INT, fd)))
-            if res < 0:
-                raise OSError(rposix.get_errno(), "fchdir failed")
-        return extdef([int], s_None,
-                      llimpl=fchdir_llimpl,
-                      export_name="ll_os.ll_os_fchdir")
-
-    @registering_str_unicode(os.access)
-    def register_os_access(self, traits):
-        os_access = self.llexternal(traits.posix_function_name('access'),
-                                    [traits.CCHARP, rffi.INT],
-                                    rffi.INT)
-
-        if sys.platform.startswith('win'):
-            # All files are executable on Windows
-            def access_llimpl(path, mode):
-                mode = mode & ~os.X_OK
-                error = rffi.cast(lltype.Signed, os_access(path, mode))
-                return error == 0
-        else:
-            def access_llimpl(path, mode):
-                error = rffi.cast(lltype.Signed, os_access(path, mode))
-                return error == 0
-
-        return extdef([traits.str0, int], s_Bool, llimpl=access_llimpl,
-                      export_name=traits.ll_os_name("access"))
-
     @registering_str_unicode(getattr(posix, '_getfullpathname', None),
                              condition=sys.platform=='win32')
     def register_posix__getfullpathname(self, traits):
diff --git a/rpython/rtyper/module/test/test_ll_os.py b/rpython/rtyper/module/test/test_ll_os.py
--- a/rpython/rtyper/module/test/test_ll_os.py
+++ b/rpython/rtyper/module/test/test_ll_os.py
@@ -21,7 +21,7 @@
     fd.close()
 
     for mode in os.R_OK, os.W_OK, os.X_OK, os.R_OK | os.W_OK | os.X_OK:
-        result = getllimpl(os.access)(filename, mode)
+        result = rposix.access(filename, mode)
         assert result == os.access(filename, mode)
 
 
@@ -217,62 +217,54 @@
     fname = str(udir.join('os_test.txt'))
     fd = os.open(fname, os.O_WRONLY|os.O_CREAT, 0777)
     assert fd >= 0
-    f = getllimpl(os.write)
-    f(fd, 'Hello world')
+    rposix.write(fd, 'Hello world')
     os.close(fd)
     with open(fname) as fid:
         assert fid.read() == "Hello world"
     fd = os.open(fname, os.O_WRONLY|os.O_CREAT, 0777)
     os.close(fd)
-    py.test.raises(OSError, f, fd, 'Hello world')
+    py.test.raises(OSError, rposix.write, fd, 'Hello world')
 
 def test_os_close():
     fname = str(udir.join('os_test.txt'))
     fd = os.open(fname, os.O_WRONLY|os.O_CREAT, 0777)
     assert fd >= 0
     os.write(fd, 'Hello world')
-    f = getllimpl(os.close)
-    f(fd)
-    py.test.raises(OSError, f, fd)
+    rposix.close(fd)
+    py.test.raises(OSError, rposix.close, fd)
 
 def test_os_lseek():
     fname = str(udir.join('os_test.txt'))
     fd = os.open(fname, os.O_RDWR|os.O_CREAT, 0777)
     assert fd >= 0
     os.write(fd, 'Hello world')
-    f = getllimpl(os.lseek)
-    f(fd,0,0)
+    rposix.lseek(fd,0,0)
     assert os.read(fd, 11) == 'Hello world'
     os.close(fd)
-    py.test.raises(OSError, f, fd, 0, 0)
+    py.test.raises(OSError, rposix.lseek, fd, 0, 0)
 
 def test_os_fsync():
     fname = str(udir.join('os_test.txt'))
     fd = os.open(fname, os.O_WRONLY|os.O_CREAT, 0777)
     assert fd >= 0
     os.write(fd, 'Hello world')
-    f = getllimpl(os.fsync)
-    f(fd)
+    rposix.fsync(fd)
     os.close(fd)
     fid = open(fname)
     assert fid.read() == 'Hello world'
     fid.close()
-    py.test.raises(OSError, f, fd)
+    py.test.raises(OSError, rposix.fsync, fd)
 
 def test_os_fdatasync():
-    try:
-        f = getllimpl(os.fdatasync)
-    except:
-        py.test.skip('No fdatasync in os')
     fname = str(udir.join('os_test.txt'))
     fd = os.open(fname, os.O_WRONLY|os.O_CREAT, 0777)
     assert fd >= 0
     os.write(fd, 'Hello world')
-    f(fd)
+    rposix.fdatasync(fd)
     fid = open(fname)
     assert fid.read() == 'Hello world'
     os.close(fd)
-    py.test.raises(OSError, f, fd)
+    py.test.raises(OSError, rposix.fdatasync, fd)
 
 
 def test_os_kill():


More information about the pypy-commit mailing list