[Python-checkins] r75449 - sandbox/trunk/ccbench/ccbench.py

antoine.pitrou python-checkins at python.org
Fri Oct 16 19:46:11 CEST 2009


Author: antoine.pitrou
Date: Fri Oct 16 19:46:11 2009
New Revision: 75449

Log:
Add a latency test



Modified:
   sandbox/trunk/ccbench/ccbench.py

Modified: sandbox/trunk/ccbench/ccbench.py
==============================================================================
--- sandbox/trunk/ccbench/ccbench.py	(original)
+++ sandbox/trunk/ccbench/ccbench.py	Fri Oct 16 19:46:11 2009
@@ -15,7 +15,8 @@
 import itertools
 import threading
 import subprocess
-from optparse import OptionParser
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
 
 # Compatibility
 try:
@@ -31,6 +32,9 @@
 
 THROUGHPUT_DURATION = 2.0
 
+LATENCY_PING_INTERVAL = 0.1
+LATENCY_DURATION = 2.0
+
 
 def task_pidigits():
     """Pi calculation (Python)"""
@@ -86,10 +90,12 @@
     with open(__file__, "rb") as f:
         arg = f.read(3000) * 2
 
-    return bz2.compress, (arg, )
+    def compress(s):
+        bz2.compress(s)
+    return compress, (arg, )
 
 def task_hashing():
-    """sha1 hashing (C)"""
+    """SHA1 hashing (C)"""
     import hashlib
     with open(__file__, "rb") as f:
         arg = f.read(5000) * 30
@@ -100,20 +106,23 @@
 
 
 throughput_tasks = [task_pidigits]
-# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
-# hashlib if available.
-# (note: hashlib releases the GIL from 2.7 and 3.1 onwards)
-try:
-    import bz2
-except ImportError:
+for mod in 'bz2', 'hashlib':
     try:
-        import hashlib
+        globals()[mod] = __import__(mod)
     except ImportError:
-        throughput_tasks.append(task_compress_zlib)
-    else:
-        throughput_tasks.append(task_hashing)
-else:
+        globals()[mod] = None
+
+# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
+# hashlib if available.
+# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
+if bz2 is not None:
     throughput_tasks.append(task_compress_bz2)
+elif hashlib is not None:
+    throughput_tasks.append(task_hashing)
+else:
+    throughput_tasks.append(task_compress_zlib)
+
+latency_tasks = throughput_tasks
 
 
 class TimedLoop:
@@ -121,7 +130,7 @@
         self.func = func
         self.args = args
     
-    def __call__(self, start_time, min_duration, end_event, do_yield=True):
+    def __call__(self, start_time, min_duration, end_event, do_yield=False):
         step = 20
         niters = 0
         duration = 0.0
@@ -146,12 +155,11 @@
             if t2 - t1 < 0.01:
                 # Minimize interference of measurement on overall runtime
                 step = step * 3 // 2
-            else:
-                if do_yield:
-                    # OS scheduling of Python threads is sometimes so bad that we
-                    # have to force thread switching ourselves, otherwise we get
-                    # completely useless results.
-                    _sleep(0.0001)
+            elif do_yield:
+                # OS scheduling of Python threads is sometimes so bad that we
+                # have to force thread switching ourselves, otherwise we get
+                # completely useless results.
+                _sleep(0.0001)
             t1 = t2
 
 
@@ -181,7 +189,7 @@
         with start_cond:
             start_cond.wait()
         results.append(loop(start_time, THROUGHPUT_DURATION,
-                            end_event))
+                            end_event, do_yield=True))
 
     threads = []
     for i in range(nthreads):
@@ -222,6 +230,119 @@
         print()
 
 
+LAT_END = "END"
+
+def _sendto(sock, s, addr):
+    sock.sendto(s.encode('ascii'), addr)
+
+def _recv(sock, n):
+    return sock.recv(n).decode('ascii')
+
+def latency_client(addr, nb_pings, interval):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    _time = time.time
+    _sleep = time.sleep
+    for i in range(nb_pings):
+        _sleep(interval)
+        _sendto(sock, "%r\n" % _time(), addr)
+    _sendto(sock, LAT_END + "\n", addr)
+
+def run_latency_client(**kwargs):
+    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+    cmd_line.extend(['--latclient', repr(kwargs)])
+    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_latency_test(func, args, nthreads):
+    # Create a listening socket to receive the pings. We use UDP which should
+    # be painlessly cross-platform.
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(("127.0.0.1", 0))
+    addr = sock.getsockname()
+
+    interval = LATENCY_PING_INTERVAL
+    duration = LATENCY_DURATION
+    nb_pings = int(duration / interval)
+    
+    results = []
+    threads = []
+    start_cond = threading.Condition()
+    if nthreads > 0:
+        # Warm up
+        func(*args)
+    
+        results = []
+        loop = TimedLoop(func, args)
+        end_event = []
+        ready = []
+    
+        def run():
+            ready.append(None)
+            with start_cond:
+                start_cond.wait()
+            loop(start_time, duration, end_event, do_yield=False)
+
+        for i in range(nthreads):
+            threads.append(threading.Thread(target=run))
+        for t in threads:
+            t.start()
+        # Wait for threads to be ready
+        while len(ready) < nthreads:
+            time.sleep(0.1)
+    
+    # Run the client and wait for the first ping(s) to arrive before
+    # unblocking the background threads.
+    chunks = []
+    process = run_latency_client(addr=sock.getsockname(),
+                                 nb_pings=nb_pings + 1, interval=interval)
+    s = _recv(sock, 4096)
+    _time = time.time
+
+    with start_cond:
+        start_time = _time()
+        start_cond.notify(nthreads)
+
+    while LAT_END not in s:
+        s = _recv(sock, 4096)
+        t = _time()
+        chunks.append((t, s))
+
+    for t in threads:
+        t.join()
+    process.wait()
+
+    for recv_time, chunk in chunks:
+        # NOTE: it is assumed that a line sent by a client wasn't received
+        # in two chunks because the lines are very small.
+        for line in chunk.splitlines():
+            line = line.strip()
+            if line and line != LAT_END:
+                send_time = eval(line)
+                assert isinstance(send_time, float)
+                results.append((send_time, recv_time))
+    
+    return results
+
+def run_latency_tests(max_threads=4):
+    for task in latency_tasks:
+        print("Background CPU task:", task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 0
+        while nthreads <= max_threads:
+            results = run_latency_test(func, args, nthreads)
+            n = len(results)
+            # We print out milliseconds
+            lats = [1000 * (t2 - t1) for (t1, t2) in results]
+            avg = sum(lats) / n
+            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
+            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
+            print()
+            #print("    [... from %d samples]" % n)
+            nthreads += 1
+        print()
+
+
 def main():
     usage = "usage: %prog [-h|--help] [options]"
     parser = OptionParser(usage=usage)
@@ -234,10 +355,21 @@
     parser.add_option("-i", "--interval",
                       action="store", type="int", dest="interval", default=None,
                       help="sys.setcheckinterval() value")
+    
+    # Hidden option to run the pinging client
+    parser.add_option("", "--latclient",
+                      action="store", dest="latclient", default=None,
+                      help=SUPPRESS_HELP)
+    
     options, args = parser.parse_args()
     if args:
         parser.error("unexpected arguments")
 
+    if options.latclient:
+        kwargs = eval(options.latclient)
+        latency_client(**kwargs)
+        return
+
     if not options.throughput and not options.latency:
         options.throughput = options.latency = True
     if options.interval:
@@ -247,6 +379,11 @@
         print("--- Throughput ---")
         print()
         run_throughput_tests()
+    
+    if options.latency:
+        print("--- Latency ---")
+        print()
+        run_latency_tests()
 
 if __name__ == "__main__":
     main()


More information about the Python-checkins mailing list