summaryrefslogtreecommitdiff
path: root/Tools
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2010-01-18 21:10:31 +0000
committerAntoine Pitrou <solipsis@pitrou.net>2010-01-18 21:10:31 +0000
commitb33c287c231bedc9ff31d03b700e109769accc55 (patch)
tree578d3131c9ddf06cb3cd2fde8422bcd5e6238e01 /Tools
parente1819ff39068740a414b5131f2a80c3714e8ad2b (diff)
downloadcpython-b33c287c231bedc9ff31d03b700e109769accc55.tar.gz
Add ccbench to the Tools directory
Diffstat (limited to 'Tools')
-rw-r--r--Tools/ccbench/ccbench.py462
1 files changed, 462 insertions, 0 deletions
diff --git a/Tools/ccbench/ccbench.py b/Tools/ccbench/ccbench.py
new file mode 100644
index 0000000000..0b9301284f
--- /dev/null
+++ b/Tools/ccbench/ccbench.py
@@ -0,0 +1,462 @@
+# -*- coding: utf-8 -*-
+# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
+
+from __future__ import division
+from __future__ import print_function
+
+"""
+ccbench, a Python concurrency benchmark.
+"""
+
+import time
+import os
+import sys
+import functools
+import itertools
+import threading
+import subprocess
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+import platform
+
+# Compatibility
+try:
+ xrange
+except NameError:
+ xrange = range
+
+try:
+ map = itertools.imap
+except AttributeError:
+ pass
+
+
+THROUGHPUT_DURATION = 2.0
+
+LATENCY_PING_INTERVAL = 0.1
+LATENCY_DURATION = 2.0
+
+
+def task_pidigits():
+ """Pi calculation (Python)"""
+ _map = map
+ _count = itertools.count
+ _islice = itertools.islice
+
+ def calc_ndigits(n):
+ # From http://shootout.alioth.debian.org/
+ def gen_x():
+ return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
+
+ def compose(a, b):
+ aq, ar, as_, at = a
+ bq, br, bs, bt = b
+ return (aq * bq,
+ aq * br + ar * bt,
+ as_ * bq + at * bs,
+ as_ * br + at * bt)
+
+ def extract(z, j):
+ q, r, s, t = z
+ return (q*j + r) // (s*j + t)
+
+ def pi_digits():
+ z = (1, 0, 0, 1)
+ x = gen_x()
+ while 1:
+ y = extract(z, 3)
+ while y != extract(z, 4):
+ z = compose(z, next(x))
+ y = extract(z, 3)
+ z = compose((10, -10*y, 0, 1), z)
+ yield y
+
+ return list(_islice(pi_digits(), n))
+
+ return calc_ndigits, (50, )
+
+def task_regex():
+ """regular expression (C)"""
+ # XXX this task gives horrendous latency results.
+ import re
+ # Taken from the `inspect` module
+ pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
+ with open(__file__, "r") as f:
+ arg = f.read(2000)
+
+ def findall(s):
+ t = time.time()
+ try:
+ return pat.findall(s)
+ finally:
+ print(time.time() - t)
+ return pat.findall, (arg, )
+
+def task_sort():
+ """list sorting (C)"""
+ def list_sort(l):
+ l = l[::-1]
+ l.sort()
+
+ return list_sort, (list(range(1000)), )
+
+def task_compress_zlib():
+ """zlib compression (C)"""
+ import zlib
+ with open(__file__, "rb") as f:
+ arg = f.read(5000) * 3
+
+ def compress(s):
+ zlib.decompress(zlib.compress(s, 5))
+ return compress, (arg, )
+
+def task_compress_bz2():
+ """bz2 compression (C)"""
+ import bz2
+ with open(__file__, "rb") as f:
+ arg = f.read(3000) * 2
+
+ def compress(s):
+ bz2.compress(s)
+ return compress, (arg, )
+
+def task_hashing():
+ """SHA1 hashing (C)"""
+ import hashlib
+ with open(__file__, "rb") as f:
+ arg = f.read(5000) * 30
+
+ def compute(s):
+ hashlib.sha1(s).digest()
+ return compute, (arg, )
+
+
+throughput_tasks = [task_pidigits, task_regex]
+for mod in 'bz2', 'hashlib':
+ try:
+ globals()[mod] = __import__(mod)
+ except ImportError:
+ globals()[mod] = None
+
+# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
+# hashlib if available.
+# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
+if bz2 is not None:
+ throughput_tasks.append(task_compress_bz2)
+elif hashlib is not None:
+ throughput_tasks.append(task_hashing)
+else:
+ throughput_tasks.append(task_compress_zlib)
+
+latency_tasks = throughput_tasks
+
+
+class TimedLoop:
+ def __init__(self, func, args):
+ self.func = func
+ self.args = args
+
+ def __call__(self, start_time, min_duration, end_event, do_yield=False):
+ step = 20
+ niters = 0
+ duration = 0.0
+ _time = time.time
+ _sleep = time.sleep
+ _func = self.func
+ _args = self.args
+ t1 = start_time
+ while True:
+ for i in range(step):
+ _func(*_args)
+ t2 = _time()
+ # If another thread terminated, the current measurement is invalid
+ # => return the previous one.
+ if end_event:
+ return niters, duration
+ niters += step
+ duration = t2 - start_time
+ if duration >= min_duration:
+ end_event.append(None)
+ return niters, duration
+ if t2 - t1 < 0.01:
+ # Minimize interference of measurement on overall runtime
+ step = step * 3 // 2
+ elif do_yield:
+ # OS scheduling of Python threads is sometimes so bad that we
+ # have to force thread switching ourselves, otherwise we get
+ # completely useless results.
+ _sleep(0.0001)
+ t1 = t2
+
+
+def run_throughput_test(func, args, nthreads):
+ assert nthreads >= 1
+
+ # Warm up
+ func(*args)
+
+ results = []
+ loop = TimedLoop(func, args)
+ end_event = []
+
+ if nthreads == 1:
+ # Pure single-threaded performance, without any switching or
+ # synchronization overhead.
+ start_time = time.time()
+ results.append(loop(start_time, THROUGHPUT_DURATION,
+ end_event, do_yield=False))
+ return results
+
+ started = False
+ ready_cond = threading.Condition()
+ start_cond = threading.Condition()
+ ready = []
+
+ def run():
+ with ready_cond:
+ ready.append(None)
+ ready_cond.notify()
+ with start_cond:
+ while not started:
+ start_cond.wait()
+ results.append(loop(start_time, THROUGHPUT_DURATION,
+ end_event, do_yield=True))
+
+ threads = []
+ for i in range(nthreads):
+ threads.append(threading.Thread(target=run))
+ for t in threads:
+ t.setDaemon(True)
+ t.start()
+ # We don't want measurements to include thread startup overhead,
+ # so we arrange for timing to start after all threads are ready.
+ with ready_cond:
+ while len(ready) < nthreads:
+ ready_cond.wait()
+ with start_cond:
+ start_time = time.time()
+ started = True
+ start_cond.notify(nthreads)
+ for t in threads:
+ t.join()
+
+ return results
+
+def run_throughput_tests(max_threads):
+ for task in throughput_tasks:
+ print(task.__doc__)
+ print()
+ func, args = task()
+ nthreads = 1
+ baseline_speed = None
+ while nthreads <= max_threads:
+ results = run_throughput_test(func, args, nthreads)
+ # Taking the max duration rather than average gives pessimistic
+ # results rather than optimistic.
+ speed = sum(r[0] for r in results) / max(r[1] for r in results)
+ print("threads=%d: %d" % (nthreads, speed), end="")
+ if baseline_speed is None:
+ print(" iterations/s.")
+ baseline_speed = speed
+ else:
+ print(" ( %d %%)" % (speed / baseline_speed * 100))
+ nthreads += 1
+ print()
+
+
+LAT_END = "END"
+
+def _sendto(sock, s, addr):
+ sock.sendto(s.encode('ascii'), addr)
+
+def _recv(sock, n):
+ return sock.recv(n).decode('ascii')
+
+def latency_client(addr, nb_pings, interval):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ _time = time.time
+ _sleep = time.sleep
+ def _ping():
+ _sendto(sock, "%r\n" % _time(), addr)
+ # The first ping signals the parent process that we are ready.
+ _ping()
+ # We give the parent a bit of time to notice.
+ _sleep(1.0)
+ for i in range(nb_pings):
+ _sleep(interval)
+ _ping()
+ _sendto(sock, LAT_END + "\n", addr)
+
+def run_latency_client(**kwargs):
+ cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+ cmd_line.extend(['--latclient', repr(kwargs)])
+ return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+ #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_latency_test(func, args, nthreads):
+ # Create a listening socket to receive the pings. We use UDP which should
+ # be painlessly cross-platform.
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.bind(("127.0.0.1", 0))
+ addr = sock.getsockname()
+
+ interval = LATENCY_PING_INTERVAL
+ duration = LATENCY_DURATION
+ nb_pings = int(duration / interval)
+
+ results = []
+ threads = []
+ end_event = []
+ start_cond = threading.Condition()
+ started = False
+ if nthreads > 0:
+ # Warm up
+ func(*args)
+
+ results = []
+ loop = TimedLoop(func, args)
+ ready = []
+ ready_cond = threading.Condition()
+
+ def run():
+ with ready_cond:
+ ready.append(None)
+ ready_cond.notify()
+ with start_cond:
+ while not started:
+ start_cond.wait()
+ loop(start_time, duration * 1.5, end_event, do_yield=False)
+
+ for i in range(nthreads):
+ threads.append(threading.Thread(target=run))
+ for t in threads:
+ t.setDaemon(True)
+ t.start()
+ # Wait for threads to be ready
+ with ready_cond:
+ while len(ready) < nthreads:
+ ready_cond.wait()
+
+ # Run the client and wait for the first ping(s) to arrive before
+ # unblocking the background threads.
+ chunks = []
+ process = run_latency_client(addr=sock.getsockname(),
+ nb_pings=nb_pings, interval=interval)
+ s = _recv(sock, 4096)
+ _time = time.time
+
+ with start_cond:
+ start_time = _time()
+ started = True
+ start_cond.notify(nthreads)
+
+ while LAT_END not in s:
+ s = _recv(sock, 4096)
+ t = _time()
+ chunks.append((t, s))
+
+ # Tell the background threads to stop.
+ end_event.append(None)
+ for t in threads:
+ t.join()
+ process.wait()
+
+ for recv_time, chunk in chunks:
+ # NOTE: it is assumed that a line sent by a client wasn't received
+ # in two chunks because the lines are very small.
+ for line in chunk.splitlines():
+ line = line.strip()
+ if line and line != LAT_END:
+ send_time = eval(line)
+ assert isinstance(send_time, float)
+ results.append((send_time, recv_time))
+
+ return results
+
+def run_latency_tests(max_threads):
+ for task in latency_tasks:
+ print("Background CPU task:", task.__doc__)
+ print()
+ func, args = task()
+ nthreads = 0
+ while nthreads <= max_threads:
+ results = run_latency_test(func, args, nthreads)
+ n = len(results)
+ # We print out milliseconds
+ lats = [1000 * (t2 - t1) for (t1, t2) in results]
+ #print(list(map(int, lats)))
+ avg = sum(lats) / n
+ dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
+ print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
+ print()
+ #print(" [... from %d samples]" % n)
+ nthreads += 1
+ print()
+
+
+def main():
+ usage = "usage: %prog [-h|--help] [options]"
+ parser = OptionParser(usage=usage)
+ parser.add_option("-t", "--throughput",
+ action="store_true", dest="throughput", default=False,
+ help="run throughput tests")
+ parser.add_option("-l", "--latency",
+ action="store_true", dest="latency", default=False,
+ help="run latency tests")
+ parser.add_option("-i", "--interval",
+ action="store", type="int", dest="check_interval", default=None,
+ help="sys.setcheckinterval() value")
+ parser.add_option("-I", "--switch-interval",
+ action="store", type="float", dest="switch_interval", default=None,
+ help="sys.setswitchinterval() value")
+ parser.add_option("-n", "--num-threads",
+ action="store", type="int", dest="nthreads", default=4,
+ help="max number of threads in tests")
+
+ # Hidden option to run the pinging client
+ parser.add_option("", "--latclient",
+ action="store", dest="latclient", default=None,
+ help=SUPPRESS_HELP)
+
+ options, args = parser.parse_args()
+ if args:
+ parser.error("unexpected arguments")
+
+ if options.latclient:
+ kwargs = eval(options.latclient)
+ latency_client(**kwargs)
+ return
+
+ if not options.throughput and not options.latency:
+ options.throughput = options.latency = True
+ if options.check_interval:
+ sys.setcheckinterval(options.check_interval)
+ if options.switch_interval:
+ sys.setswitchinterval(options.switch_interval)
+
+ print("== %s %s (%s) ==" % (
+ platform.python_implementation(),
+ platform.python_version(),
+ platform.python_build()[0],
+ ))
+ # Processor identification often has repeated spaces
+ cpu = ' '.join(platform.processor().split())
+ print("== %s %s on '%s' ==" % (
+ platform.machine(),
+ platform.system(),
+ cpu,
+ ))
+ print()
+
+ if options.throughput:
+ print("--- Throughput ---")
+ print()
+ run_throughput_tests(options.nthreads)
+
+ if options.latency:
+ print("--- Latency ---")
+ print()
+ run_latency_tests(options.nthreads)
+
+if __name__ == "__main__":
+ main()