[Python-checkins] r63446 - in python/trunk/Lib: BaseHTTPServer.py test/test_httpservers.py

georg.brandl python-checkins at python.org
Sun May 18 11:12:21 CEST 2008


Author: georg.brandl
Date: Sun May 18 11:12:20 2008
New Revision: 63446

Log:
GHOP #134, #171, #137: unit tests for the three HTTPServer modules.


Added:
   python/trunk/Lib/test/test_httpservers.py
Modified:
   python/trunk/Lib/BaseHTTPServer.py

Modified: python/trunk/Lib/BaseHTTPServer.py
==============================================================================
--- python/trunk/Lib/BaseHTTPServer.py	(original)
+++ python/trunk/Lib/BaseHTTPServer.py	Sun May 18 11:12:20 2008
@@ -218,6 +218,12 @@
     # where each string is of the form name[/version].
     server_version = "BaseHTTP/" + __version__
 
+    # The default request version.  This only affects responses up until
+    # the point where the request line is parsed, so it mainly decides what
+    # the client gets back when sending a malformed request line.
+    # Most web servers default to HTTP 0.9, i.e. don't send a status line.
+    default_request_version = "HTTP/0.9"
+
     def parse_request(self):
         """Parse a request (internal).
 
@@ -230,7 +236,7 @@
 
         """
         self.command = None  # set in case of error on the first line
-        self.request_version = version = "HTTP/0.9" # Default
+        self.request_version = version = self.default_request_version
         self.close_connection = 1
         requestline = self.raw_requestline
         if requestline[-2:] == '\r\n':

Added: python/trunk/Lib/test/test_httpservers.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/test/test_httpservers.py	Sun May 18 11:12:20 2008
@@ -0,0 +1,351 @@
+"""Unittests for the various HTTPServer modules.
+
+Written by Cody A.W. Somerville <cody-somerville at ubuntu.com>,
+Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
+"""
+
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from SimpleHTTPServer import SimpleHTTPRequestHandler
+from CGIHTTPServer import CGIHTTPRequestHandler
+
+import os
+import sys
+import base64
+import shutil
+import urllib
+import httplib
+import tempfile
+import threading
+
+import unittest
+from test import test_support
+
+
+class NoLogRequestHandler:
+    def log_message(self, *args):
+        # don't write log messages to stderr
+        pass
+
+
+class TestServerThread(threading.Thread):
+    def __init__(self, test_object, request_handler):
+        threading.Thread.__init__(self)
+        self.request_handler = request_handler
+        self.test_object = test_object
+        self.test_object.lock.acquire()
+
+    def run(self):
+        self.server = HTTPServer(('', 0), self.request_handler)
+        self.test_object.PORT = self.server.socket.getsockname()[1]
+        self.test_object.lock.release()
+        try:
+            self.server.serve_forever()
+        finally:
+            self.server.server_close()
+
+    def stop(self):
+        self.server.shutdown()
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self.lock = threading.Lock()
+        self.thread = TestServerThread(self, self.request_handler)
+        self.thread.start()
+        self.lock.acquire()
+
+    def tearDown(self):
+        self.lock.release()
+        self.thread.stop()
+
+    def request(self, uri, method='GET', body=None, headers={}):
+        self.connection = httplib.HTTPConnection('localhost', self.PORT)
+        self.connection.request(method, uri, body, headers)
+        return self.connection.getresponse()
+
+
+class BaseHTTPServerTestCase(BaseTestCase):
+    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
+        protocol_version = 'HTTP/1.1'
+        default_request_version = 'HTTP/1.1'
+
+        def do_TEST(self):
+            self.send_response(204)
+            self.send_header('Content-Type', 'text/html')
+            self.send_header('Connection', 'close')
+            self.end_headers()
+
+        def do_KEEP(self):
+            self.send_response(204)
+            self.send_header('Content-Type', 'text/html')
+            self.send_header('Connection', 'keep-alive')
+            self.end_headers()
+
+        def do_KEYERROR(self):
+            self.send_error(999)
+
+        def do_CUSTOM(self):
+            self.send_response(999)
+            self.send_header('Content-Type', 'text/html')
+            self.send_header('Connection', 'close')
+            self.end_headers()
+
+    def setUp(self):
+        BaseTestCase.setUp(self)
+        self.con = httplib.HTTPConnection('localhost', self.PORT)
+        self.con.connect()
+
+    def test_command(self):
+        self.con.request('GET', '/')
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 501)
+
+    def test_request_line_trimming(self):
+        self.con._http_vsn_str = 'HTTP/1.1\n'
+        self.con.putrequest('GET', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 501)
+
+    def test_version_bogus(self):
+        self.con._http_vsn_str = 'FUBAR'
+        self.con.putrequest('GET', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 400)
+
+    def test_version_digits(self):
+        self.con._http_vsn_str = 'HTTP/9.9.9'
+        self.con.putrequest('GET', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 400)
+
+    def test_version_none_get(self):
+        self.con._http_vsn_str = ''
+        self.con.putrequest('GET', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 501)
+
+    def test_version_none(self):
+        self.con._http_vsn_str = ''
+        self.con.putrequest('PUT', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 400)
+
+    def test_version_invalid(self):
+        self.con._http_vsn = 99
+        self.con._http_vsn_str = 'HTTP/9.9'
+        self.con.putrequest('GET', '/')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 505)
+
+    def test_send_blank(self):
+        self.con._http_vsn_str = ''
+        self.con.putrequest('', '')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 400)
+
+    def test_header_close(self):
+        self.con.putrequest('GET', '/')
+        self.con.putheader('Connection', 'close')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 501)
+
+    def test_head_keep_alive(self):
+        self.con._http_vsn_str = 'HTTP/1.1'
+        self.con.putrequest('GET', '/')
+        self.con.putheader('Connection', 'keep-alive')
+        self.con.endheaders()
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 501)
+
+    def test_handler(self):
+        self.con.request('TEST', '/')
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 204)
+
+    def test_return_header_keep_alive(self):
+        self.con.request('KEEP', '/')
+        res = self.con.getresponse()
+        self.assertEquals(res.getheader('Connection'), 'keep-alive')
+        self.con.request('TEST', '/')
+
+    def test_internal_key_error(self):
+        self.con.request('KEYERROR', '/')
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 999)
+
+    def test_return_custom_status(self):
+        self.con.request('CUSTOM', '/')
+        res = self.con.getresponse()
+        self.assertEquals(res.status, 999)
+
+
+class SimpleHTTPServerTestCase(BaseTestCase):
+    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
+        pass
+
+    def setUp(self):
+        BaseTestCase.setUp(self)
+        os.chdir(tempfile.gettempdir())
+        self.data = 'We are the knights who say Ni!'
+        self.tempdir = tempfile.mkdtemp(dir=tempfile.gettempdir())
+        self.tempdir_name = os.path.basename(self.tempdir)
+        self.tempfile = tempfile.NamedTemporaryFile(dir=self.tempdir)
+        self.tempfile.file.write(self.data)
+        self.tempfile.file.flush()
+        self.tempfile_name = os.path.basename(self.tempfile.name)
+
+    def tearDown(self):
+        try:
+            self.tempfile.close()
+            try:
+                shutil.rmtree(self.tempdir)
+            except:
+                pass
+        finally:
+            BaseTestCase.tearDown(self)
+
+    def check_status_and_reason(self, response, status, data=None):
+        body = response.read()
+        self.assert_(response)
+        self.assertEquals(response.status, status)
+        self.assert_(response.reason != None)
+        if data:
+            self.assertEqual(data, body)
+
+    def test_get(self):
+        #constructs the path relative to the root directory of the HTTPServer
+        response = self.request(self.tempdir_name + '/' + self.tempfile_name)
+        self.check_status_and_reason(response, 200, data=self.data)
+        response = self.request(self.tempdir_name + '/')
+        self.check_status_and_reason(response, 200)
+        response = self.request(self.tempdir_name)
+        self.check_status_and_reason(response, 301)
+        response = self.request('/ThisDoesNotExist')
+        self.check_status_and_reason(response, 404)
+        response = self.request('/' + 'ThisDoesNotExist' + '/')
+        self.check_status_and_reason(response, 404)
+        f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
+        response = self.request('/' + self.tempdir_name + '/')
+        self.check_status_and_reason(response, 200)
+        if os.name == 'posix':
+            # chmod won't work as expected on Windows platforms
+            os.chmod(self.tempdir, 0)
+            response = self.request(self.tempdir_name + '/')
+            self.check_status_and_reason(response, 404)
+            os.chmod(self.tempdir, 0755)
+
+    def test_head(self):
+        response = self.request(
+            self.tempdir_name + '/'+ self.tempfile_name, method='HEAD')
+        self.check_status_and_reason(response, 200)
+        self.assertEqual(response.getheader('content-length'),
+                         str(len(self.data)))
+        self.assertEqual(response.getheader('content-type'),
+                         'application/octet-stream')
+
+    def test_invalid_requests(self):
+        response = self.request('/', method='FOO')
+        self.check_status_and_reason(response, 501)
+        # requests must be case sensitive,so this should fail too
+        response = self.request('/', method='get')
+        self.check_status_and_reason(response, 501)
+        response = self.request('/', method='GETs')
+        self.check_status_and_reason(response, 501)
+
+
+cgi_file1 = """\
+#!%s
+
+print "Content-type: text/html"
+print
+print "Hello World"
+"""
+
+cgi_file2 = """\
+#!%s
+import cgi
+
+print "Content-type: text/html"
+print
+
+form = cgi.FieldStorage()
+print "%%s, %%s, %%s" % (form.getfirst("spam"), form.getfirst("eggs"),\
+              form.getfirst("bacon"))
+"""
+
+class CGIHTTPServerTestCase(BaseTestCase):
+    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
+        pass
+
+    def setUp(self):
+        BaseTestCase.setUp(self)
+        self.parent_dir = tempfile.mkdtemp()
+        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
+        os.mkdir(self.cgi_dir)
+
+        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
+        with open(self.file1_path, 'w') as file1:
+            file1.write(cgi_file1 % sys.executable)
+        os.chmod(self.file1_path, 0777)
+
+        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
+        with open(self.file2_path, 'w') as file2:
+            file2.write(cgi_file2 % sys.executable)
+        os.chmod(self.file2_path, 0777)
+
+        os.chdir(self.parent_dir)
+
+    def tearDown(self):
+        try:
+            os.remove(self.file1_path)
+            os.remove(self.file2_path)
+            os.rmdir(self.cgi_dir)
+            os.rmdir(self.parent_dir)
+        finally:
+            BaseTestCase.tearDown(self)
+
+    def test_headers_and_content(self):
+        res = self.request('/cgi-bin/file1.py')
+        self.assertEquals(('Hello World\n', 'text/html', 200), \
+             (res.read(), res.getheader('Content-type'), res.status))
+
+    def test_post(self):
+        params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
+        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
+        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
+
+        self.assertEquals(res.read(), '1, python, 123456\n')
+
+    def test_invaliduri(self):
+        res = self.request('/cgi-bin/invalid')
+        res.read()
+        self.assertEquals(res.status, 404)
+
+    def test_authorization(self):
+        headers = {'Authorization' : 'Basic %s' % \
+                base64.b64encode('username:pass')}
+        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
+        self.assertEquals(('Hello World\n', 'text/html', 200), \
+             (res.read(), res.getheader('Content-type'), res.status))
+
+
+def test_main(verbose=None):
+    try:
+        cwd = os.getcwd()
+        test_support.run_unittest(BaseHTTPServerTestCase,
+                                  #SimpleHTTPServerTestCase,
+                                  #CGIHTTPServerTestCase
+                                  )
+    finally:
+        os.chdir(cwd)
+
+if __name__ == '__main__':
+    test_main()


More information about the Python-checkins mailing list