[Python-checkins] r75449 - sandbox/trunk/ccbench/ccbench.py
antoine.pitrou
python-checkins at python.org
Fri Oct 16 19:46:11 CEST 2009
Author: antoine.pitrou
Date: Fri Oct 16 19:46:11 2009
New Revision: 75449
Log:
Add a latency test
Modified:
sandbox/trunk/ccbench/ccbench.py
Modified: sandbox/trunk/ccbench/ccbench.py
==============================================================================
--- sandbox/trunk/ccbench/ccbench.py (original)
+++ sandbox/trunk/ccbench/ccbench.py Fri Oct 16 19:46:11 2009
@@ -15,7 +15,8 @@
import itertools
import threading
import subprocess
-from optparse import OptionParser
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
# Compatibility
try:
@@ -31,6 +32,9 @@
THROUGHPUT_DURATION = 2.0
+LATENCY_PING_INTERVAL = 0.1
+LATENCY_DURATION = 2.0
+
def task_pidigits():
"""Pi calculation (Python)"""
@@ -86,10 +90,12 @@
with open(__file__, "rb") as f:
arg = f.read(3000) * 2
- return bz2.compress, (arg, )
+ def compress(s):
+ bz2.compress(s)
+ return compress, (arg, )
def task_hashing():
- """sha1 hashing (C)"""
+ """SHA1 hashing (C)"""
import hashlib
with open(__file__, "rb") as f:
arg = f.read(5000) * 30
@@ -100,20 +106,23 @@
throughput_tasks = [task_pidigits]
-# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
-# hashlib if available.
-# (note: hashlib releases the GIL from 2.7 and 3.1 onwards)
-try:
- import bz2
-except ImportError:
+for mod in 'bz2', 'hashlib':
try:
- import hashlib
+ globals()[mod] = __import__(mod)
except ImportError:
- throughput_tasks.append(task_compress_zlib)
- else:
- throughput_tasks.append(task_hashing)
-else:
+ globals()[mod] = None
+
+# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
+# hashlib if available.
+# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
+if bz2 is not None:
throughput_tasks.append(task_compress_bz2)
+elif hashlib is not None:
+ throughput_tasks.append(task_hashing)
+else:
+ throughput_tasks.append(task_compress_zlib)
+
+latency_tasks = throughput_tasks
class TimedLoop:
@@ -121,7 +130,7 @@
self.func = func
self.args = args
- def __call__(self, start_time, min_duration, end_event, do_yield=True):
+ def __call__(self, start_time, min_duration, end_event, do_yield=False):
step = 20
niters = 0
duration = 0.0
@@ -146,12 +155,11 @@
if t2 - t1 < 0.01:
# Minimize interference of measurement on overall runtime
step = step * 3 // 2
- else:
- if do_yield:
- # OS scheduling of Python threads is sometimes so bad that we
- # have to force thread switching ourselves, otherwise we get
- # completely useless results.
- _sleep(0.0001)
+ elif do_yield:
+ # OS scheduling of Python threads is sometimes so bad that we
+ # have to force thread switching ourselves, otherwise we get
+ # completely useless results.
+ _sleep(0.0001)
t1 = t2
@@ -181,7 +189,7 @@
with start_cond:
start_cond.wait()
results.append(loop(start_time, THROUGHPUT_DURATION,
- end_event))
+ end_event, do_yield=True))
threads = []
for i in range(nthreads):
@@ -222,6 +230,119 @@
print()
+LAT_END = "END"
+
+def _sendto(sock, s, addr):
+ sock.sendto(s.encode('ascii'), addr)
+
+def _recv(sock, n):
+ return sock.recv(n).decode('ascii')
+
+def latency_client(addr, nb_pings, interval):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ _time = time.time
+ _sleep = time.sleep
+ for i in range(nb_pings):
+ _sleep(interval)
+ _sendto(sock, "%r\n" % _time(), addr)
+ _sendto(sock, LAT_END + "\n", addr)
+
+def run_latency_client(**kwargs):
+ cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+ cmd_line.extend(['--latclient', repr(kwargs)])
+ return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+ #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_latency_test(func, args, nthreads):
+# Create a listening socket to receive the pings. We use UDP, which should
+# be painlessly cross-platform.
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.bind(("127.0.0.1", 0))
+ addr = sock.getsockname()
+
+ interval = LATENCY_PING_INTERVAL
+ duration = LATENCY_DURATION
+ nb_pings = int(duration / interval)
+
+ results = []
+ threads = []
+ start_cond = threading.Condition()
+ if nthreads > 0:
+ # Warm up
+ func(*args)
+
+ results = []
+ loop = TimedLoop(func, args)
+ end_event = []
+ ready = []
+
+ def run():
+ ready.append(None)
+ with start_cond:
+ start_cond.wait()
+ loop(start_time, duration, end_event, do_yield=False)
+
+ for i in range(nthreads):
+ threads.append(threading.Thread(target=run))
+ for t in threads:
+ t.start()
+ # Wait for threads to be ready
+ while len(ready) < nthreads:
+ time.sleep(0.1)
+
+ # Run the client and wait for the first ping(s) to arrive before
+ # unblocking the background threads.
+ chunks = []
+ process = run_latency_client(addr=sock.getsockname(),
+ nb_pings=nb_pings + 1, interval=interval)
+ s = _recv(sock, 4096)
+ _time = time.time
+
+ with start_cond:
+ start_time = _time()
+ start_cond.notify(nthreads)
+
+ while LAT_END not in s:
+ s = _recv(sock, 4096)
+ t = _time()
+ chunks.append((t, s))
+
+ for t in threads:
+ t.join()
+ process.wait()
+
+ for recv_time, chunk in chunks:
+# NOTE: it is assumed that a line sent by the client is never split
+# across two received chunks, because the lines are very small.
+ for line in chunk.splitlines():
+ line = line.strip()
+ if line and line != LAT_END:
+ send_time = eval(line)
+ assert isinstance(send_time, float)
+ results.append((send_time, recv_time))
+
+ return results
+
+def run_latency_tests(max_threads=4):
+ for task in latency_tasks:
+ print("Background CPU task:", task.__doc__)
+ print()
+ func, args = task()
+ nthreads = 0
+ while nthreads <= max_threads:
+ results = run_latency_test(func, args, nthreads)
+ n = len(results)
+# Latencies are reported in milliseconds
+ lats = [1000 * (t2 - t1) for (t1, t2) in results]
+ avg = sum(lats) / n
+ dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
+ print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
+ print()
+ #print(" [... from %d samples]" % n)
+ nthreads += 1
+ print()
+
+
def main():
usage = "usage: %prog [-h|--help] [options]"
parser = OptionParser(usage=usage)
@@ -234,10 +355,21 @@
parser.add_option("-i", "--interval",
action="store", type="int", dest="interval", default=None,
help="sys.setcheckinterval() value")
+
+ # Hidden option to run the pinging client
+ parser.add_option("", "--latclient",
+ action="store", dest="latclient", default=None,
+ help=SUPPRESS_HELP)
+
options, args = parser.parse_args()
if args:
parser.error("unexpected arguments")
+ if options.latclient:
+ kwargs = eval(options.latclient)
+ latency_client(**kwargs)
+ return
+
if not options.throughput and not options.latency:
options.throughput = options.latency = True
if options.interval:
@@ -247,6 +379,11 @@
print("--- Throughput ---")
print()
run_throughput_tests()
+
+ if options.latency:
+ print("--- Latency ---")
+ print()
+ run_latency_tests()
if __name__ == "__main__":
main()
More information about the Python-checkins mailing list