[pypy-svn] r47352 - in pypy/dist/pypy/rlib: . test
fijal at codespeak.net
fijal at codespeak.net
Tue Oct 9 16:20:07 CEST 2007
Author: fijal
Date: Tue Oct 9 16:20:05 2007
New Revision: 47352
Added:
pypy/dist/pypy/rlib/rmmap.py (contents, props changed)
pypy/dist/pypy/rlib/test/test_rmmap.py (contents, props changed)
Log:
Implement rmmap, with slightly non-pythonic interface, but no RPython tricks
inside.
Added: pypy/dist/pypy/rlib/rmmap.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/rlib/rmmap.py Tue Oct 9 16:20:05 2007
@@ -0,0 +1,664 @@
+
+from pypy.rpython.tool import rffi_platform
+from pypy.rpython.lltypesystem import rffi, lltype, llmemory
+
+import sys
+import os
+import platform
+import stat
+
+_POSIX = os.name == "posix"
+_MS_WINDOWS = os.name == "nt"
+_LINUX = "linux" in sys.platform
+_64BIT = "64bit" in platform.architecture()[0]
+
+
+class CConfig:
+ _includes_ = ("sys/types.h",'unistd.h')
+ _header_ = '#define _GNU_SOURCE\n'
+ size_t = rffi_platform.SimpleType("size_t", rffi.LONG)
+ off_t = rffi_platform.SimpleType("off_t", rffi.LONG)
+
+constants = {}
+if _POSIX:
+ CConfig._includes_ += ("sys/mman.h",)
+ # constants, look in sys/mman.h and platform docs for the meaning
+ # some constants are linux only so they will be correctly exposed outside
+ # depending on the OS
+ constant_names = ['MAP_SHARED', 'MAP_PRIVATE',
+ 'PROT_READ', 'PROT_WRITE',
+ 'MS_SYNC']
+ opt_constant_names = ['MAP_ANON', 'MAP_ANONYMOUS',
+ 'PROT_EXEC',
+ 'MAP_DENYWRITE', 'MAP_EXECUTABLE']
+ for name in constant_names:
+ setattr(CConfig, name, rffi_platform.ConstantInteger(name))
+ for name in opt_constant_names:
+ setattr(CConfig, name, rffi_platform.DefinedConstantInteger(name))
+
+ CConfig.MREMAP_MAYMOVE = (
+ rffi_platform.DefinedConstantInteger("MREMAP_MAYMOVE"))
+ CConfig.has_mremap = rffi_platform.Has('mremap(NULL, 0, 0, 0)')
+ # a dirty hack, this is probably a macro
+
+elif _MS_WINDOWS:
+ CConfig._includes_ += ("windows.h",)
+ constant_names = ['PAGE_READONLY', 'PAGE_READWRITE', 'PAGE_WRITECOPY',
+ 'FILE_MAP_READ', 'FILE_MAP_WRITE', 'FILE_MAP_COPY',
+ 'DUPLICATE_SAME_ACCESS']
+ for name in constant_names:
+ setattr(CConfig, name, rffi_platform.ConstantInteger(name))
+
+# export the constants inside and outside. see __init__.py
+cConfig = rffi_platform.configure(CConfig)
+constants.update(cConfig)
+
+if _POSIX:
+ # MAP_ANONYMOUS is not always present but it's always available at CPython level
+ if constants["MAP_ANONYMOUS"] is None:
+ constants["MAP_ANONYMOUS"] = constants["MAP_ANON"]
+ assert constants["MAP_ANONYMOUS"] is not None
+ constants["MAP_ANON"] = constants["MAP_ANONYMOUS"]
+
+locals().update(constants)
+
+_ACCESS_DEFAULT, ACCESS_READ, ACCESS_WRITE, ACCESS_COPY = range(4)
+
+def external(name, args, result):
+ return rffi.llexternal(name, args, result, includes=CConfig._includes_)
+
+PTR = rffi.CCHARP
+
+has_mremap = cConfig['has_mremap']
+
+c_memmove = external('memmove', [PTR, PTR, size_t], lltype.Void)
+
+if _POSIX:
+ c_mmap = external('mmap', [PTR, size_t, rffi.INT, rffi.INT,
+ rffi.INT, off_t], PTR)
+ c_munmap = external('munmap', [PTR, size_t], rffi.INT)
+ c_msync = external('msync', [PTR, size_t, rffi.INT], rffi.INT)
+ if has_mremap:
+ c_mremap = external('mremap', [PTR, size_t, size_t, rffi.ULONG], PTR)
+
+ _get_page_size = external('getpagesize', [], rffi.INT)
+
+ def _get_error_msg():
+ errno = rffi.get_errno()
+ return os.strerror(errno)
+elif _MS_WINDOWS:
+ XXX
+ from ctypes import wintypes
+
+ WORD = wintypes.WORD
+ DWORD = wintypes.DWORD
+ BOOL = wintypes.BOOL
+ LONG = wintypes.LONG
+ LPVOID = PTR
+ LPCVOID = LPVOID
+ DWORD_PTR = DWORD
+ rffi.INT = wintypes.rffi.INT
+ INVALID_c_int_VALUE = c_int(-1).value
+
+ class SYSINFO_STRUCT(Structure):
+ _fields_ = [("wProcessorArchitecture", WORD),
+ ("wReserved", WORD)]
+
+ class SYSINFO_UNION(Union):
+ _fields_ = [("dwOemId", DWORD),
+ ("struct", SYSINFO_STRUCT)]
+
+ class SYSTEM_INFO(Structure):
+ _fields_ = [("union", SYSINFO_UNION),
+ ("dwPageSize", DWORD),
+ ("lpMinimumApplicationAddress", LPVOID),
+ ("lpMaximumApplicationAddress", LPVOID),
+ ("dwActiveProcessorMask", DWORD_PTR),
+ ("dwNumberOfProcessors", DWORD),
+ ("dwProcessorType", DWORD),
+ ("dwAllocationGranularity", DWORD),
+ ("wProcessorLevel", WORD),
+ ("wProcessorRevision", WORD)]
+
+ windll.kernel32.GetSystemInfo.argtypes = [POINTER(SYSTEM_INFO)]
+ GetFileSize = windll.kernel32.GetFileSize
+ GetFileSize.argtypes = [rffi.INT, POINTER(rffi.INT)]
+ GetFileSize.restype = rffi.INT
+ GetCurrentProcess = windll.kernel32.GetCurrentProcess
+ GetCurrentProcess.restype = rffi.INT
+ DuplicateHandle = windll.kernel32.DuplicateHandle
+ DuplicateHandle.argtypes = [rffi.INT, rffi.INT, rffi.INT, POINTER(rffi.INT), DWORD,
+ BOOL, DWORD]
+ DuplicateHandle.restype = BOOL
+ CreateFileMapping = windll.kernel32.CreateFileMappingA
+ CreateFileMapping.argtypes = [rffi.INT, PTR, rffi.INT, rffi.INT, rffi.INT,
+ c_char_p]
+ CreateFileMapping.restype = rffi.INT
+ MapViewOfFile = windll.kernel32.MapViewOfFile
+ MapViewOfFile.argtypes = [rffi.INT, DWORD, DWORD, DWORD, DWORD]
+ MapViewOfFile.restype = PTR
+ CloseHandle = windll.kernel32.CloseHandle
+ CloseHandle.argtypes = [rffi.INT]
+ CloseHandle.restype = BOOL
+ UnmapViewOfFile = windll.kernel32.UnmapViewOfFile
+ UnmapViewOfFile.argtypes = [LPCVOID]
+ UnmapViewOfFile.restype = BOOL
+ FlushViewOfFile = windll.kernel32.FlushViewOfFile
+ FlushViewOfFile.argtypes = [LPCVOID, rffi.INT]
+ FlushViewOfFile.restype = BOOL
+ SetFilePointer = windll.kernel32.SetFilePointer
+ SetFilePointer.argtypes = [rffi.INT, rffi.INT, POINTER(rffi.INT), rffi.INT]
+ SetEndOfFile = windll.kernel32.SetEndOfFile
+ SetEndOfFile.argtypes = [rffi.INT]
+ msvcr71 = cdll.LoadLibrary("msvcr71.dll")
+ msvcr71._get_osfhandle.argtypes = [rffi.INT]
+ msvcr71._get_osfhandle.restype = rffi.INT
+ # libc._lseek.argtypes = [rffi.INT, rffi.INT, rffi.INT]
+ # libc._lseek.restype = rffi.INT
+
+
+ def _get_page_size():
+ si = SYSTEM_INFO()
+ windll.kernel32.GetSystemInfo(byref(si))
+ return int(si.dwPageSize)
+
+ def _get_file_size(handle):
+ # XXX use native Windows types like WORD
+ high = rffi.INT(0)
+ low = rffi.INT(windll.kernel32.GetFileSize(rffi.INT(handle.value), byref(high)))
+ # low might just happen to have the value INVALID_FILE_SIZE
+ # so we need to check the last error also
+ INVALID_FILE_SIZE = -1
+ NO_ERROR = 0
+ dwErr = GetLastError()
+ if low.value == INVALID_FILE_SIZE and dwErr != NO_ERROR:
+ raise EnvironmentError(os.strerror(dwErr))
+ return low.value, high.value
+
+ def _get_error_msg():
+ errno = GetLastError()
+ return os.strerror(errno)
+
+PAGESIZE = _get_page_size()
+NULL = lltype.nullptr(PTR.TO)
+NODATA = lltype.nullptr(PTR.TO)
+INVALID_INT_VALUE = -1
+
+class MMap(object):
+ def __init__(self, access):
+ self.size = 0
+ self.pos = 0
+ self.access = access
+
+ if _MS_WINDOWS:
+ self.map_handle = 0
+ self.file_handle = 0
+ self.tagname = ""
+ elif _POSIX:
+ self.fd = -1
+ self.closed = False
+
+ def check_valid(self):
+ if _MS_WINDOWS:
+ to_close = self.map_handle.value == INVALID_INT_VALUE
+ elif _POSIX:
+ to_close = self.closed
+
+ if to_close:
+ raise ValueError("map closed or invalid")
+
+ def check_writeable(self):
+ if not (self.access != ACCESS_READ):
+ raise TypeError("mmap can't modify a readonly memory map.")
+
+ def check_resizeable(self):
+ if not (self.access == ACCESS_WRITE or self.access == _ACCESS_DEFAULT):
+ raise TypeError("mmap can't resize a readonly or copy-on-write memory map.")
+
+ def setdata(self, data, size):
+ """Set the internal data and map size from a PTR."""
+ assert size >= 0
+ self.data = data
+ self.size = size
+
+ def close(self):
+ if _MS_WINDOWS:
+ if self.size > 0:
+ self.unmapview()
+ self.setdata(NODATA, 0)
+ if self.map_handle.value != INVALID_rffi.INT_VALUE:
+ CloseHandle(self.map_handle)
+ self.map_handle.value = INVALID_rffi.INT_VALUE
+ if self.file_handle.value != INVALID_rffi.INT_VALUE:
+ CloseHandle(self.file_handle)
+ self.file_handle.value = INVALID_rffi.INT_VALUE
+ elif _POSIX:
+ self.closed = True
+ if self.fd != -1:
+ os.close(self.fd)
+ self.fd = -1
+ if self.size > 0:
+ c_munmap(self.getptr(0), self.size)
+ self.setdata(NODATA, 0)
+
+ def unmapview(self):
+ UnmapViewOfFile(self.getptr(0))
+
+ def read_byte(self):
+ self.check_valid()
+
+ if self.pos < self.size:
+ value = self.data[self.pos]
+ self.pos += 1
+ return value
+ else:
+ raise ValueError("read byte out of range")
+
+ def readline(self):
+ self.check_valid()
+
+ data = self.data
+ for pos in xrange(self.pos, self.size):
+ if data[pos] == '\n':
+ eol = pos + 1 # we're interested in the position after new line
+ break
+ else: # no '\n' found
+ eol = self.size
+
+ res = "".join([data[i] for i in range(self.pos, eol)])
+ self.pos += len(res)
+ return res
+
+ def read(self, num=-1):
+ self.check_valid()
+
+ if num < 0:
+ # read all
+ eol = self.size
+ else:
+ eol = self.pos + num
+ # silently adjust out of range requests
+ if eol > self.size:
+ eol = self.size
+
+ res = [self.data[i] for i in range(self.pos, eol)]
+ res = "".join(res)
+ self.pos += len(res)
+ return res
+
+ def find(self, tofind, start=0):
+ self.check_valid()
+
+ # XXX naive! how can we reuse the rstr algorithm?
+ if start < 0:
+ start += self.size
+ if start < 0:
+ start = 0
+ data = self.data
+ for p in xrange(start, self.size - len(tofind) + 1):
+ for q in range(len(tofind)):
+ if data[p+q] != tofind[q]:
+ break # position 'p' is not a match
+ else:
+ # full match
+ return p
+ # failure
+ return -1
+
+ def seek(self, pos, whence=0):
+ self.check_valid()
+
+ dist = pos
+ how = whence
+
+ if how == 0: # relative to start
+ where = dist
+ elif how == 1: # relative to current position
+ where = self.pos + dist
+ elif how == 2: # relative to the end
+ where = self.size + dist
+ else:
+ raise ValueError("unknown seek type")
+
+ if not (0 <= where <= self.size):
+ raise ValueError("seek out of range")
+
+ self.pos = where
+
+ def tell(self):
+ self.check_valid()
+ return self.pos
+
+ def file_size(self):
+ self.check_valid()
+
+ size = self.size
+ if _MS_WINDOWS:
+ if self.file_handle.value != INVALID_rffi.INT_VALUE:
+ low, high = _get_file_size(self.file_handle)
+ if not high and low <= sys.maxint:
+ return low
+ size = rffi.INT((high << 32) + low).value
+ elif _POSIX:
+ st = os.fstat(self.fd)
+ size = st[stat.ST_SIZE]
+ if size > sys.maxint:
+ size = sys.maxint
+ else:
+ size = int(size)
+ return size
+
+ def write(self, data):
+ self.check_valid()
+ self.check_writeable()
+
+ data_len = len(data)
+ if self.pos + data_len > self.size:
+ raise ValueError("data out of range")
+
+ internaldata = self.data
+ start = self.pos
+ for i in range(data_len):
+ internaldata[start+i] = data[i]
+ self.pos = start + data_len
+
+ def write_byte(self, byte):
+     """Store the single character `byte` at the current position.
+
+     The position is advanced by one afterwards.
+
+     Raises ValueError if the map is closed or the current position is
+     already at the end of the map, and TypeError if `byte` is not
+     exactly one character long or the map is read-only.
+     """
+     self.check_valid()
+
+     if len(byte) != 1:
+         raise TypeError("write_byte() argument must be char")
+
+     self.check_writeable()
+     # self.data is a raw pointer without bounds checking: a write at
+     # pos == size would land one byte past the mapped region.
+     if self.pos >= self.size:
+         raise ValueError("write byte out of range")
+     self.data[self.pos] = byte[0]
+     self.pos += 1
+
+ def getptr(self, offset):
+ return rffi.ptradd(self.data, offset)
+
+ def flush(self, offset=0, size=0):
+ self.check_valid()
+
+ if size == 0:
+ size = self.size
+ if offset < 0 or size < 0 or offset + size > self.size:
+ raise ValueError("flush values out of range")
+ else:
+ start = self.getptr(offset)
+ if _MS_WINDOWS:
+ res = FlushViewOfFile(start, size)
+ # XXX res == 0 means that an error occurred, but in CPython
+ # this is not checked
+ return res
+ elif _POSIX:
+## XXX why is this code here? There is no equivalent in CPython
+## if _LINUX:
+## # alignment of the address
+## value = cast(self.data, c_void_p).value
+## aligned_value = value & ~(PAGESIZE - 1)
+## # the size should be increased too. otherwise the final
+## # part is not "msynced"
+## new_size = size + value & (PAGESIZE - 1)
+ res = c_msync(start, size, MS_SYNC)
+ if res == -1:
+ raise EnvironmentError(_get_error_msg())
+
+ return 0
+
+ def move(self, dest, src, count):
+ self.check_valid()
+
+ self.check_writeable()
+
+ # check boundings
+ if (src < 0 or dest < 0 or count < 0 or
+ src + count > self.size or dest + count > self.size):
+ raise ValueError("source or destination out of range")
+
+ datasrc = self.getptr(src)
+ datadest = self.getptr(dest)
+ c_memmove(datadest, datasrc, count)
+
+ def resize(self, newsize):
+     """Resize the map, and the file behind it, to `newsize` bytes.
+
+     Raises ValueError if the map is closed, TypeError if the map is
+     read-only or copy-on-write, and EnvironmentError if resizing is
+     not available on this platform or one of the system calls fails.
+     """
+     self.check_valid()
+
+     self.check_resizeable()
+
+     if _POSIX:
+         if not has_mremap:
+             msg = "mmap: resizing not available -- no mremap()"
+             raise EnvironmentError(msg)
+
+         # resize the underlying file first
+         try:
+             os.ftruncate(self.fd, newsize)
+         except OSError, e:
+             raise EnvironmentError(os.strerror(e.errno))
+
+         # now resize the mmap
+         newdata = c_mremap(self.getptr(0), self.size, newsize,
+                            MREMAP_MAYMOVE or 0)
+         # mremap() signals failure by returning MAP_FAILED, i.e.
+         # (void *)-1.  Do not install that value as the data pointer;
+         # leave self.data and self.size as they were.
+         if newdata == rffi.cast(PTR, -1):
+             raise EnvironmentError(_get_error_msg())
+         self.setdata(newdata, newsize)
+     elif _MS_WINDOWS:
+         # disconnect the mapping
+         self.unmapview()
+         CloseHandle(self.map_handle)
+
+         # move to the desired EOF position
+         if _64BIT:
+             newsize_high = DWORD(newsize >> 32)
+             newsize_low = DWORD(newsize & 0xFFFFFFFF)
+         else:
+             newsize_high = rffi.INT(0)
+             newsize_low = rffi.INT(newsize)
+
+         FILE_BEGIN = rffi.INT(0)
+         SetFilePointer(self.file_handle, newsize_low, byref(newsize_high),
+                        FILE_BEGIN)
+         # resize the file
+         SetEndOfFile(self.file_handle)
+         # create another mapping object and remap the file view
+         res = CreateFileMapping(self.file_handle, NULL, PAGE_READWRITE,
+                                 newsize_high, newsize_low, self.tagname)
+         self.map_handle = rffi.INT(res)
+
+         dwErrCode = DWORD(0)
+         if self.map_handle:
+             data = MapViewOfFile(self.map_handle, FILE_MAP_WRITE,
+                                  0, 0, 0)
+             if data:
+                 self.setdata(data, newsize)
+                 return
+             else:
+                 dwErrCode = GetLastError()
+         else:
+             dwErrCode = GetLastError()
+
+         raise EnvironmentError(os.strerror(dwErrCode))
+
+ def len(self):
+ self.check_valid()
+
+ return self.size
+
+ def getitem(self, index):
+     """Return the character stored at `index`.
+
+     A negative `index` counts from the end of the map.  Raises
+     ValueError if the map is closed and IndexError if `index` falls
+     outside the map.
+     """
+     self.check_valid()
+     # simplified version, for rpython
+     if index < 0:
+         index += self.size
+     # self.data is a raw pointer, so an unchecked index would read
+     # memory outside the mapping.
+     if index < 0 or index >= self.size:
+         raise IndexError("mmap index out of range")
+     return self.data[index]
+
+ def setitem(self, index, value):
+     """Store the single character `value` at `index`.
+
+     A negative `index` counts from the end of the map.  Raises
+     ValueError if the map is closed or `value` is not exactly one
+     character long, TypeError if the map is read-only, and IndexError
+     if `index` falls outside the map.
+     """
+     self.check_valid()
+     self.check_writeable()
+
+     if len(value) != 1:
+         raise ValueError("mmap assignment must be "
+                          "single-character string")
+     if index < 0:
+         index += self.size
+     # self.data is a raw pointer, so an unchecked index would write
+     # to memory outside the mapping.
+     if index < 0 or index >= self.size:
+         raise IndexError("mmap index out of range")
+     self.data[index] = value[0]
+
+def _check_map_size(size):
+ if size < 0:
+ raise TypeError("memory mapped size must be positive")
+ if rffi.cast(size_t, size) != size:
+ raise OverflowError("memory mapped size is too large (limited by C int)")
+
+if _POSIX:
+ def mmap(fileno, length, flags=MAP_SHARED,
+          prot=PROT_WRITE | PROT_READ, access=_ACCESS_DEFAULT):
+     """Map `length` bytes of the file descriptor `fileno`; return an MMap.
+
+     A `fileno` of -1 maps anonymous memory.  For a regular file, a
+     `length` of 0 maps the whole file.  `access` is an alternative to
+     `flags`/`prot` and cannot be combined with non-default values of
+     them.
+
+     Raises TypeError or OverflowError for an invalid `length`,
+     ValueError for inconsistent arguments or a `length` larger than
+     the file, and EnvironmentError if dup() or mmap() fails.
+     """
+
+     fd = fileno
+
+     # check size boundaries
+     _check_map_size(length)
+     map_size = length
+
+     # check access is not there when flags and prot are there
+     if access != _ACCESS_DEFAULT and ((flags != MAP_SHARED) or\
+                                       (prot != (PROT_WRITE | PROT_READ))):
+         raise ValueError("mmap can't specify both access and flags, prot.")
+
+     if access == ACCESS_READ:
+         flags = MAP_SHARED
+         prot = PROT_READ
+     elif access == ACCESS_WRITE:
+         flags = MAP_SHARED
+         prot = PROT_READ | PROT_WRITE
+     elif access == ACCESS_COPY:
+         flags = MAP_PRIVATE
+         prot = PROT_READ | PROT_WRITE
+     elif access == _ACCESS_DEFAULT:
+         pass
+     else:
+         raise ValueError("mmap invalid access parameter.")
+
+     # check file size
+     try:
+         st = os.fstat(fd)
+     except OSError:
+         pass     # ignore errors and trust map_size
+     else:
+         mode = st[stat.ST_MODE]
+         size = st[stat.ST_SIZE]
+         if size > sys.maxint:
+             size = sys.maxint
+         else:
+             size = int(size)
+         if stat.S_ISREG(mode):
+             if map_size == 0:
+                 map_size = size
+             elif map_size > size:
+                 raise ValueError("mmap length is greater than file size")
+
+     m = MMap(access)
+     if fd == -1:
+         # Assume the caller wants to map anonymous memory.
+         # This is the same behaviour as Windows.  mmap.mmap(-1, size)
+         # on both Windows and Unix map anonymous memory.
+         m.fd = -1
+
+         flags |= MAP_ANONYMOUS
+
+     else:
+         try:
+             m.fd = os.dup(fd)
+         except OSError, e:
+             raise EnvironmentError(os.strerror(e.errno))
+
+     res = c_mmap(NULL, map_size, prot, flags, fd, 0)
+     if res == rffi.cast(PTR, -1):
+         # Fetch the message first: closing the descriptor below may
+         # overwrite errno.
+         msg = _get_error_msg()
+         # `m` is never handed to the caller on this path, so release
+         # the duplicated descriptor here instead of leaking it.
+         if m.fd != -1:
+             os.close(m.fd)
+             m.fd = -1
+         raise EnvironmentError(msg)
+
+     m.setdata(res, map_size)
+     return m
+elif _MS_WINDOWS:
+ def mmap(fileno, length, tagname="", access=_ACCESS_DEFAULT):
+ # check size boundaries
+ _check_map_size(length)
+ map_size = length
+
+ flProtect = 0
+ dwDesiredAccess = 0
+ fh = 0
+
+ if access == ACCESS_READ:
+ flProtect = PAGE_READONLY
+ dwDesiredAccess = FILE_MAP_READ
+ elif access == _ACCESS_DEFAULT or access == ACCESS_WRITE:
+ flProtect = PAGE_READWRITE
+ dwDesiredAccess = FILE_MAP_WRITE
+ elif access == ACCESS_COPY:
+ flProtect = PAGE_WRITECOPY
+ dwDesiredAccess = FILE_MAP_COPY
+ else:
+ raise ValueError("mmap invalid access parameter.")
+
+ # assume -1 and 0 both mean invalid file descriptor
+ # to 'anonymously' map memory.
+ if fileno != -1 and fileno != 0:
+ fh = msvcr71._get_osfhandle(fileno)
+ if fh == -1:
+ raise EnvironmentError(_get_error_msg())
+ # Win9x appears to need us seeked to zero
+ # SEEK_SET = 0
+ # libc._lseek(fileno, 0, SEEK_SET)
+
+ m = MMap(access)
+ # XXX the following two attributes should be plain RPython ints
+ m.file_handle = rffi.INT(INVALID_rffi.INT_VALUE)
+ m.map_handle = rffi.INT(INVALID_rffi.INT_VALUE)
+
+ if fh:
+ # it is necessary to duplicate the handle, so the
+ # Python code can close it on us
+ res = DuplicateHandle(GetCurrentProcess(), # source process handle
+ fh, # handle to be duplicated
+ GetCurrentProcess(), # target process handle
+ byref(m.file_handle), # result
+ 0, # access - ignored due to options value
+ False, # inherited by child procs?
+ DUPLICATE_SAME_ACCESS) # options
+ if not res:
+ raise EnvironmentError(_get_error_msg())
+
+ if not map_size:
+ low, high = _get_file_size(rffi.INT(fh))
+ if _64BIT:
+ map_size = rffi.INT((low << 32) + 1).value
+ else:
+ if high:
+ # file is too large to map completely
+ map_size = -1
+ else:
+ map_size = low
+
+ if tagname:
+ m.tagname = tagname
+
+ # DWORD is a 4-byte int. If int > 4-byte it must be divided
+ if _64BIT:
+ size_hi = DWORD(map_size >> 32)
+ size_lo = DWORD(map_size & 0xFFFFFFFF)
+ else:
+ size_hi = rffi.INT(0)
+ size_lo = rffi.INT(map_size)
+
+ m.map_handle = rffi.INT(CreateFileMapping(m.file_handle, NULL, flProtect,
+ size_hi, size_lo, m.tagname))
+
+ if m.map_handle:
+ res = MapViewOfFile(m.map_handle, dwDesiredAccess,
+ 0, 0, 0)
+ if res:
+ m.setdata(res, map_size)
+ return m
+ else:
+ dwErr = GetLastError()
+ else:
+ dwErr = GetLastError()
+
+ raise EnvironmentError(os.strerror(dwErr))
+
+
+# register_external here?
Added: pypy/dist/pypy/rlib/test/test_rmmap.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/rlib/test/test_rmmap.py Tue Oct 9 16:20:05 2007
@@ -0,0 +1,376 @@
+from pypy.tool.udir import udir
+import os
+from pypy.rpython.test.test_llinterp import interpret
+from pypy.rlib import rmmap as mmap
+import sys
+
+class TestMMap:
+ def setup_class(cls):
+ cls.tmpname = str(udir.join('mmap-'))
+
+ def test_page_size(self):
+ def f():
+ assert mmap.PAGESIZE > 0
+ assert isinstance(mmap.PAGESIZE, int)
+
+ interpret(f, [])
+
+ def test_attributes(self):
+ def f():
+ assert isinstance(mmap.ACCESS_READ, int)
+ assert isinstance(mmap.ACCESS_WRITE, int)
+ assert isinstance(mmap.ACCESS_COPY, int)
+ if os.name == "posix":
+ assert isinstance(mmap.MAP_ANON, int)
+ assert isinstance(mmap.MAP_ANONYMOUS, int)
+ assert isinstance(mmap.MAP_PRIVATE, int)
+ assert isinstance(mmap.MAP_SHARED, int)
+ assert isinstance(mmap.PROT_EXEC, int)
+ assert isinstance(mmap.PROT_READ, int)
+ assert isinstance(mmap.PROT_WRITE, int)
+
+ interpret(f, [])
+
+ def test_args(self):
+ from pypy.rlib import rmmap
+ mmap = rmmap.mmap
+ if os.name == "posix":
+ raises(TypeError, mmap, 0, 1, 2, 3, 4, 5)
+ raises(TypeError, mmap, 0, 1, 2, 3, "foo", 5)
+ raises(TypeError, mmap, 0, 1, foo="foo")
+ raises(TypeError, mmap, 0, -1)
+ raises(OverflowError, mmap, 0, sys.maxint ** 3)
+ raises(ValueError, mmap, 0, 1, flags=2, access=3)
+ raises(ValueError, mmap, 0, 1, access=123)
+ elif os.name == "nt":
+ raises(TypeError, mmap, 0, 1, 2, 3, 4)
+ raises(TypeError, mmap, 0, 1, tagname=123)
+ raises(TypeError, mmap, 0, 1, access="foo")
+ raises(ValueError, mmap, 0, 1, access=-1)
+
+ def test_file_size(self):
+ def func(no):
+ if os.name == "nt":
+ skip("Only Unix checks file size")
+
+ try:
+ mmap.mmap(no, 123)
+ except ValueError:
+ pass
+ else:
+ raise Exception("didn't raise")
+
+ f = open(self.tmpname + "a", "w+")
+
+ f.write("c")
+ f.flush()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_create(self):
+ f = open(self.tmpname + "b", "w+")
+
+ f.write("c")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 1)
+ assert m.read(99) == "c"
+
+ interpret(func, [f.fileno()])
+
+ f.close()
+
+ def test_close(self):
+     # Reading from a closed map must raise ValueError.
+     f = open(self.tmpname + "c", "w+")
+
+     f.write("c")
+     f.flush()
+
+     def func(no):
+         m = mmap.mmap(no, 1)
+         m.close()
+         try:
+             m.read(1)
+         except ValueError:
+             pass
+         else:
+             raise Exception("Did not raise")
+     interpret(func, [f.fileno()])
+     # release the temporary file, as the other tests in this class do
+     f.close()
+
+ def test_read_byte(self):
+ f = open(self.tmpname + "d", "w+")
+
+ f.write("c")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 1)
+ assert m.read_byte() == "c"
+ try:
+ m.read_byte()
+ except ValueError:
+ pass
+ else:
+ raise Exception("Did not raise")
+ m.close()
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_readline(self):
+ import os
+ f = open(self.tmpname + "e", "w+")
+
+ f.write("foo\n")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 4)
+ if os.name == "nt":
+ # windows replaces \n with \r. it's time to change to \n only MS!
+ assert m.readline() == "foo\r"
+ elif os.name == "posix":
+ assert m.readline() == "foo\n"
+ assert m.readline() == ""
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_read(self):
+ f = open(self.tmpname + "f", "w+")
+
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6)
+ assert m.read(1) == "f"
+ assert m.read(6) == "oobar"
+ assert m.read(1) == ""
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_find(self):
+ f = open(self.tmpname + "g", "w+")
+
+ f.write("foobar\0")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 7)
+ assert m.find("b") == 3
+ assert m.find("z") == -1
+ assert m.find("o", 5) == -1
+ assert m.find("ob") == 2
+ assert m.find("\0") == 6
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_is_modifiable(self):
+ f = open(self.tmpname + "h", "w+")
+
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6, access=mmap.ACCESS_READ)
+ try:
+ m.write('x')
+ except TypeError:
+ pass
+ else:
+ assert False
+ try:
+ m.resize(7)
+ except TypeError:
+ pass
+ else:
+ assert False
+ m.close()
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_seek(self):
+ f = open(self.tmpname + "i", "w+")
+
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6)
+ m.seek(0)
+ assert m.tell() == 0
+ m.read(1)
+ m.seek(1, 1)
+ assert m.tell() == 2
+ m.seek(0)
+ m.seek(-1, 2)
+ assert m.tell() == 5
+ m.close()
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_write(self):
+ f = open(self.tmpname + "j", "w+")
+
+ f.write("foobar")
+ f.flush()
+ def func(no):
+ m = mmap.mmap(no, 6, access=mmap.ACCESS_WRITE)
+ m.write("ciao\n")
+ m.seek(0)
+ assert m.read(6) == "ciao\nr"
+ m.close()
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_write_byte(self):
+     # write_byte() must be refused on a read-only map and must store
+     # one character on a writable one.
+     f = open(self.tmpname + "k", "w+")
+
+     f.write("foobar")
+     f.flush()
+
+     def func(no):
+         m = mmap.mmap(no, 6, access=mmap.ACCESS_READ)
+         try:
+             m.write_byte("x")
+         except TypeError:
+             pass
+         else:
+             assert False
+         # close the read-only map before rebinding `m`, so that the
+         # mapping and its duplicated descriptor are not leaked
+         m.close()
+         m = mmap.mmap(no, 6, access=mmap.ACCESS_WRITE)
+         m.write_byte("x")
+         m.seek(0)
+         assert m.read(6) == "xoobar"
+         m.close()
+     interpret(func, [f.fileno()])
+     f.close()
+
+ def test_size(self):
+ f = open(self.tmpname + "l", "w+")
+
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 5)
+ assert m.file_size() == 6 # size of the underline file, not the mmap
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_tell(self):
+ f = open(self.tmpname + "m", "w+")
+
+ f.write("c")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 1)
+ assert m.tell() >= 0
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_move(self):
+ f = open(self.tmpname + "o", "w+")
+
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6, access=mmap.ACCESS_WRITE)
+ m.move(1, 3, 3)
+ assert m.read(6) == "fbarar"
+ m.seek(0)
+ m.move(1, 3, 2)
+ a = m.read(6)
+ assert a == "frarar"
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_resize(self):
+ import sys
+ if ("darwin" in sys.platform) or ("freebsd" in sys.platform):
+ skip("resize does not work under OSX or FreeBSD")
+
+ import os
+
+ f = open(self.tmpname + "p", "w+")
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6, access=mmap.ACCESS_WRITE)
+ f_size = os.fstat(no).st_size
+ assert m.file_size() == f_size == 6
+ m.resize(10)
+ f_size = os.fstat(no).st_size
+ assert m.file_size() == f_size == 10
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_len(self):
+
+ f = open(self.tmpname + "q", "w+")
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6)
+ assert m.len() == 6
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_get_item(self):
+
+ f = open(self.tmpname + "r", "w+")
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6)
+ assert m.getitem(0) == 'f'
+ assert m.getitem(-1) == 'r'
+ # sl = slice(1, 2)
+ # assert m.get_item(sl) == 'o'
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
+
+ def test_set_item(self):
+ f = open(self.tmpname + "s", "w+")
+ f.write("foobar")
+ f.flush()
+
+ def func(no):
+ m = mmap.mmap(no, 6, access=mmap.ACCESS_WRITE)
+
+ # def f(m): m[1:3] = u'xx'
+ # py.test.raises(IndexError, f, m)
+ # def f(m): m[1:4] = "zz"
+ # py.test.raises(IndexError, f, m)
+ # def f(m): m[1:6] = "z" * 6
+ # py.test.raises(IndexError, f, m)
+ # def f(m): m[:2] = "z" * 5
+ # m[1:3] = 'xx'
+ # assert m.read(6) == "fxxbar"
+ # m.seek(0)
+ m.setitem(0, 'x')
+ assert m.getitem(0) == 'x'
+ m.setitem(-6, 'y')
+ data = m.read(6)
+ assert data == "yoobar" # yxxbar with slice's stuff
+ m.close()
+
+ interpret(func, [f.fileno()])
+ f.close()
More information about the Pypy-commit
mailing list