[Python-checkins] r78939 - in python/trunk: Misc/NEWS Tools/ccbench/ccbench.py

antoine.pitrou python-checkins at python.org
Sat Mar 13 22:21:30 CET 2010


Author: antoine.pitrou
Date: Sat Mar 13 22:21:30 2010
New Revision: 78939

Log:
Issue #7993: Add a test of IO packet processing bandwidth to ccbench.
It measures the number of UDP packets processed per second depending on
the number of background CPU-bound Python threads.



Modified:
   python/trunk/Misc/NEWS
   python/trunk/Tools/ccbench/ccbench.py

Modified: python/trunk/Misc/NEWS
==============================================================================
--- python/trunk/Misc/NEWS	(original)
+++ python/trunk/Misc/NEWS	Sat Mar 13 22:21:30 2010
@@ -54,6 +54,12 @@
   integer codes for which it was used differed between native packing
   and standard packing.)
 
+Tools/Demos
+-----------
+
+- Issue #7993: Add a test of IO packet processing bandwidth to ccbench.
+  It measures the number of UDP packets processed per second depending on
+  the number of background CPU-bound Python threads.
 
 
 What's New in Python 2.7 alpha 4?

Modified: python/trunk/Tools/ccbench/ccbench.py
==============================================================================
--- python/trunk/Tools/ccbench/ccbench.py	(original)
+++ python/trunk/Tools/ccbench/ccbench.py	Sat Mar 13 22:21:30 2010
@@ -36,6 +36,9 @@
 LATENCY_PING_INTERVAL = 0.1
 LATENCY_DURATION = 2.0
 
+BANDWIDTH_PACKET_SIZE = 1024
+BANDWIDTH_DURATION = 2.0
+
 
 def task_pidigits():
     """Pi calculation (Python)"""
@@ -149,6 +152,7 @@
     throughput_tasks.append(task_compress_zlib)
 
 latency_tasks = throughput_tasks
+bandwidth_tasks = [task_pidigits]
 
 
 class TimedLoop:
@@ -394,6 +398,133 @@
         print()
 
 
+BW_END = "END"
+
+def bandwidth_client(addr, packet_size, duration):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(("127.0.0.1", 0))
+    local_addr = sock.getsockname()
+    _time = time.time
+    _sleep = time.sleep
+    def _send_chunk(msg):
+        _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
+    # We give the parent some time to be ready.
+    _sleep(1.0)
+    try:
+        start_time = _time()
+        end_time = start_time + duration * 2.0
+        i = 0
+        while _time() < end_time:
+            _send_chunk(str(i))
+            s = _recv(sock, packet_size)
+            assert len(s) == packet_size
+            i += 1
+        _send_chunk(BW_END)
+    finally:
+        sock.close()
+
+def run_bandwidth_client(**kwargs):
+    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+    cmd_line.extend(['--bwclient', repr(kwargs)])
+    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_bandwidth_test(func, args, nthreads):
+    # Create a listening socket to receive the packets. We use UDP which should
+    # be painlessly cross-platform.
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(("127.0.0.1", 0))
+    addr = sock.getsockname()
+
+    duration = BANDWIDTH_DURATION
+    packet_size = BANDWIDTH_PACKET_SIZE
+
+    results = []
+    threads = []
+    end_event = []
+    start_cond = threading.Condition()
+    started = False
+    if nthreads > 0:
+        # Warm up
+        func(*args)
+
+        results = []
+        loop = TimedLoop(func, args)
+        ready = []
+        ready_cond = threading.Condition()
+
+        def run():
+            with ready_cond:
+                ready.append(None)
+                ready_cond.notify()
+            with start_cond:
+                while not started:
+                    start_cond.wait()
+            loop(start_time, duration * 1.5, end_event, do_yield=False)
+
+        for i in range(nthreads):
+            threads.append(threading.Thread(target=run))
+        for t in threads:
+            t.setDaemon(True)
+            t.start()
+        # Wait for threads to be ready
+        with ready_cond:
+            while len(ready) < nthreads:
+                ready_cond.wait()
+
+    # Run the client and wait for the first packet to arrive before
+    # unblocking the background threads.
+    process = run_bandwidth_client(addr=addr,
+                                   packet_size=packet_size,
+                                   duration=duration)
+    _time = time.time
+    # This will also wait for the client process to be ready
+    s = _recv(sock, packet_size)
+    remote_addr = eval(s.partition('#')[0])
+
+    with start_cond:
+        start_time = _time()
+        started = True
+        start_cond.notify(nthreads)
+
+    n = 0
+    first_time = None
+    while not end_event and BW_END not in s:
+        _sendto(sock, s, remote_addr)
+        s = _recv(sock, packet_size)
+        if first_time is None:
+            first_time = _time()
+        n += 1
+    end_time = _time()
+
+    end_event.append(None)
+    for t in threads:
+        t.join()
+    process.kill()
+
+    return (n - 1) / (end_time - first_time)
+
+def run_bandwidth_tests(max_threads):
+    for task in bandwidth_tasks:
+        print("Background CPU task:", task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 0
+        baseline_speed = None
+        while nthreads <= max_threads:
+            results = run_bandwidth_test(func, args, nthreads)
+            speed = results
+            #speed = len(results) * 1.0 / results[-1][0]
+            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
+            if baseline_speed is None:
+                print(" packets/s.")
+                baseline_speed = speed
+            else:
+                print(" ( %d %%)" % (speed / baseline_speed * 100))
+            nthreads += 1
+        print()
+
+
 def main():
     usage = "usage: %prog [-h|--help] [options]"
     parser = OptionParser(usage=usage)
@@ -403,6 +534,9 @@
     parser.add_option("-l", "--latency",
                       action="store_true", dest="latency", default=False,
                       help="run latency tests")
+    parser.add_option("-b", "--bandwidth",
+                      action="store_true", dest="bandwidth", default=False,
+                      help="run I/O bandwidth tests")
     parser.add_option("-i", "--interval",
                       action="store", type="int", dest="check_interval", default=None,
                       help="sys.setcheckinterval() value")
@@ -413,10 +547,13 @@
                       action="store", type="int", dest="nthreads", default=4,
                       help="max number of threads in tests")
 
-    # Hidden option to run the pinging client
+    # Hidden options to run the pinging and bandwidth clients
     parser.add_option("", "--latclient",
                       action="store", dest="latclient", default=None,
                       help=SUPPRESS_HELP)
+    parser.add_option("", "--bwclient",
+                      action="store", dest="bwclient", default=None,
+                      help=SUPPRESS_HELP)
 
     options, args = parser.parse_args()
     if args:
@@ -427,8 +564,13 @@
         latency_client(**kwargs)
         return
 
-    if not options.throughput and not options.latency:
-        options.throughput = options.latency = True
+    if options.bwclient:
+        kwargs = eval(options.bwclient)
+        bandwidth_client(**kwargs)
+        return
+
+    if not options.throughput and not options.latency and not options.bandwidth:
+        options.throughput = options.latency = options.bandwidth = True
     if options.check_interval:
         sys.setcheckinterval(options.check_interval)
     if options.switch_interval:
@@ -458,5 +600,10 @@
         print()
         run_latency_tests(options.nthreads)
 
+    if options.bandwidth:
+        print("--- I/O bandwidth ---")
+        print()
+        run_bandwidth_tests(options.nthreads)
+
 if __name__ == "__main__":
     main()


More information about the Python-checkins mailing list