[Jython-checkins] jython: ant regrtest will now run subprocess and network resource tests
jim.baker
jython-checkins at python.org
Thu Jan 8 23:17:49 CET 2015
https://hg.python.org/jython/rev/a7b3aa073620
changeset: 7520:a7b3aa073620
user: Jim Baker <jim.baker at rackspace.com>
date: Thu Jan 08 15:17:43 2015 -0700
summary:
ant regrtest will now run subprocess and network resource tests
Also minor fixes to test_glob, test_urllibnet and test_urllib2net, as
well as the addition of some missing functionality in socket and ssl support.
files:
Lib/_socket.py | 18 +-
Lib/ssl.py | 16 +-
Lib/test/test_glob.py | 187 ++++++++++++
Lib/test/test_imaplib.py | 242 ++++++++++++++++
Lib/test/test_robotparser.py | 261 ------------------
Lib/test/test_ssl.py | 2 +-
Lib/test/test_subprocess.py | 2 +-
Lib/test/test_urllib2net.py | 332 +++++++++++++++++++++++
Lib/test/test_urllibnet.py | 219 +++++++++++++++
build.xml | 4 +
10 files changed, 1014 insertions(+), 269 deletions(-)
diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -24,6 +24,7 @@
from java.lang import Thread, IllegalStateException
from java.net import InetAddress, InetSocketAddress
from java.nio.channels import ClosedChannelException
+from java.security.cert import CertificateException
from java.util import NoSuchElementException
from java.util.concurrent import (
ArrayBlockingQueue, CopyOnWriteArrayList, CountDownLatch, LinkedBlockingQueue,
@@ -317,16 +318,23 @@
java.nio.channels.UnsupportedAddressTypeException : None,
SSLPeerUnverifiedException: lambda x: SSLError(SSL_ERROR_SSL, x.message),
- SSLException: lambda x: SSLError(SSL_ERROR_SSL, x.message),
}
def _map_exception(java_exception):
- mapped_exception = _exception_map.get(java_exception.__class__)
- if mapped_exception:
- py_exception = mapped_exception(java_exception)
+ if isinstance(java_exception, SSLException) or isinstance(java_exception, CertificateException):
+ cause = java_exception.cause
+ if cause:
+ msg = "%s (%s)" % (java_exception.message, cause)
+ else:
+ msg = java_exception.message
+ py_exception = SSLError(SSL_ERROR_SSL, msg)
else:
- py_exception = error(-1, 'Unmapped exception: %s' % java_exception)
+ mapped_exception = _exception_map.get(java_exception.__class__)
+ if mapped_exception:
+ py_exception = mapped_exception(java_exception)
+ else:
+ py_exception = error(-1, 'Unmapped exception: %s' % java_exception)
py_exception.java_exception = java_exception
return _add_exception_attrs(py_exception)
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -26,7 +26,8 @@
SSL_ERROR_WANT_CONNECT,
SSL_ERROR_EOF,
SSL_ERROR_INVALID_ERROR_CODE,
- error as socket_error)
+ error as socket_error,
+ CLIENT_SOCKET, DATAGRAM_SOCKET)
from _sslcerts import _get_ssl_context
from java.text import SimpleDateFormat
@@ -158,12 +159,16 @@
def send(self, data):
return self.sock.send(data)
+ write = send
+
def sendall(self, data):
return self.sock.sendall(data)
def recv(self, bufsize, flags=0):
return self.sock.recv(bufsize, flags)
+ read = recv
+
def recvfrom(self, bufsize, flags=0):
return self.sock.recvfrom(bufsize, flags)
@@ -196,6 +201,14 @@
# Need to work with the real underlying socket as well
+ def pending(self):
+ # undocumented function, used by some tests
+ # see also http://bugs.python.org/issue21430
+ if self._sock.socket_type == CLIENT_SOCKET or self._sock.socket_type == DATAGRAM_SOCKET:
+ if self._sock.incoming_head is not None:
+ return self._sock.incoming_head.readableBytes()
+ return 0
+
def _readable(self):
return self._sock._readable()
@@ -260,6 +273,7 @@
# ssl_version - use SSLEngine.setEnabledProtocols(java.lang.String[])
# ciphers - SSLEngine.setEnabledCipherSuites(String[] suites)
+@raises_java_exception
def wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE,
ssl_version=None, ca_certs=None, do_handshake_on_connect=True,
suppress_ragged_eofs=True, ciphers=None):
diff --git a/Lib/test/test_glob.py b/Lib/test/test_glob.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_glob.py
@@ -0,0 +1,187 @@
+import glob
+import os
+import shutil
+import sys
+import unittest
+
+from test.test_support import run_unittest, TESTFN
+
+
+def fsdecode(s):
+ return unicode(s, sys.getfilesystemencoding())
+
+
+class GlobTests(unittest.TestCase):
+
+ def norm(self, *parts):
+ return os.path.normpath(os.path.join(self.tempdir, *parts))
+
+ def mktemp(self, *parts):
+ filename = self.norm(*parts)
+ base, file = os.path.split(filename)
+ if not os.path.exists(base):
+ os.makedirs(base)
+ f = open(filename, 'w')
+ f.close()
+
+ def setUp(self):
+ self.tempdir = TESTFN + "_dir"
+ self.mktemp('a', 'D')
+ self.mktemp('aab', 'F')
+ self.mktemp('.aa', 'G')
+ self.mktemp('.bb', 'H')
+ self.mktemp('aaa', 'zzzF')
+ self.mktemp('ZZZ')
+ self.mktemp('a', 'bcd', 'EF')
+ self.mktemp('a', 'bcd', 'efg', 'ha')
+ if hasattr(os, 'symlink'):
+ os.symlink(self.norm('broken'), self.norm('sym1'))
+ os.symlink('broken', self.norm('sym2'))
+ os.symlink(os.path.join('a', 'bcd'), self.norm('sym3'))
+
+ def tearDown(self):
+ shutil.rmtree(self.tempdir)
+
+ def glob(self, *parts):
+ if len(parts) == 1:
+ pattern = parts[0]
+ else:
+ pattern = os.path.join(*parts)
+ p = os.path.join(self.tempdir, pattern)
+ res = glob.glob(p)
+ self.assertEqual(list(glob.iglob(p)), res)
+ ures = [fsdecode(x) for x in res]
+ self.assertEqual(glob.glob(fsdecode(p)), ures)
+ self.assertEqual(list(glob.iglob(fsdecode(p))), ures)
+ return res
+
+ def assertSequencesEqual_noorder(self, l1, l2):
+ l1 = list(l1)
+ l2 = list(l2)
+ self.assertEqual(set(l1), set(l2))
+ self.assertEqual(sorted(l1), sorted(l2))
+
+ def test_glob_literal(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(self.glob('a'), [self.norm('a')])
+ eq(self.glob('a', 'D'), [self.norm('a', 'D')])
+ eq(self.glob('aab'), [self.norm('aab')])
+ eq(self.glob('zymurgy'), [])
+
+ res = glob.glob('*')
+ # For a clean checkout, the next two assertions would never
+ # have failed, even with the change with Jython in
+ # https://hg.python.org/jython/rev/ea036792f304
+ #
+ # But for developers playing with things, we should not have
+ # it fail either
+ self.assertLessEqual({type(r) for r in res}, {str, unicode})
+ res = glob.glob(os.path.join(os.curdir, '*'))
+ self.assertLessEqual({type(r) for r in res}, {str, unicode})
+
+ # test return types are unicode, but only if os.listdir
+ # returns unicode filenames
+ tmp = os.listdir(fsdecode(os.curdir))
+ if {type(x) for x in tmp} == {unicode}:
+ res = glob.glob(u'*')
+ self.assertEqual({type(r) for r in res}, {unicode})
+ res = glob.glob(os.path.join(fsdecode(os.curdir), u'*'))
+ self.assertEqual({type(r) for r in res}, {unicode})
+
+ def test_glob_one_directory(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(self.glob('a*'), map(self.norm, ['a', 'aab', 'aaa']))
+ eq(self.glob('*a'), map(self.norm, ['a', 'aaa']))
+ eq(self.glob('.*'), map(self.norm, ['.aa', '.bb']))
+ eq(self.glob('?aa'), map(self.norm, ['aaa']))
+ eq(self.glob('aa?'), map(self.norm, ['aaa', 'aab']))
+ eq(self.glob('aa[ab]'), map(self.norm, ['aaa', 'aab']))
+ eq(self.glob('*q'), [])
+
+ def test_glob_nested_directory(self):
+ eq = self.assertSequencesEqual_noorder
+ if os.path.normcase("abCD") == "abCD":
+ # case-sensitive filesystem
+ eq(self.glob('a', 'bcd', 'E*'), [self.norm('a', 'bcd', 'EF')])
+ else:
+ # case insensitive filesystem
+ eq(self.glob('a', 'bcd', 'E*'), [self.norm('a', 'bcd', 'EF'),
+ self.norm('a', 'bcd', 'efg')])
+ eq(self.glob('a', 'bcd', '*g'), [self.norm('a', 'bcd', 'efg')])
+
+ def test_glob_directory_names(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(self.glob('*', 'D'), [self.norm('a', 'D')])
+ eq(self.glob('*', '*a'), [])
+ eq(self.glob('a', '*', '*', '*a'),
+ [self.norm('a', 'bcd', 'efg', 'ha')])
+ eq(self.glob('?a?', '*F'), [self.norm('aaa', 'zzzF'),
+ self.norm('aab', 'F')])
+
+ def test_glob_directory_with_trailing_slash(self):
+ # Patterns ending with a slash shouldn't match non-dirs
+ res = glob.glob(self.norm('Z*Z') + os.sep)
+ self.assertEqual(res, [])
+ res = glob.glob(self.norm('ZZZ') + os.sep)
+ self.assertEqual(res, [])
+ # When there is a wildcard pattern which ends with os.sep, glob()
+ # doesn't blow up.
+ res = glob.glob(self.norm('aa*') + os.sep)
+ self.assertEqual(len(res), 2)
+ # either of these results is reasonable
+ self.assertIn(set(res), [
+ {self.norm('aaa'), self.norm('aab')},
+ {self.norm('aaa') + os.sep, self.norm('aab') + os.sep},
+ ])
+
+ def test_glob_unicode_directory_with_trailing_slash(self):
+ # Same as test_glob_directory_with_trailing_slash, but with an
+ # unicode argument.
+ res = glob.glob(fsdecode(self.norm('Z*Z') + os.sep))
+ self.assertEqual(res, [])
+ res = glob.glob(fsdecode(self.norm('ZZZ') + os.sep))
+ self.assertEqual(res, [])
+ res = glob.glob(fsdecode(self.norm('aa*') + os.sep))
+ self.assertEqual(len(res), 2)
+ # either of these results is reasonable
+ self.assertIn(set(res), [
+ {fsdecode(self.norm('aaa')), fsdecode(self.norm('aab'))},
+ {fsdecode(self.norm('aaa') + os.sep),
+ fsdecode(self.norm('aab') + os.sep)},
+ ])
+
+ @unittest.skipUnless(hasattr(os, 'symlink'), "Requires symlink support")
+ def test_glob_symlinks(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(self.glob('sym3'), [self.norm('sym3')])
+ eq(self.glob('sym3', '*'), [self.norm('sym3', 'EF'),
+ self.norm('sym3', 'efg')])
+ self.assertIn(self.glob('sym3' + os.sep),
+ [[self.norm('sym3')], [self.norm('sym3') + os.sep]])
+ eq(self.glob('*', '*F'),
+ [self.norm('aaa', 'zzzF'), self.norm('aab', 'F'),
+ self.norm('sym3', 'EF')])
+
+ @unittest.skipUnless(hasattr(os, 'symlink'), "Requires symlink support")
+ def test_glob_broken_symlinks(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2'),
+ self.norm('sym3')])
+ eq(self.glob('sym1'), [self.norm('sym1')])
+ eq(self.glob('sym2'), [self.norm('sym2')])
+
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific test")
+ def test_glob_magic_in_drive(self):
+ eq = self.assertSequencesEqual_noorder
+ eq(glob.glob('*:'), [])
+ eq(glob.glob(u'*:'), [])
+ eq(glob.glob('?:'), [])
+ eq(glob.glob(u'?:'), [])
+
+
+def test_main():
+ run_unittest(GlobTests)
+
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_imaplib.py
@@ -0,0 +1,242 @@
+from test import test_support as support
+# If we end up with a significant number of tests that don't require
+# threading, this test module should be split. Right now we skip
+# them all if we don't have threading.
+threading = support.import_module('threading')
+
+from contextlib import contextmanager
+import imaplib
+import os.path
+import SocketServer
+import time
+
+from test.test_support import reap_threads, verbose, transient_internet, is_jython
+import unittest
+
+try:
+ import ssl
+except ImportError:
+ ssl = None
+
+CERTFILE = None
+
+
+class TestImaplib(unittest.TestCase):
+
+ def test_that_Time2Internaldate_returns_a_result(self):
+ # We can check only that it successfully produces a result,
+ # not the correctness of the result itself, since the result
+ # depends on the timezone the machine is in.
+ timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
+ '"18-May-2033 05:33:20 +0200"']
+
+ for t in timevalues:
+ imaplib.Time2Internaldate(t)
+
+
+if ssl:
+
+ class SecureTCPServer(SocketServer.TCPServer):
+
+ def get_request(self):
+ newsocket, fromaddr = self.socket.accept()
+ connstream = ssl.wrap_socket(newsocket,
+ server_side=True,
+ ca_certs=CERTFILE,
+ certfile=CERTFILE)
+ return connstream, fromaddr
+
+ IMAP4_SSL = imaplib.IMAP4_SSL
+
+else:
+
+ class SecureTCPServer:
+ pass
+
+ IMAP4_SSL = None
+
+
+class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
+
+ timeout = 1
+
+ def _send(self, message):
+ if verbose: print "SENT:", message.strip()
+ self.wfile.write(message)
+
+ def handle(self):
+ # Send a welcome message.
+ self._send('* OK IMAP4rev1\r\n')
+ while 1:
+ # Gather up input until we receive a line terminator or we timeout.
+ # Accumulate read(1) because it's simpler to handle the differences
+ # between naked sockets and SSL sockets.
+ line = ''
+ while 1:
+ try:
+ part = self.rfile.read(1)
+ if part == '':
+ # Naked sockets return empty strings..
+ return
+ line += part
+ except IOError:
+ # ..but SSLSockets raise exceptions.
+ return
+ if line.endswith('\r\n'):
+ break
+
+ if verbose: print 'GOT:', line.strip()
+ splitline = line.split()
+ tag = splitline[0]
+ cmd = splitline[1]
+ args = splitline[2:]
+
+ if hasattr(self, 'cmd_%s' % (cmd,)):
+ getattr(self, 'cmd_%s' % (cmd,))(tag, args)
+ else:
+ self._send('%s BAD %s unknown\r\n' % (tag, cmd))
+
+ def cmd_CAPABILITY(self, tag, args):
+ self._send('* CAPABILITY IMAP4rev1\r\n')
+ self._send('%s OK CAPABILITY completed\r\n' % (tag,))
+
+
+class BaseThreadedNetworkedTests(unittest.TestCase):
+
+ def make_server(self, addr, hdlr):
+
+ class MyServer(self.server_class):
+ def handle_error(self, request, client_address):
+ self.close_request(request)
+ self.server_close()
+ raise
+
+ if verbose: print "creating server"
+ server = MyServer(addr, hdlr)
+ self.assertEqual(server.server_address, server.socket.getsockname())
+
+ if verbose:
+ print "server created"
+ print "ADDR =", addr
+ print "CLASS =", self.server_class
+ print "HDLR =", server.RequestHandlerClass
+
+ t = threading.Thread(
+ name='%s serving' % self.server_class,
+ target=server.serve_forever,
+ # Short poll interval to make the test finish quickly.
+ # Time between requests is short enough that we won't wake
+ # up spuriously too many times.
+ kwargs={'poll_interval':0.01})
+ t.daemon = True # In case this function raises.
+ t.start()
+ if verbose: print "server running"
+ return server, t
+
+ def reap_server(self, server, thread):
+ if verbose: print "waiting for server"
+ server.shutdown()
+ thread.join()
+ if verbose: print "done"
+
+ @contextmanager
+ def reaped_server(self, hdlr):
+ server, thread = self.make_server((support.HOST, 0), hdlr)
+ try:
+ yield server
+ finally:
+ self.reap_server(server, thread)
+
+ @reap_threads
+ def test_connect(self):
+ with self.reaped_server(SimpleIMAPHandler) as server:
+ client = self.imap_class(*server.server_address)
+ client.shutdown()
+
+ @reap_threads
+ def test_issue5949(self):
+
+ class EOFHandler(SocketServer.StreamRequestHandler):
+ def handle(self):
+ # EOF without sending a complete welcome message.
+ self.wfile.write('* OK')
+
+ with self.reaped_server(EOFHandler) as server:
+ self.assertRaises(imaplib.IMAP4.abort,
+ self.imap_class, *server.server_address)
+
+
+class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
+
+ server_class = SocketServer.TCPServer
+ imap_class = imaplib.IMAP4
+
+
+@unittest.skipIf(is_jython, "imaplib does not support passing in ca_certs; verifiable certs are necessary on Jython")
+@unittest.skipUnless(ssl, "SSL not available")
+class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
+
+ server_class = SecureTCPServer
+ imap_class = IMAP4_SSL
+
+
+class RemoteIMAPTest(unittest.TestCase):
+ host = 'cyrus.andrew.cmu.edu'
+ port = 143
+ username = 'anonymous'
+ password = 'pass'
+ imap_class = imaplib.IMAP4
+
+ def setUp(self):
+ with transient_internet(self.host):
+ self.server = self.imap_class(self.host, self.port)
+
+ def tearDown(self):
+ if self.server is not None:
+ self.server.logout()
+
+ def test_logincapa(self):
+ self.assertTrue('LOGINDISABLED' in self.server.capabilities)
+
+ def test_anonlogin(self):
+ self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
+ rs = self.server.login(self.username, self.password)
+ self.assertEqual(rs[0], 'OK')
+
+ def test_logout(self):
+ rs = self.server.logout()
+ self.server = None
+ self.assertEqual(rs[0], 'BYE')
+
+
+@unittest.skipUnless(ssl, "SSL not available")
+class RemoteIMAP_SSLTest(RemoteIMAPTest):
+ port = 993
+ imap_class = IMAP4_SSL
+
+ def test_logincapa(self):
+ self.assertFalse('LOGINDISABLED' in self.server.capabilities)
+ self.assertTrue('AUTH=PLAIN' in self.server.capabilities)
+
+
+def test_main():
+ tests = [TestImaplib]
+
+ if support.is_resource_enabled('network'):
+ if ssl:
+ global CERTFILE
+ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
+ "keycert.pem")
+ if not os.path.exists(CERTFILE):
+ raise support.TestFailed("Can't read certificate files!")
+ tests.extend([
+ ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
+ RemoteIMAPTest, RemoteIMAP_SSLTest,
+ ])
+
+ support.run_unittest(*tests)
+
+
+if __name__ == "__main__":
+ support.use_resources = ['network']
+ test_main()
diff --git a/Lib/test/test_robotparser.py b/Lib/test/test_robotparser.py
deleted file mode 100644
--- a/Lib/test/test_robotparser.py
+++ /dev/null
@@ -1,261 +0,0 @@
-import unittest, StringIO, robotparser
-from test import test_support
-
-class RobotTestCase(unittest.TestCase):
- def __init__(self, index, parser, url, good, agent):
- unittest.TestCase.__init__(self)
- if good:
- self.str = "RobotTest(%d, good, %s)" % (index, url)
- else:
- self.str = "RobotTest(%d, bad, %s)" % (index, url)
- self.parser = parser
- self.url = url
- self.good = good
- self.agent = agent
-
- def runTest(self):
- if isinstance(self.url, tuple):
- agent, url = self.url
- else:
- url = self.url
- agent = self.agent
- if self.good:
- self.assertTrue(self.parser.can_fetch(agent, url))
- else:
- self.assertFalse(self.parser.can_fetch(agent, url))
-
- def __str__(self):
- return self.str
-
-tests = unittest.TestSuite()
-
-def RobotTest(index, robots_txt, good_urls, bad_urls,
- agent="test_robotparser"):
-
- lines = StringIO.StringIO(robots_txt).readlines()
- parser = robotparser.RobotFileParser()
- parser.parse(lines)
- for url in good_urls:
- tests.addTest(RobotTestCase(index, parser, url, 1, agent))
- for url in bad_urls:
- tests.addTest(RobotTestCase(index, parser, url, 0, agent))
-
-# Examples from http://www.robotstxt.org/wc/norobots.html (fetched 2002)
-
-# 1.
-doc = """
-User-agent: *
-Disallow: /cyberworld/map/ # This is an infinite virtual URL space
-Disallow: /tmp/ # these will soon disappear
-Disallow: /foo.html
-"""
-
-good = ['/','/test.html']
-bad = ['/cyberworld/map/index.html','/tmp/xxx','/foo.html']
-
-RobotTest(1, doc, good, bad)
-
-# 2.
-doc = """
-# robots.txt for http://www.example.com/
-
-User-agent: *
-Disallow: /cyberworld/map/ # This is an infinite virtual URL space
-
-# Cybermapper knows where to go.
-User-agent: cybermapper
-Disallow:
-
-"""
-
-good = ['/','/test.html',('cybermapper','/cyberworld/map/index.html')]
-bad = ['/cyberworld/map/index.html']
-
-RobotTest(2, doc, good, bad)
-
-# 3.
-doc = """
-# go away
-User-agent: *
-Disallow: /
-"""
-
-good = []
-bad = ['/cyberworld/map/index.html','/','/tmp/']
-
-RobotTest(3, doc, good, bad)
-
-# Examples from http://www.robotstxt.org/wc/norobots-rfc.html (fetched 2002)
-
-# 4.
-doc = """
-User-agent: figtree
-Disallow: /tmp
-Disallow: /a%3cd.html
-Disallow: /a%2fb.html
-Disallow: /%7ejoe/index.html
-"""
-
-good = [] # XFAIL '/a/b.html'
-bad = ['/tmp','/tmp.html','/tmp/a.html',
- '/a%3cd.html','/a%3Cd.html','/a%2fb.html',
- '/~joe/index.html'
- ]
-
-RobotTest(4, doc, good, bad, 'figtree')
-RobotTest(5, doc, good, bad, 'FigTree Robot libwww-perl/5.04')
-
-# 6.
-doc = """
-User-agent: *
-Disallow: /tmp/
-Disallow: /a%3Cd.html
-Disallow: /a/b.html
-Disallow: /%7ejoe/index.html
-"""
-
-good = ['/tmp',] # XFAIL: '/a%2fb.html'
-bad = ['/tmp/','/tmp/a.html',
- '/a%3cd.html','/a%3Cd.html',"/a/b.html",
- '/%7Ejoe/index.html']
-
-RobotTest(6, doc, good, bad)
-
-# From bug report #523041
-
-# 7.
-doc = """
-User-Agent: *
-Disallow: /.
-"""
-
-good = ['/foo.html']
-bad = [] # Bug report says "/" should be denied, but that is not in the RFC
-
-RobotTest(7, doc, good, bad)
-
-# From Google: http://www.google.com/support/webmasters/bin/answer.py?hl=en&answer=40364
-
-# 8.
-doc = """
-User-agent: Googlebot
-Allow: /folder1/myfile.html
-Disallow: /folder1/
-"""
-
-good = ['/folder1/myfile.html']
-bad = ['/folder1/anotherfile.html']
-
-RobotTest(8, doc, good, bad, agent="Googlebot")
-
-# 9. This file is incorrect because "Googlebot" is a substring of
-# "Googlebot-Mobile", so test 10 works just like test 9.
-doc = """
-User-agent: Googlebot
-Disallow: /
-
-User-agent: Googlebot-Mobile
-Allow: /
-"""
-
-good = []
-bad = ['/something.jpg']
-
-RobotTest(9, doc, good, bad, agent="Googlebot")
-
-good = []
-bad = ['/something.jpg']
-
-RobotTest(10, doc, good, bad, agent="Googlebot-Mobile")
-
-# 11. Get the order correct.
-doc = """
-User-agent: Googlebot-Mobile
-Allow: /
-
-User-agent: Googlebot
-Disallow: /
-"""
-
-good = []
-bad = ['/something.jpg']
-
-RobotTest(11, doc, good, bad, agent="Googlebot")
-
-good = ['/something.jpg']
-bad = []
-
-RobotTest(12, doc, good, bad, agent="Googlebot-Mobile")
-
-
-# 13. Google also got the order wrong in #8. You need to specify the
-# URLs from more specific to more general.
-doc = """
-User-agent: Googlebot
-Allow: /folder1/myfile.html
-Disallow: /folder1/
-"""
-
-good = ['/folder1/myfile.html']
-bad = ['/folder1/anotherfile.html']
-
-RobotTest(13, doc, good, bad, agent="googlebot")
-
-
-# 14. For issue #6325 (query string support)
-doc = """
-User-agent: *
-Disallow: /some/path?name=value
-"""
-
-good = ['/some/path']
-bad = ['/some/path?name=value']
-
-RobotTest(14, doc, good, bad)
-
-# 15. For issue #4108 (obey first * entry)
-doc = """
-User-agent: *
-Disallow: /some/path
-
-User-agent: *
-Disallow: /another/path
-"""
-
-good = ['/another/path']
-bad = ['/some/path']
-
-RobotTest(15, doc, good, bad)
-
-
-class NetworkTestCase(unittest.TestCase):
-
- def testPasswordProtectedSite(self):
- test_support.requires('network')
- with test_support.transient_internet('mueblesmoraleda.com'):
- url = 'http://mueblesmoraleda.com'
- parser = robotparser.RobotFileParser()
- parser.set_url(url)
- try:
- parser.read()
- except IOError:
- self.skipTest('%s is unavailable' % url)
- self.assertEqual(parser.can_fetch("*", url+"/robots.txt"), False)
-
- def testPythonOrg(self):
- test_support.requires('network')
- with test_support.transient_internet('www.python.org'):
- parser = robotparser.RobotFileParser(
- "http://www.python.org/robots.txt")
- parser.read()
- self.assertTrue(
- parser.can_fetch("*", "http://www.python.org/robots.txt"))
-
-
-def test_main():
- test_support.run_unittest(tests)
- test_support.run_unittest(NetworkTestCase)
-
-if __name__=='__main__':
- test_support.verbose = 1
- test_main()
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -181,7 +181,6 @@
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
s.connect(remote)
- @test_support.cpython_only
def test_refcycle(self):
# Issue #7943: an SSL object doesn't create reference cycles with
# itself.
@@ -189,6 +188,7 @@
ss = ssl.wrap_socket(s)
wr = weakref.ref(ss)
del ss
+ test_support.gc_collect() # Usual Jython requirement
self.assertEqual(wr(), None)
def test_wrapped_unconnected(self):
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -447,7 +447,7 @@
self.assertEqual(subprocess.list2cmdline(['ab', '']),
'ab ""')
self.assertEqual(subprocess.list2cmdline(['echo', 'foo|bar']),
- 'echo "foo|bar"')
+ 'echo foo|bar')
def test_poll(self):
diff --git a/Lib/test/test_urllib2net.py b/Lib/test/test_urllib2net.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_urllib2net.py
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+
+import unittest
+from test import test_support
+from test.test_urllib2 import sanepathname2url
+
+import socket
+import urllib2
+import os
+import sys
+
+TIMEOUT = 60 # seconds
+
+
+def _retry_thrice(func, exc, *args, **kwargs):
+ for i in range(3):
+ try:
+ return func(*args, **kwargs)
+ except exc, last_exc:
+ continue
+ except:
+ raise
+ raise last_exc
+
+def _wrap_with_retry_thrice(func, exc):
+ def wrapped(*args, **kwargs):
+ return _retry_thrice(func, exc, *args, **kwargs)
+ return wrapped
+
+# Connecting to remote hosts is flaky. Make it more robust by retrying
+# the connection several times.
+_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
+
+
+class AuthTests(unittest.TestCase):
+ """Tests urllib2 authentication features."""
+
+## Disabled at the moment since there is no page under python.org which
+## could be used to HTTP authentication.
+#
+# def test_basic_auth(self):
+# import httplib
+#
+# test_url = "http://www.python.org/test/test_urllib2/basic_auth"
+# test_hostport = "www.python.org"
+# test_realm = 'Test Realm'
+# test_user = 'test.test_urllib2net'
+# test_password = 'blah'
+#
+# # failure
+# try:
+# _urlopen_with_retry(test_url)
+# except urllib2.HTTPError, exc:
+# self.assertEqual(exc.code, 401)
+# else:
+# self.fail("urlopen() should have failed with 401")
+#
+# # success
+# auth_handler = urllib2.HTTPBasicAuthHandler()
+# auth_handler.add_password(test_realm, test_hostport,
+# test_user, test_password)
+# opener = urllib2.build_opener(auth_handler)
+# f = opener.open('http://localhost/')
+# response = _urlopen_with_retry("http://www.python.org/")
+#
+# # The 'userinfo' URL component is deprecated by RFC 3986 for security
+# # reasons, let's not implement it! (it's already implemented for proxy
+# # specification strings (that is, URLs or authorities specifying a
+# # proxy), so we must keep that)
+# self.assertRaises(httplib.InvalidURL,
+# urllib2.urlopen, "http://evil:thing@example.com")
+
+
+class CloseSocketTest(unittest.TestCase):
+
+ def test_close(self):
+ import httplib
+
+ # calling .close() on urllib2's response objects should close the
+ # underlying socket
+
+ # delve deep into response to fetch socket._socketobject
+ response = _urlopen_with_retry("http://www.python.org/")
+ abused_fileobject = response.fp
+ self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
+ httpresponse = abused_fileobject._sock
+ self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
+ fileobject = httpresponse.fp
+ self.assertTrue(fileobject.__class__ is socket._fileobject)
+
+ self.assertTrue(not fileobject.closed)
+ response.close()
+ self.assertTrue(fileobject.closed)
+
+class OtherNetworkTests(unittest.TestCase):
+ def setUp(self):
+ if 0: # for debugging
+ import logging
+ logger = logging.getLogger("test_urllib2net")
+ logger.addHandler(logging.StreamHandler())
+
+ # XXX The rest of these tests aren't very good -- they don't check much.
+ # They do sometimes catch some major disasters, though.
+
+ def test_ftp(self):
+ urls = [
+ 'ftp://ftp.kernel.org/pub/linux/kernel/README',
+ 'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
+ #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
+ 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
+ '/research-reports/00README-Legal-Rules-Regs',
+ ]
+ self._test_urls(urls, self._extra_handlers())
+
+ def test_file(self):
+ TESTFN = test_support.TESTFN
+ f = open(TESTFN, 'w')
+ try:
+ f.write('hi there\n')
+ f.close()
+ urls = [
+ 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
+ ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
+ ]
+ self._test_urls(urls, self._extra_handlers(), retry=True)
+ finally:
+ os.remove(TESTFN)
+
+ self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
+
+ # XXX Following test depends on machine configurations that are internal
+ # to CNRI. Need to set up a public server with the right authentication
+ # configuration for test purposes.
+
+## def test_cnri(self):
+## if socket.gethostname() == 'bitdiddle':
+## localhost = 'bitdiddle.cnri.reston.va.us'
+## elif socket.gethostname() == 'bitdiddle.concentric.net':
+## localhost = 'localhost'
+## else:
+## localhost = None
+## if localhost is not None:
+## urls = [
+## 'file://%s/etc/passwd' % localhost,
+## 'http://%s/simple/' % localhost,
+## 'http://%s/digest/' % localhost,
+## 'http://%s/not/found.h' % localhost,
+## ]
+
+## bauth = HTTPBasicAuthHandler()
+## bauth.add_password('basic_test_realm', localhost, 'jhylton',
+## 'password')
+## dauth = HTTPDigestAuthHandler()
+## dauth.add_password('digest_test_realm', localhost, 'jhylton',
+## 'password')
+
+## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
+
+ def test_urlwithfrag(self):
+ urlwith_frag = "https://docs.python.org/2/glossary.html#glossary"
+ with test_support.transient_internet(urlwith_frag):
+ req = urllib2.Request(urlwith_frag)
+ res = urllib2.urlopen(req)
+ self.assertEqual(res.geturl(),
+ "https://docs.python.org/2/glossary.html#glossary")
+
+ def test_fileno(self):
+ req = urllib2.Request("http://www.python.org")
+ opener = urllib2.build_opener()
+ res = opener.open(req)
+ try:
+ res.fileno()
+ except AttributeError:
+ self.fail("HTTPResponse object should return a valid fileno")
+ finally:
+ res.close()
+
+ def test_custom_headers(self):
+ url = "http://www.example.com"
+ with test_support.transient_internet(url):
+ opener = urllib2.build_opener()
+ request = urllib2.Request(url)
+ self.assertFalse(request.header_items())
+ opener.open(request)
+ self.assertTrue(request.header_items())
+ self.assertTrue(request.has_header('User-agent'))
+ request.add_header('User-Agent','Test-Agent')
+ opener.open(request)
+ self.assertEqual(request.get_header('User-agent'),'Test-Agent')
+
+ def test_sites_no_connection_close(self):
+ # Some sites do not send Connection: close header.
+ # Verify that those work properly. (#issue12576)
+
+ URL = 'http://www.imdb.com' # No Connection:close
+ with test_support.transient_internet(URL):
+ req = urllib2.urlopen(URL)
+ res = req.read()
+ self.assertTrue(res)
+
+ def _test_urls(self, urls, handlers, retry=True):
+ import time
+ import logging
+ debug = logging.getLogger("test_urllib2").debug
+
+ urlopen = urllib2.build_opener(*handlers).open
+ if retry:
+ urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
+
+ for url in urls:
+ if isinstance(url, tuple):
+ url, req, expected_err = url
+ else:
+ req = expected_err = None
+ with test_support.transient_internet(url):
+ debug(url)
+ try:
+ f = urlopen(url, req, TIMEOUT)
+ except EnvironmentError as err:
+ debug(err)
+ if expected_err:
+ msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
+ (expected_err, url, req, type(err), err))
+ self.assertIsInstance(err, expected_err, msg)
+ except urllib2.URLError as err:
+ if isinstance(err[0], socket.timeout):
+ print >>sys.stderr, "<timeout: %s>" % url
+ continue
+ else:
+ raise
+ else:
+ try:
+ with test_support.transient_internet(url):
+ buf = f.read()
+ debug("read %d bytes" % len(buf))
+ except socket.timeout:
+ print >>sys.stderr, "<timeout: %s>" % url
+ f.close()
+ debug("******** next url coming up...")
+ time.sleep(0.1)
+
+ def _extra_handlers(self):
+ handlers = []
+
+ cfh = urllib2.CacheFTPHandler()
+ self.addCleanup(cfh.clear_cache)
+ cfh.setTimeout(1)
+ handlers.append(cfh)
+
+ return handlers
+
+
+class TimeoutTest(unittest.TestCase):
+ def test_http_basic(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ url = "http://www.python.org"
+ with test_support.transient_internet(url, timeout=None):
+ u = _urlopen_with_retry(url)
+ self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
+
+ def test_http_default_timeout(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ url = "http://www.python.org"
+ with test_support.transient_internet(url):
+ socket.setdefaulttimeout(60)
+ try:
+ u = _urlopen_with_retry(url)
+ finally:
+ socket.setdefaulttimeout(None)
+ self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
+
+ def test_http_no_timeout(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ url = "http://www.python.org"
+ with test_support.transient_internet(url):
+ socket.setdefaulttimeout(60)
+ try:
+ u = _urlopen_with_retry(url, timeout=None)
+ finally:
+ socket.setdefaulttimeout(None)
+ self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
+
+ def test_http_timeout(self):
+ url = "http://www.python.org"
+ with test_support.transient_internet(url):
+ u = _urlopen_with_retry(url, timeout=120)
+ self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
+
+ FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
+
+ def test_ftp_basic(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ with test_support.transient_internet(self.FTP_HOST, timeout=None):
+ u = _urlopen_with_retry(self.FTP_HOST)
+ self.assertTrue(u.fp.fp._sock.gettimeout() is None)
+
+ def test_ftp_default_timeout(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ with test_support.transient_internet(self.FTP_HOST):
+ socket.setdefaulttimeout(60)
+ try:
+ u = _urlopen_with_retry(self.FTP_HOST)
+ finally:
+ socket.setdefaulttimeout(None)
+ self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
+
+ def test_ftp_no_timeout(self):
+ self.assertTrue(socket.getdefaulttimeout() is None)
+ with test_support.transient_internet(self.FTP_HOST):
+ socket.setdefaulttimeout(60)
+ try:
+ u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
+ finally:
+ socket.setdefaulttimeout(None)
+ self.assertTrue(u.fp.fp._sock.gettimeout() is None)
+
+ def test_ftp_timeout(self):
+ with test_support.transient_internet(self.FTP_HOST):
+ u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
+ self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
+
+
+def test_main():
+ test_support.requires("network")
+ test_support.run_unittest(AuthTests,
+ OtherNetworkTests,
+ CloseSocketTest,
+ TimeoutTest,
+ )
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_urllibnet.py b/Lib/test/test_urllibnet.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_urllibnet.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+
+import unittest
+from test import test_support
+
+import socket
+import urllib
+import sys
+import os
+import time
+
+mimetools = test_support.import_module("mimetools", deprecated=True)
+
+
+def _open_with_retry(func, host, *args, **kwargs):
+ # Connecting to remote hosts is flaky. Make it more robust
+ # by retrying the connection several times.
+ for i in range(3):
+ try:
+ return func(host, *args, **kwargs)
+ except IOError, last_exc:
+ continue
+ except:
+ raise
+ raise last_exc
+
+
class URLTimeoutTest(unittest.TestCase):
    """Check that urllib.urlopen works with a global default socket timeout."""

    TIMEOUT = 10.0

    def setUp(self):
        # Each test runs with a 10-second process-wide default timeout.
        socket.setdefaulttimeout(self.TIMEOUT)

    def tearDown(self):
        # Restore the default so tests outside this class are unaffected.
        socket.setdefaulttimeout(None)

    def testURLread(self):
        resp = _open_with_retry(urllib.urlopen, "http://www.python.org/")
        resp.read()
+
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.python.org/ since there are a few tests that don't use that
    address and making a connection is expensive enough to warrant minimizing
    unneeded connections.

    """

    def urlopen(self, *args):
        # Wrap urllib.urlopen in the retry helper so flaky hosts don't
        # turn into spurious test failures.
        return _open_with_retry(urllib.urlopen, *args)

    def test_basic(self):
        # Simple test expected to pass: the returned object must expose the
        # file-like interface and yield some body content.
        open_url = self.urlopen("http://www.python.org/")
        for attr in ("read", "readline", "readlines", "fileno", "close",
                     "info", "geturl"):
            self.assertTrue(hasattr(open_url, attr), "object returned from "
                            "urlopen lacks the %s attribute" % attr)
        try:
            self.assertTrue(open_url.read(), "calling 'read' failed")
        finally:
            open_url.close()

    def test_readlines(self):
        # Test both readline and readlines against a live page.
        open_url = self.urlopen("http://www.python.org/")
        try:
            self.assertIsInstance(open_url.readline(), basestring,
                                  "readline did not return a string")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")
        finally:
            open_url.close()

    def test_info(self):
        # Test 'info': must return a mimetools.Message for an HTML page.
        open_url = self.urlopen("http://www.python.org/")
        try:
            info_obj = open_url.info()
        finally:
            open_url.close()
        self.assertIsInstance(info_obj, mimetools.Message,
                              "object returned by 'info' is not an "
                              "instance of mimetools.Message")
        self.assertEqual(info_obj.getsubtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        #
        # This test has been changed from what's currently in our
        # lib-python/2.7 for Jython due to recent updates at
        # python.org to use https; other tests can take advantage of
        # URL redirection.
        URL = "https://www.python.org/"
        open_url = self.urlopen(URL)
        try:
            gotten_url = open_url.geturl()
        finally:
            open_url.close()
        self.assertEqual(gotten_url, URL)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = "http://www.python.org/XXXinvalidXXX"
        open_url = urllib.FancyURLopener().open(URL)
        try:
            code = open_url.getcode()
        finally:
            open_url.close()
        self.assertEqual(code, 404)

    @unittest.skipIf(test_support.is_jython, "Sockets cannot be used as file descriptors")
    def test_fileno(self):
        if (sys.platform in ('win32',) or
            not hasattr(os, 'fdopen')):
            # On Windows, socket handles are not file descriptors; this
            # test can't pass on Windows.
            return
        # Make sure fd returned by fileno is valid.
        open_url = self.urlopen("http://www.python.org/")
        fd = open_url.fileno()
        FILE = os.fdopen(fd)
        try:
            self.assertTrue(FILE.read(), "reading from file created using fd "
                            "returned by fileno failed")
        finally:
            FILE.close()

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        bogus_domain = "sadflkjsasf.i.nvali.d"
        try:
            socket.gethostbyname(bogus_domain)
        except socket.gaierror:
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        self.assertRaises(IOError,
                          # SF patch 809915: In Sep 2003, VeriSign started
                          # highjacking invalid .com and .net addresses to
                          # boost traffic to their own site.  This test
                          # started failing then.  One hopes the .invalid
                          # domain will be spared to serve its defined
                          # purpose.
                          # urllib.urlopen, "http://www.sadflkjsasadf.com/")
                          urllib.urlopen, "http://sadflkjsasf.i.nvali.d/")
+
class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.urlretrieve using the network."""

    def urlretrieve(self, *args):
        # Wrap urllib.urlretrieve in the retry helper to tolerate flaky hosts.
        return _open_with_retry(urllib.urlretrieve, *args)

    def test_basic(self):
        # Test basic functionality: a readable file is created on disk.
        file_location, info = self.urlretrieve("http://www.python.org/")
        self.assertTrue(os.path.exists(file_location), "file location returned by"
                        " urlretrieve is not a valid path")
        FILE = open(file_location)
        try:
            self.assertTrue(FILE.read(), "reading from the file location returned"
                            " by urlretrieve failed")
        finally:
            FILE.close()
            os.unlink(file_location)

    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        file_location, info = self.urlretrieve("http://www.python.org/",
                                               test_support.TESTFN)
        self.assertEqual(file_location, test_support.TESTFN)
        self.assertTrue(os.path.exists(file_location))
        FILE = open(file_location)
        try:
            self.assertTrue(FILE.read(), "reading from temporary file failed")
        finally:
            FILE.close()
            os.unlink(file_location)

    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        file_location, header = self.urlretrieve("http://www.python.org/")
        os.unlink(file_location)
        self.assertIsInstance(header, mimetools.Message,
                              "header is not an instance of mimetools.Message")

    def test_data_header(self):
        # The Date header of a retrieved resource must parse as RFC-1123 GMT.
        logo = "http://www.python.org/community/logos/python-logo-master-v3-TM.png"
        file_location, fileheaders = self.urlretrieve(logo)
        os.unlink(file_location)
        datevalue = fileheaders.getheader('Date')
        dateformat = '%a, %d %b %Y %H:%M:%S GMT'
        try:
            time.strptime(datevalue, dateformat)
        except ValueError:
            # Bug fix: TestCase.fail() accepts a single message argument.
            # The old code passed (msg, dateformat) as two arguments, which
            # raised TypeError instead of reporting the intended failure;
            # %-format the message before handing it to fail().
            self.fail('Date value not in %r format' % dateformat)
+
+
+
def test_main():
    """Entry point: runs the suite; needs the 'network' resource enabled."""
    test_support.requires('network')
    # urllib.urlopen is gone in Python 3; silence the py3k warning it triggers.
    warning = ("urllib.urlopen.. has been removed", DeprecationWarning)
    with test_support.check_py3k_warnings(warning):
        test_support.run_unittest(
            URLTimeoutTest, urlopenNetworkTests, urlretrieveNetworkTests)
+
# Allow running this test file directly from the command line.
if __name__ == "__main__":
    test_main()
diff --git a/build.xml b/build.xml
--- a/build.xml
+++ b/build.xml
@@ -1032,6 +1032,8 @@
<arg value="--expected"/>
<arg value="-j"/>
<arg value="${junit.reports}"/>
+ <arg value="--use"/>
+ <arg value="network,subprocess"/>
</exec>
</target>
<target name="regrtest-windows" if="os.family.windows">
@@ -1041,6 +1043,8 @@
<arg value="--expected"/>
<arg value="-j"/>
<arg value="${junit.reports}"/>
+ <arg value="--use"/>
+ <arg value="network,subprocess"/>
</exec>
</target>
--
Repository URL: https://hg.python.org/jython
More information about the Jython-checkins
mailing list