[pypy-svn] r45818 - in pypy/branch/pypy-more-rtti-inprogress/translator/sandbox: . test

arigo at codespeak.net arigo at codespeak.net
Fri Aug 17 16:59:47 CEST 2007


Author: arigo
Date: Fri Aug 17 16:59:47 2007
New Revision: 45818

Added:
   pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/pypy_interact.py
      - copied, changed from r45812, pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/interact.py
Modified:
   pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/sandlib.py
   pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/test/test_sandbox.py
Log:
* start of a general virtual-path-mapping class for sandboxed processes
* pypy-c-sandbox compiles but there are still issues
* add pypy_interact.py, specifically to run pypy-c-sandbox


Modified: pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/sandlib.py
==============================================================================
--- pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/sandlib.py	(original)
+++ pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/sandlib.py	Fri Aug 17 16:59:47 2007
@@ -4,7 +4,8 @@
 for the outer process, which can run CPython or PyPy.
 """
 
-import marshal, sys
+import marshal, sys, os, posixpath
+from pypy.rpython.module.ll_os_stat import STAT_FIELDS, s_tuple_StatResult
 from py.compat import subprocess
 
 def read_message(f, timeout=None):
@@ -17,25 +18,31 @@
             raise EOFError("timed out waiting for data")
     return marshal.load(f)
 
-if sys.version_info < (2, 4):
-    def write_message(g, msg):
-        marshal.dump(msg, g)
-        g.flush()
-else:
-    def write_message(g, msg):
-        marshal.dump(msg, g, 0)
-        g.flush()
+def write_message(g, msg, resulttype=None):
+    if resulttype is None:
+        if sys.version_info < (2, 4):
+            marshal.dump(msg, g)
+        else:
+            marshal.dump(msg, g, 0)
+    else:
+        # use the exact result type for encoding
+        from pypy.rlib.rmarshal import get_marshaller
+        buf = []
+        get_marshaller(resulttype)(buf, msg)
+        g.write(''.join(buf))
+    g.flush()
+
 
 class SandboxedProc(object):
     """Base class to control a sandboxed subprocess.
     Inherit from this class and implement all the do_xxx() methods
     for the external functions xxx that you want to support.
     """
-    def __init__(self, args):
+    def __init__(self, args, executable=None):
         """'args' should a sequence of argument for the subprocess,
         starting with the full path of the executable.
         """
-        self.popen = subprocess.Popen(args,
+        self.popen = subprocess.Popen(args, executable=executable,
                                       stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE)
 
@@ -58,9 +65,9 @@
                 args   = read_message(self.popen.stdout)
             except EOFError, e:
                 break
-            answer = self.handle_message(fnname, *args)
+            answer, resulttype = self.handle_message(fnname, *args)
             write_message(self.popen.stdin, 0)  # error code - always 0 for now
-            write_message(self.popen.stdin, answer)
+            write_message(self.popen.stdin, answer, resulttype)
         returncode = self.popen.wait()
         return returncode
 
@@ -68,7 +75,8 @@
         if '__' in fnname:
             raise ValueError("unsafe fnname")
         handler = getattr(self, 'do_' + fnname.replace('.', '__'))
-        return handler(*args)
+        resulttype = getattr(handler, 'resulttype', None)
+        return handler(*args), resulttype
 
 
 class SimpleIOSandboxedProc(SandboxedProc):
@@ -128,3 +136,48 @@
             self._error.write(data)
             return len(data)
         raise OSError("trying to write to fd %d" % (fd,))
+
+
+class VirtualOSError(Exception):
+    pass
+
+class VirtualizedSandboxedProc(SandboxedProc):
+    """Control a virtualized sandboxed process, which is given a custom
+    view on the filesystem and a custom environment.
+    """
+    virtual_env = {}
+    virtual_cwd = '/tmp'
+    path_mapping = {}     # maps virtual paths to real paths or None
+
+    def do_ll_os__ll_os_envitems(self):
+        return self.virtual_env.items()
+
+    def translate_path(self, vpath):
+        # XXX assumes POSIX-style virtual paths for now, but OS-specific real paths
+        vpath = posixpath.join(self.virtual_cwd, vpath)
+        components = [component for component in vpath.split('/') if component]
+        # find the longest match in self.path_mapping
+        for i in range(len(components), -1, -1):
+            test = '/' + '/'.join(components[:i])
+            if test in self.path_mapping:
+                break    # found
+        else:
+            raise VirtualOSError("no access to vpath %r" % (vpath,))
+        result = self.path_mapping[test]
+        if i < len(components):
+            result = os.path.join(result, *components[i:])
+        if vpath.endswith(os.sep):
+            result += os.sep     # re-add a trailing '/' if one was specified
+        print 'VPATH:', vpath, '=>', result
+        return result
+
+    def build_stat_result(self, st):
+        result = tuple(st)
+        result += (0,) * (len(STAT_FIELDS) - len(result))
+        return result
+
+    def do_ll_os__ll_os_lstat(self, vpathname):
+        pathname = self.translate_path(vpathname)
+        st = os.lstat(pathname)
+        return self.build_stat_result(st)
+    do_ll_os__ll_os_lstat.resulttype = s_tuple_StatResult

Modified: pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/test/test_sandbox.py
==============================================================================
--- pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/test/test_sandbox.py	(original)
+++ pypy/branch/pypy-more-rtti-inprogress/translator/sandbox/test/test_sandbox.py	Fri Aug 17 16:59:47 2007
@@ -12,14 +12,7 @@
     msg = read_message(f, timeout=10.0)
     assert msg == args
     write_message(g, 0)
-    if resulttype is None:
-        write_message(g, result)
-    else:
-        # use the exact result type for encoding
-        from pypy.rlib.rmarshal import get_marshaller
-        buf = []
-        get_marshaller(resulttype)(buf, result)
-        g.write(''.join(buf))
+    write_message(g, result, resulttype)
 
 
 def test_sandbox_1():



More information about the Pypy-commit mailing list