[pypy-svn] r78685 - in pypy/branch/fast-forward/pypy/module: _warnings _warnings/test sys

afa at codespeak.net afa at codespeak.net
Thu Nov 4 01:09:26 CET 2010


Author: afa
Date: Thu Nov  4 01:09:23 2010
New Revision: 78685

Added:
   pypy/branch/fast-forward/pypy/module/_warnings/   (props changed)
   pypy/branch/fast-forward/pypy/module/_warnings/__init__.py   (contents, props changed)
   pypy/branch/fast-forward/pypy/module/_warnings/interp_warnings.py   (contents, props changed)
   pypy/branch/fast-forward/pypy/module/_warnings/test/   (props changed)
   pypy/branch/fast-forward/pypy/module/_warnings/test/test_warnings.py   (contents, props changed)
Modified:
   pypy/branch/fast-forward/pypy/module/sys/__init__.py
Log:
Start implementing the _warnings module


Added: pypy/branch/fast-forward/pypy/module/_warnings/__init__.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_warnings/__init__.py	Thu Nov  4 01:09:23 2010
@@ -0,0 +1,21 @@
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+    """provides basic warning filtering support.
+    It is a helper module to speed up interpreter start-up."""
+
+    interpleveldefs = {
+        'warn'         : 'interp_warnings.warn',
+        'warn_explicit': 'interp_warnings.warn_explicit',
+    }
+
+    appleveldefs = {
+    }
+
+    def setup_after_space_initialization(self):
+        from pypy.module._warnings.interp_warnings import State
+        state = self.space.fromcache(State)
+        self.setdictvalue(self.space, "filters", state.w_filters)
+        self.setdictvalue(self.space, "once_registry", state.w_once_registry)
+        self.setdictvalue(self.space, "default_action", state.w_default_action)
+

Added: pypy/branch/fast-forward/pypy/module/_warnings/interp_warnings.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_warnings/interp_warnings.py	Thu Nov  4 01:09:23 2010
@@ -0,0 +1,341 @@
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.baseobjspace import W_Root, ObjSpace
+from pypy.interpreter.error import OperationError
+
+def create_filter(space, w_category, action):
+    return space.newtuple([
+        space.wrap(action), space.w_None, w_category,
+        space.w_None, space.wrap(0)])
+
+class State:
+    def __init__(self, space):
+        self.init_filters(space)
+        self.w_once_registry = space.newdict()
+        self.w_default_action = space.wrap("default")
+
+    def init_filters(self, space):
+        filters_w = []
+
+        if (not space.sys.get_flag('py3k_warning') and
+            not space.sys.get_flag('division_warning')):
+            filters_w.append(create_filter(
+                space, space.w_DeprecationWarning, "ignore"))
+
+        filters_w.append(create_filter(
+            space, space.w_PendingDeprecationWarning, "ignore"))
+        filters_w.append(create_filter(
+            space, space.w_ImportWarning, "ignore"))
+
+        bytes_warning = space.sys.get_flag('bytes_warning')
+        if bytes_warning > 1:
+            action = "error"
+        elif bytes_warning == 0:
+            action = "ignore"
+        else:
+            action = "default"
+        filters_w.append(create_filter(
+            space, space.w_BytesWarning, action))
+
+        self.w_filters = space.newlist(filters_w)
+
+def get_warnings_attr(space, name):
+    try:
+        w_module = space.getitem(space.sys.get('modules'),
+                                 space.wrap('warnings'))
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        return None
+
+    try:
+        return space.getattr(w_module, space.wrap(name))
+    except OperationError, e:
+        if not e.match(space, space.w_AttributeError):
+            raise
+    return None
+
+def get_category(space, w_message, w_category):
+    # Get category
+    if space.isinstance_w(w_message, space.w_Warning):
+        w_category = space.type(w_message)
+    elif space.is_w(w_category, space.w_None):
+        w_category = space.w_UserWarning
+
+    # Validate category
+    if not space.abstract_issubclass_w(w_category, space.w_Warning):
+        raise OperationError(space.w_ValueError, space.wrap(
+            "category is not a subclass of Warning"))
+
+    return w_category
+
+def setup_context(space, stacklevel):
+    # Setup globals and lineno
+    ec = space.getexecutioncontext()
+    frame = ec.gettopframe_nohidden()
+    while frame and stacklevel:
+        frame = ec.getnextframe_nohidden(frame)
+        stacklevel -= 1
+    if frame:
+        w_globals = frame.w_globals
+        lineno = frame.f_lineno
+    else:
+        w_globals = space.sys.w_dict
+        lineno = 1
+
+    # setup registry
+    try:
+        w_registry = space.getitem(w_globals, space.wrap("__warningregistry__"))
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        w_registry = space.newdict()
+        space.setitem(w_globals, space.wrap("__warningregistry__"), w_registry)
+
+    # setup module
+    try:
+        w_module = space.getitem(w_globals, space.wrap("__name__"))
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        w_module = space.wrap("<string>")
+
+    # setup filename
+    try:
+        w_filename = space.getitem(w_globals, space.wrap("__file__"))
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        if space.str_w(w_module) == '__main__':
+            w_argv = space.sys.get('argv')
+            if space.int_w(space.len(w_argv)) > 0:
+                w_filename = space.getitem(w_argv, space.wrap(0))
+                if not space.is_true(w_filename):
+                    w_filename = space.wrap('__main__')
+            else:
+                # embedded interpreters don't have sys.argv
+                w_filename = space.wrap('__main__')
+        else:
+            w_filename = w_module
+    else:
+        # if filename.lower().endswith((".pyc", ".pyo"))
+        if space.is_true(space.call_method(
+            w_filename, "endswith",
+            space.newtuple([space.wrap(".pyc"), space.wrap(".pyo")]))):
+            # strip last character
+            w_filename = space.wrap(space.str_w(w_filename)[:-1])
+
+    return (w_filename, lineno, w_module, w_registry)
+
+def check_matched(space, w_obj, w_arg):
+    if space.is_w(w_obj, space.w_None):
+        return True
+    return space.is_true(space.call_method(w_obj, "match", w_arg))
+
+def get_filter(space, w_category, w_text, lineno, w_module):
+    w_filters = get_warnings_attr(space, "filters")
+    if w_filters:
+        space.fromcache(State).w_filters = w_filters
+    else:
+        w_filters = space.fromcache(State).w_filters
+
+    # filters could change while we are iterating over it
+    for w_item in space.fixedview(w_filters):
+        w_action, w_msg, w_cat, w_mod, w_lineno = space.fixedview(
+            w_item, 5)
+        ln = space.int_w(w_lineno)
+
+        if (check_matched(space, w_msg, w_text) and
+            check_matched(space, w_mod, w_module) and
+            space.abstract_issubclass_w(w_category, w_cat) and
+            (ln == 0 or ln == lineno)):
+            return space.str_w(w_action), w_item
+
+    action = get_default_action(space)
+    if not action:
+        raise OperationError(space.w_ValueError, space.wrap(
+            "warnings.defaultaction not found"))
+    return action, None
+
+def get_default_action(space):
+    w_action = get_warnings_attr(space, "default_action");
+    if w_action is None:
+        return space.str_w(space.fromcache(State).w_default_action)
+
+    space.fromcache(State).w_default_action = w_action
+    return space.str_w(w_action)
+
+def get_once_registry(space):
+    w_registry = get_warnings_attr(space, "onceregistry");
+    if w_registry is None:
+        return space.fromcache(State).w_once_registry
+
+    space.fromcache(State).w_once_registry = w_registry
+    return w_registry
+
+def update_registry(space, w_registry, w_text, w_category):
+    w_key = space.newtuple([w_text, w_category])
+    return already_warned(space, w_registry, w_altkey, should_set=True)
+
+def already_warned(space, w_registry, w_key, should_set=False):
+    try:
+        w_warned = space.getitem(w_registry, w_key)
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        if should_set:
+            space.setitem(w_registry, w_key, space.w_True)
+        return False
+    else:
+        return space.is_true(w_warned)
+
+def normalize_module(space, w_filename):
+    if not space.is_true(w_filename):
+        return space.wrap("<unknown>")
+
+    filename = space.str_w(w_filename)
+    length = len(filename)
+    if filename.endswith(".py"):
+        filename = filename[:-3]
+    return space.wrap(filename)
+
+def show_warning(space, w_filename, lineno, w_text, w_category,
+                 w_sourceline=None):
+    w_name = space.getattr(w_category, space.wrap("__name__"))
+    w_stderr = space.sys.get("stderr")
+
+    # Print "filename:lineno: category: text\n"
+    message = "%s:%d: %s: %s\n" % (space.str_w(w_filename), lineno,
+                                   space.str_w(w_name), space.str_w(w_text))
+    space.call_method(w_stderr, "write", space.wrap(message))
+
+    # Print "  source_line\n"
+    if w_sourceline:
+        line = space.str_w(w_sourceline)
+        for i in range(len(line)):
+            c = line[i]
+            if c not in ' \t\014':
+                break
+        message = "  %s\n" % (line[i:],)
+        space.call_method(w_stderr, "write", space.wrap(message))
+
+def do_warn(space, w_message, w_category, stacklevel):
+    context_w = setup_context(space, stacklevel)
+    do_warn_explicit(space, w_category, w_message, context_w)
+
+def do_warn_explicit(space, w_category, w_message, context_w,
+                     w_sourceline=None):
+    w_filename, lineno, w_module, w_registry = context_w
+
+    # normalize module
+    if space.is_w(w_module, space.w_None):
+        w_module = normalize_module(space, w_filename)
+
+    # normalize message
+    if space.isinstance_w(w_message, space.w_Warning):
+        w_text = space.str(w_message)
+        w_category = space.type(w_message)
+    else:
+        w_text = w_message
+        w_message = space.call_function(w_category, w_message)
+
+    w_lineno = space.wrap(lineno)
+
+    # create key
+    w_key = space.newtuple([w_text, w_category, w_lineno])
+
+    if not space.is_w(w_registry, space.w_None):
+        if already_warned(space, w_registry, w_key):
+            return
+        # else this warning hasn't been generated before
+
+    action, w_item = get_filter(space, w_category, w_text, lineno, w_module)
+
+    if action == "error":
+        raise OperationError(w_category, w_message)
+
+    # Store in the registry that we've been here, *except* when the action is
+    # "always".
+    warned = False
+    if action != 'always':
+        if not space.is_w(w_registry, space.w_None):
+            space.setitem(w_registry, w_key, space.w_True)
+        if action == 'ignore':
+            return
+        elif action == 'once':
+            if space.is_w(w_registry, space.w_None):
+                w_registry = get_once_registry(space)
+            warned = update_registry(space, w_registry, w_text, w_category)
+        elif action == 'module':
+            if not space.is_w(w_registry, space.w_None):
+                warned = update_registry(space, w_registry, w_text, w_category)
+        elif action != 'default':
+            try:
+                err = space.str_w(space.str(w_item))
+            except OperationError:
+                err = "???"
+            raise OperationError(space.w_RuntimeError, space.wrap(
+                "Unrecognized action (%s) in warnings.filters:\n %s" %
+                (action, err)))
+
+    if warned:
+        # Already warned for this module
+        return
+    w_show_fxn = get_warnings_attr(space, "showwarning")
+    if w_show_fxn is None:
+        show_warning(space, w_filename, lineno, w_text, w_category,
+                     w_sourceline)
+    else:
+        space.call_function(
+            w_show_fxn, w_message, w_category, w_filename, w_lineno)
+
+ at unwrap_spec(ObjSpace, W_Root, W_Root, int)
+def warn(space, w_message, w_category=None, stacklevel=1):
+    w_category = get_category(space, w_message, w_category);
+    do_warn(space, w_message, w_category, stacklevel)
+
+
+def warn_with_loader(space, w_message, w_category, w_filename, lineno,
+                     w_module, w_registry, w_globals):
+    # Check/get the requisite pieces needed for the loader.
+    try:
+        w_loader = space.getitem(w_globals, space.wrap("__loader__"))
+        w_module_name = space.getitem(w_globals, space.wrap("__name__"))
+    except OperationError, e:
+        if not e.match(space, space.w_KeyError):
+            raise
+        return # perform standard call
+
+    # Make sure the loader implements the optional get_source() method.
+    try:
+        w_get_source = space.getattr(w_loader, space.wrap("get_source"))
+    except OperationError, e:
+        if not e.match(space, space.w_AttributeError):
+            raise
+        return # perform standard call
+
+    # Call get_source() to get the source code.
+    w_source = space.call_function(w_get_source, w_module_name)
+    if space.is_w(w_source, space.w_None):
+        return # perform standard call
+    # Split the source into lines.
+    w_source_list = space.call_method(w_source, "splitlines")
+    # Get the source line.
+    w_source_line = space.getitem(w_source_list, space.wrap(lineno - 1))
+
+    # Handle the warning.
+    do_warn_explicit(space, w_category, w_message,
+                     (w_filename, lineno, w_module, w_registry),
+                     w_source_line)
+    return True
+
+ at unwrap_spec(ObjSpace, W_Root, W_Root, W_Root, int, W_Root, W_Root, W_Root)
+def warn_explicit(space, w_message, w_category, w_filename, lineno,
+                  w_module=None, w_registry=None, w_module_globals=None):
+
+    if not space.is_w(w_module_globals, space.w_None):
+        if warn_with_loader(space, w_message, w_category, w_filename, lineno,
+                            w_module, w_registry, w_module_globals):
+            return
+
+    do_warn_explicit(space, w_category, w_message,
+                     (w_filename, lineno, w_module, w_registry))

Added: pypy/branch/fast-forward/pypy/module/_warnings/test/test_warnings.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_warnings/test/test_warnings.py	Thu Nov  4 01:09:23 2010
@@ -0,0 +1,27 @@
+import py
+import sys
+from pypy.conftest import gettestobjspace
+
+class AppTestWarnings:
+    def setup_class(cls):
+        space = gettestobjspace(usemodules=('_warnings',))
+        cls.space = space
+
+    def test_defaults(self):
+        import _warnings
+        assert _warnings.once_registry == {}
+        assert _warnings.default_action == 'default'
+        assert "PendingDeprecationWarning" in str(_warnings.filters)
+
+    def test_warn(self):
+        import _warnings
+        _warnings.warn("some message", DeprecationWarning)
+        _warnings.warn("some message", Warning)
+
+    def test_warn_explicit(self):
+        import _warnings
+        _warnings.warn_explicit("some message", DeprecationWarning,
+                                "<string>", 1, module_globals=globals())
+        _warnings.warn_explicit("some message", Warning,
+                                "<string>", 1, module_globals=globals())
+

Modified: pypy/branch/fast-forward/pypy/module/sys/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/sys/__init__.py	(original)
+++ pypy/branch/fast-forward/pypy/module/sys/__init__.py	Thu Nov  4 01:09:23 2010
@@ -156,3 +156,7 @@
         else:
             from pypy.module.sys.interp_encoding import get_w_default_encoder
             return get_w_default_encoder(self.space)
+
+    def get_flag(self, name):
+        space = self.space
+        return space.int_w(space.getattr(self.get('flags'), space.wrap(name)))



More information about the Pypy-commit mailing list