[pypy-commit] pypy py3.5: Start on posix.scandir()
arigo
pypy.commits at gmail.com
Sun Aug 21 14:23:23 EDT 2016
Author: Armin Rigo <arigo at tunes.org>
Branch: py3.5
Changeset: r86377:cbc8e2e4d409
Date: 2016-08-21 19:57 +0200
http://bitbucket.org/pypy/pypy/changeset/cbc8e2e4d409/
Log: Start on posix.scandir()
diff --git a/pypy/module/posix/__init__.py b/pypy/module/posix/__init__.py
--- a/pypy/module/posix/__init__.py
+++ b/pypy/module/posix/__init__.py
@@ -74,6 +74,8 @@
'abort': 'interp_posix.abort',
'urandom': 'interp_posix.urandom',
'device_encoding': 'interp_posix.device_encoding',
+
+ 'scandir': 'interp_scandir.scandir',
}
if hasattr(os, 'chown'):
diff --git a/pypy/module/posix/interp_posix.py b/pypy/module/posix/interp_posix.py
--- a/pypy/module/posix/interp_posix.py
+++ b/pypy/module/posix/interp_posix.py
@@ -862,13 +862,12 @@
try:
path = space.fsencode_w(w_path)
except OperationError as operr:
+ if operr.async(space):
+ raise
if not rposix.HAVE_FDOPENDIR:
raise oefmt(space.w_TypeError,
"listdir: illegal type for path argument")
- if not space.isinstance_w(w_path, space.w_int):
- raise oefmt(space.w_TypeError,
- "argument should be string, bytes or integer, not %T", w_path)
- fd = unwrap_fd(space, w_path)
+ fd = unwrap_fd(space, w_path, "string, bytes or integer")
try:
result = rposix.fdlistdir(fd)
except OSError as e:
diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/posix/interp_scandir.py
@@ -0,0 +1,126 @@
+from rpython.rlib import rgc
+from rpython.rlib import rposix_scandir
+from rpython.rtyper.lltypesystem import lltype
+
+from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
+from pypy.interpreter.error import OperationError, oefmt, wrap_oserror
+from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.baseobjspace import W_Root
+
+from pypy.module.posix.interp_posix import unwrap_fd
+
+
+ at unwrap_spec(w_path=WrappedDefault(u"."))
+def scandir(space, w_path):
+ "scandir(path='.') -> iterator of DirEntry objects for given path"
+
+ if space.isinstance_w(w_path, space.w_bytes):
+ XXX
+ else:
+ try:
+ path_bytes = space.fsencode_w(w_path)
+ except OperationError as operr:
+ if operr.async(space):
+ raise
+ fd = unwrap_fd(space, w_path, "string, bytes or integer")
+ XXXX
+
+ try:
+ dirp = rposix_scandir.opendir(path_bytes)
+ except OSError as e:
+ raise wrap_oserror(space, e)
+ path_prefix = path_bytes
+ if len(path_prefix) > 0 and path_prefix[-1] != '/':
+ path_prefix += '/'
+ w_path_prefix = space.fsdecode(space.newbytes(path_prefix))
+ return W_ScandirIterator(space, dirp, w_path_prefix)
+
+
+class W_ScandirIterator(W_Root):
+ def __init__(self, space, dirp, w_path_prefix):
+ self.space = space
+ self.dirp = dirp
+ self.w_path_prefix = w_path_prefix
+
+ @rgc.must_be_light_finalizer
+ def __del__(self):
+ if self.dirp:
+ rposix_scandir.closedir(self.dirp)
+
+ def iter_w(self):
+ return self.space.wrap(self)
+
+ def fail(self, err=None):
+ dirp = self.dirp
+ if dirp:
+ self.dirp = lltype.nullptr(lltype.typeOf(dirp).TO)
+ rposix_scandir.closedir(dirp)
+ if err is None:
+ raise OperationError(self.space.w_StopIteration, self.space.w_None)
+ else:
+ raise err
+
+ def next_w(self):
+ # XXX not safe against being called on several threads for
+ # the same object, but I think that CPython has the same problem
+ if not self.dirp:
+ self.fail()
+ #
+ space = self.space
+ while True:
+ try:
+ entry = rposix_scandir.nextentry(self.dirp)
+ except StopIteration:
+ self.fail()
+ except OSError as e:
+ self.fail(wrap_oserror(space, e))
+ assert rposix_scandir.has_name_bytes(entry)
+ name = rposix_scandir.get_name_bytes(entry)
+ if name != '.' and name != '..':
+ break
+ #
+ direntry = W_DirEntry(name, self.w_path_prefix)
+ return space.wrap(direntry)
+
+
+W_ScandirIterator.typedef = TypeDef(
+ 'posix.ScandirIterator',
+ __iter__ = interp2app(W_ScandirIterator.iter_w),
+ __next__ = interp2app(W_ScandirIterator.next_w),
+)
+W_ScandirIterator.typedef.acceptable_as_base_class = False
+
+
+class W_DirEntry(W_Root):
+ w_name = None
+ w_path = None
+
+ def __init__(self, name_bytes, w_path_prefix):
+ self.name_bytes = name_bytes
+ self.w_path_prefix = w_path_prefix
+
+ def fget_name(self, space):
+ w_name = self.w_name
+ if w_name is None:
+ w_name = space.fsdecode(space.newbytes(self.name_bytes))
+ self.w_name = w_name
+ return w_name
+
+ def fget_path(self, space):
+ w_path = self.w_path
+ if w_path is None:
+ w_name = self.fget_name(space)
+ w_path = space.add(self.w_path_prefix, w_name)
+ self.w_path = w_path
+ return w_path
+
+W_DirEntry.typedef = TypeDef(
+ 'posix.DirEntry',
+ name = GetSetProperty(W_DirEntry.fget_name,
+ doc="the entry's base filename, relative to "
+ 'scandir() "path" argument'),
+ path = GetSetProperty(W_DirEntry.fget_path,
+ doc="the entry's full path name; equivalent to "
+ "os.path.join(scandir_path, entry.name)"),
+)
+W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/posix/test/test_scandir.py
@@ -0,0 +1,49 @@
+import sys, os
+from rpython.tool.udir import udir
+from pypy.module.posix.test import test_posix2
+
+
+def _make_dir(dirname, content):
+ d = os.path.join(str(udir), dirname)
+ os.mkdir(d)
+ for key, value in content.items():
+ xxx
+ return d.decode(sys.getfilesystemencoding())
+
+
+class AppTestScandir(object):
+ spaceconfig = {'usemodules': test_posix2.USEMODULES}
+
+ def setup_class(cls):
+ space = cls.space
+ cls.w_posix = space.appexec([], test_posix2.GET_POSIX)
+ cls.w_dir_empty = space.wrap(_make_dir('empty', {}))
+
+ def test_scandir_empty(self):
+ posix = self.posix
+ sd = posix.scandir(self.dir_empty)
+ assert list(sd) == []
+ assert list(sd) == []
+
+ def test_unicode_versus_bytes(self):
+ posix = self.posix
+ d = next(posix.scandir())
+ assert type(d.name) is str
+ assert type(d.path) is str
+ assert d.path == './' + d.name
+ d = next(posix.scandir(u'.'))
+ assert type(d.name) is str
+ assert type(d.path) is str
+ assert d.path == './' + d.name
+ d = next(posix.scandir(b'.'))
+ assert type(d.name) is bytes
+ assert type(d.path) is bytes
+ assert d.path == b'./' + d.name
+ d = next(posix.scandir('/'))
+ assert type(d.name) is str
+ assert type(d.path) is str
+ assert d.path == '/' + d.name
+ d = next(posix.scandir(b'/'))
+ assert type(d.name) is bytes
+ assert type(d.path) is bytes
+ assert d.path == b'/' + d.name
More information about the pypy-commit
mailing list