[Python-checkins] bpo-37096: Add large-file tests for modules using sendfile(2) (GH-13676)

Giampaolo Rodola webhook-mailer at python.org
Mon Sep 30 00:51:59 EDT 2019


https://github.com/python/cpython/commit/5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2
commit: 5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2
branch: master
author: Giampaolo Rodola <g.rodola at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-09-30T12:51:55+08:00
summary:

bpo-37096: Add large-file tests for modules using sendfile(2) (GH-13676)

files:
M Lib/test/test_largefile.py

diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py
index 8870c721ab0e..6c8813e06260 100644
--- a/Lib/test/test_largefile.py
+++ b/Lib/test/test_largefile.py
@@ -5,17 +5,19 @@
 import stat
 import sys
 import unittest
-from test.support import TESTFN, requires, unlink, bigmemtest
+import socket
+import shutil
+import threading
+from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port
 import io  # C implementation of io
 import _pyio as pyio # Python implementation of io
 
 # size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
 size = 2_500_000_000
+TESTFN2 = TESTFN + '2'
+
 
 class LargeFileTest:
-    """Test that each file function works as expected for large
-    (i.e. > 2 GiB) files.
-    """
 
     def setUp(self):
         if os.path.exists(TESTFN):
@@ -44,6 +46,13 @@ def tearDownClass(cls):
         if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
             raise cls.failureException('File was not truncated by opening '
                                        'with mode "wb"')
+        unlink(TESTFN2)
+
+
+class TestFileMethods(LargeFileTest):
+    """Test that each file function works as expected for large
+    (i.e. > 2 GiB) files.
+    """
 
     # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
     # so memuse=2 is needed
@@ -140,6 +149,72 @@ def test_seekable(self):
                 f.seek(pos)
                 self.assertTrue(f.seekable())
 
+
+class TestCopyfile(LargeFileTest, unittest.TestCase):
+    open = staticmethod(io.open)
+
+    def test_it(self):
+        # Internally shutil.copyfile() can use "fast copy" methods like
+        # os.sendfile().
+        size = os.path.getsize(TESTFN)
+        shutil.copyfile(TESTFN, TESTFN2)
+        self.assertEqual(os.path.getsize(TESTFN2), size)
+        with open(TESTFN2, 'rb') as f:
+            self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
+            f.seek(size - 5)
+            self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
+
+
+ at unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
+class TestSocketSendfile(LargeFileTest, unittest.TestCase):
+    open = staticmethod(io.open)
+    timeout = 3
+
+    def setUp(self):
+        super().setUp()
+        self.thread = None
+
+    def tearDown(self):
+        super().tearDown()
+        if self.thread is not None:
+            self.thread.join(self.timeout)
+            self.thread = None
+
+    def tcp_server(self, sock):
+        def run(sock):
+            with sock:
+                conn, _ = sock.accept()
+                with conn, open(TESTFN2, 'wb') as f:
+                    event.wait(self.timeout)
+                    while True:
+                        chunk = conn.recv(65536)
+                        if not chunk:
+                            return
+                        f.write(chunk)
+
+        event = threading.Event()
+        sock.settimeout(self.timeout)
+        self.thread = threading.Thread(target=run, args=(sock, ))
+        self.thread.start()
+        event.set()
+
+    def test_it(self):
+        port = find_unused_port()
+        with socket.create_server(("", port)) as sock:
+            self.tcp_server(sock)
+            with socket.create_connection(("127.0.0.1", port)) as client:
+                with open(TESTFN, 'rb') as f:
+                    client.sendfile(f)
+        self.tearDown()
+
+        size = os.path.getsize(TESTFN)
+        self.assertEqual(os.path.getsize(TESTFN2), size)
+        with open(TESTFN2, 'rb') as f:
+            self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
+            f.seek(size - 5)
+            self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
+
+
 def setUpModule():
     try:
         import signal
@@ -176,14 +251,18 @@ def setUpModule():
             unlink(TESTFN)
 
 
-class CLargeFileTest(LargeFileTest, unittest.TestCase):
+class CLargeFileTest(TestFileMethods, unittest.TestCase):
     open = staticmethod(io.open)
 
-class PyLargeFileTest(LargeFileTest, unittest.TestCase):
+
+class PyLargeFileTest(TestFileMethods, unittest.TestCase):
     open = staticmethod(pyio.open)
 
+
 def tearDownModule():
     unlink(TESTFN)
+    unlink(TESTFN2)
+
 
 if __name__ == '__main__':
     unittest.main()



More information about the Python-checkins mailing list