[pypy-commit] pypy py3.5: Start on posix.scandir()

arigo pypy.commits at gmail.com
Sun Aug 21 14:23:23 EDT 2016


Author: Armin Rigo <arigo at tunes.org>
Branch: py3.5
Changeset: r86377:cbc8e2e4d409
Date: 2016-08-21 19:57 +0200
http://bitbucket.org/pypy/pypy/changeset/cbc8e2e4d409/

Log:	Start on posix.scandir()

diff --git a/pypy/module/posix/__init__.py b/pypy/module/posix/__init__.py
--- a/pypy/module/posix/__init__.py
+++ b/pypy/module/posix/__init__.py
@@ -74,6 +74,8 @@
         'abort': 'interp_posix.abort',
         'urandom': 'interp_posix.urandom',
         'device_encoding': 'interp_posix.device_encoding',
+
+        'scandir': 'interp_scandir.scandir',
     }
 
     if hasattr(os, 'chown'):
diff --git a/pypy/module/posix/interp_posix.py b/pypy/module/posix/interp_posix.py
--- a/pypy/module/posix/interp_posix.py
+++ b/pypy/module/posix/interp_posix.py
@@ -862,13 +862,12 @@
     try:
         path = space.fsencode_w(w_path)
     except OperationError as operr:
+        if operr.async(space):
+            raise
         if not rposix.HAVE_FDOPENDIR:
             raise oefmt(space.w_TypeError,
                 "listdir: illegal type for path argument")
-        if not space.isinstance_w(w_path, space.w_int):
-            raise oefmt(space.w_TypeError,
-                "argument should be string, bytes or integer, not %T", w_path)
-        fd = unwrap_fd(space, w_path)
+        fd = unwrap_fd(space, w_path, "string, bytes or integer")
         try:
             result = rposix.fdlistdir(fd)
         except OSError as e:
diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/posix/interp_scandir.py
@@ -0,0 +1,126 @@
+from rpython.rlib import rgc
+from rpython.rlib import rposix_scandir
+from rpython.rtyper.lltypesystem import lltype
+
+from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
+from pypy.interpreter.error import OperationError, oefmt, wrap_oserror
+from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.baseobjspace import W_Root
+
+from pypy.module.posix.interp_posix import unwrap_fd
+
+
+ at unwrap_spec(w_path=WrappedDefault(u"."))
+def scandir(space, w_path):
+    "scandir(path='.') -> iterator of DirEntry objects for given path"
+
+    if space.isinstance_w(w_path, space.w_bytes):
+        XXX
+    else:
+        try:
+            path_bytes = space.fsencode_w(w_path)
+        except OperationError as operr:
+            if operr.async(space):
+                raise
+            fd = unwrap_fd(space, w_path, "string, bytes or integer")
+            XXXX
+
+        try:
+            dirp = rposix_scandir.opendir(path_bytes)
+        except OSError as e:
+            raise wrap_oserror(space, e)
+        path_prefix = path_bytes
+        if len(path_prefix) > 0 and path_prefix[-1] != '/':
+            path_prefix += '/'
+        w_path_prefix = space.fsdecode(space.newbytes(path_prefix))
+        return W_ScandirIterator(space, dirp, w_path_prefix)
+
+
+class W_ScandirIterator(W_Root):
+    def __init__(self, space, dirp, w_path_prefix):
+        self.space = space
+        self.dirp = dirp
+        self.w_path_prefix = w_path_prefix
+
+    @rgc.must_be_light_finalizer
+    def __del__(self):
+        if self.dirp:
+            rposix_scandir.closedir(self.dirp)
+
+    def iter_w(self):
+        return self.space.wrap(self)
+
+    def fail(self, err=None):
+        dirp = self.dirp
+        if dirp:
+            self.dirp = lltype.nullptr(lltype.typeOf(dirp).TO)
+            rposix_scandir.closedir(dirp)
+        if err is None:
+            raise OperationError(self.space.w_StopIteration, self.space.w_None)
+        else:
+            raise err
+
+    def next_w(self):
+        # XXX not safe against being called on several threads for
+        # the same object, but I think that CPython has the same problem
+        if not self.dirp:
+            self.fail()
+        #
+        space = self.space
+        while True:
+            try:
+                entry = rposix_scandir.nextentry(self.dirp)
+            except StopIteration:
+                self.fail()
+            except OSError as e:
+                self.fail(wrap_oserror(space, e))
+            assert rposix_scandir.has_name_bytes(entry)
+            name = rposix_scandir.get_name_bytes(entry)
+            if name != '.' and name != '..':
+                break
+        #
+        direntry = W_DirEntry(name, self.w_path_prefix)
+        return space.wrap(direntry)
+
+
+W_ScandirIterator.typedef = TypeDef(
+    'posix.ScandirIterator',
+    __iter__ = interp2app(W_ScandirIterator.iter_w),
+    __next__ = interp2app(W_ScandirIterator.next_w),
+)
+W_ScandirIterator.typedef.acceptable_as_base_class = False
+
+
+class W_DirEntry(W_Root):
+    w_name = None
+    w_path = None
+
+    def __init__(self, name_bytes, w_path_prefix):
+        self.name_bytes = name_bytes
+        self.w_path_prefix = w_path_prefix
+
+    def fget_name(self, space):
+        w_name = self.w_name
+        if w_name is None:
+            w_name = space.fsdecode(space.newbytes(self.name_bytes))
+            self.w_name = w_name
+        return w_name
+
+    def fget_path(self, space):
+        w_path = self.w_path
+        if w_path is None:
+            w_name = self.fget_name(space)
+            w_path = space.add(self.w_path_prefix, w_name)
+            self.w_path = w_path
+        return w_path
+
+W_DirEntry.typedef = TypeDef(
+    'posix.DirEntry',
+    name = GetSetProperty(W_DirEntry.fget_name,
+                          doc="the entry's base filename, relative to "
+                              'scandir() "path" argument'),
+    path = GetSetProperty(W_DirEntry.fget_path,
+                          doc="the entry's full path name; equivalent to "
+                              "os.path.join(scandir_path, entry.name)"),
+)
+W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/posix/test/test_scandir.py
@@ -0,0 +1,49 @@
+import sys, os
+from rpython.tool.udir import udir
+from pypy.module.posix.test import test_posix2
+
+
+def _make_dir(dirname, content):
+    d = os.path.join(str(udir), dirname)
+    os.mkdir(d)
+    for key, value in content.items():
+        xxx
+    return d.decode(sys.getfilesystemencoding())
+
+
+class AppTestScandir(object):
+    spaceconfig = {'usemodules': test_posix2.USEMODULES}
+
+    def setup_class(cls):
+        space = cls.space
+        cls.w_posix = space.appexec([], test_posix2.GET_POSIX)
+        cls.w_dir_empty = space.wrap(_make_dir('empty', {}))
+
+    def test_scandir_empty(self):
+        posix = self.posix
+        sd = posix.scandir(self.dir_empty)
+        assert list(sd) == []
+        assert list(sd) == []
+
+    def test_unicode_versus_bytes(self):
+        posix = self.posix
+        d = next(posix.scandir())
+        assert type(d.name) is str
+        assert type(d.path) is str
+        assert d.path == './' + d.name
+        d = next(posix.scandir(u'.'))
+        assert type(d.name) is str
+        assert type(d.path) is str
+        assert d.path == './' + d.name
+        d = next(posix.scandir(b'.'))
+        assert type(d.name) is bytes
+        assert type(d.path) is bytes
+        assert d.path == b'./' + d.name
+        d = next(posix.scandir('/'))
+        assert type(d.name) is str
+        assert type(d.path) is str
+        assert d.path == '/' + d.name
+        d = next(posix.scandir(b'/'))
+        assert type(d.name) is bytes
+        assert type(d.path) is bytes
+        assert d.path == b'/' + d.name


More information about the pypy-commit mailing list