From b33c287c231bedc9ff31d03b700e109769accc55 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Mon, 18 Jan 2010 21:10:31 +0000
Subject: Add ccbench to the Tools directory

---
 Tools/ccbench/ccbench.py | 462 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 462 insertions(+)
 create mode 100644 Tools/ccbench/ccbench.py

diff --git a/Tools/ccbench/ccbench.py b/Tools/ccbench/ccbench.py
new file mode 100644
index 0000000000..0b9301284f
--- /dev/null
+++ b/Tools/ccbench/ccbench.py
@@ -0,0 +1,462 @@
+# -*- coding: utf-8 -*-
+# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
+
+from __future__ import division
+from __future__ import print_function
+
+"""
+ccbench, a Python concurrency benchmark.
+"""
+
+import time
+import os
+import sys
+import functools
+import itertools
+import threading
+import subprocess
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+import platform
+
+# Compatibility
+try:
+    xrange
+except NameError:
+    xrange = range
+
+try:
+    map = itertools.imap
+except AttributeError:
+    pass
+
+
+THROUGHPUT_DURATION = 2.0
+
+LATENCY_PING_INTERVAL = 0.1
+LATENCY_DURATION = 2.0
+
+
+def task_pidigits():
+    """Pi calculation (Python)"""
+    _map = map
+    _count = itertools.count
+    _islice = itertools.islice
+
+    def calc_ndigits(n):
+        # From http://shootout.alioth.debian.org/
+        def gen_x():
+            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
+
+        def compose(a, b):
+            aq, ar, as_, at = a
+            bq, br, bs, bt = b
+            return (aq * bq,
+                    aq * br + ar * bt,
+                    as_ * bq + at * bs,
+                    as_ * br + at * bt)
+
+        def extract(z, j):
+            q, r, s, t = z
+            return (q*j + r) // (s*j + t)
+
+        def pi_digits():
+            z = (1, 0, 0, 1)
+            x = gen_x()
+            while 1:
+                y = extract(z, 3)
+                while y != extract(z, 4):
+                    z = compose(z, next(x))
+                    y = extract(z, 3)
+                z = compose((10, -10*y, 0, 1), z)
+                yield y
+
+        return list(_islice(pi_digits(), n))
+
+    return calc_ndigits, (50, )
+
+def task_regex():
+    """regular expression (C)"""
+    # XXX this task gives horrendous latency results.
+    import re
+    # Taken from the `inspect` module
+    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))', re.MULTILINE)
+    with open(__file__, "r") as f:
+        arg = f.read(2000)
+    return pat.findall, (arg, )
+
+def task_sort():
+    """list sorting (C)"""
+    def list_sort(l):
+        l = l[::-1]
+        l.sort()
+
+    return list_sort, (list(range(1000)), )
+
+def task_compress_zlib():
+    """zlib compression (C)"""
+    import zlib
+    with open(__file__, "rb") as f:
+        arg = f.read(5000) * 3
+
+    def compress(s):
+        zlib.decompress(zlib.compress(s, 5))
+    return compress, (arg, )
+
+def task_compress_bz2():
+    """bz2 compression (C)"""
+    import bz2
+    with open(__file__, "rb") as f:
+        arg = f.read(3000) * 2
+
+    def compress(s):
+        bz2.compress(s)
+    return compress, (arg, )
+
+def task_hashing():
+    """SHA1 hashing (C)"""
+    import hashlib
+    with open(__file__, "rb") as f:
+        arg = f.read(5000) * 30
+
+    def compute(s):
+        hashlib.sha1(s).digest()
+    return compute, (arg, )
+
+
+throughput_tasks = [task_pidigits, task_regex]
+for mod in 'bz2', 'hashlib':
+    try:
+        globals()[mod] = __import__(mod)
+    except ImportError:
+        globals()[mod] = None
+
+# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
+# hashlib if available.
+if bz2 is not None:
+    throughput_tasks.append(task_compress_bz2)
+elif hashlib is not None:
+    throughput_tasks.append(task_hashing)
+else:
+    throughput_tasks.append(task_compress_zlib)
+
+latency_tasks = throughput_tasks
+
+
+class TimedLoop:
+    def __init__(self, func, args):
+        self.func = func
+        self.args = args
+
+    def __call__(self, start_time, min_duration, end_event, do_yield=False):
+        step = 20
+        niters = 0
+        duration = 0.0
+        _time = time.time
+        _sleep = time.sleep
+        _func = self.func
+        _args = self.args
+        t1 = start_time
+        while True:
+            for i in range(step):
+                _func(*_args)
+            t2 = _time()
+            # If another thread terminated, the current measurement is invalid
+            # => return the previous one.
+            if end_event:
+                return niters, duration
+            niters += step
+            duration = t2 - start_time
+            if duration >= min_duration:
+                end_event.append(None)
+                return niters, duration
+            if t2 - t1 < 0.01:
+                # Minimize interference of measurement on overall runtime
+                step = step * 3 // 2
+            elif do_yield:
+                # OS scheduling of Python threads is sometimes so bad that we
+                # have to force thread switching ourselves, otherwise we get
+                # completely useless results.
+                _sleep(0.0001)
+            t1 = t2
+
+
+def run_throughput_test(func, args, nthreads):
+    assert nthreads >= 1
+
+    # Warm up
+    func(*args)
+
+    results = []
+    loop = TimedLoop(func, args)
+    end_event = []
+
+    if nthreads == 1:
+        # Pure single-threaded performance, without any switching or
+        # synchronization overhead.
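+        # (run_throughput_tests() below uses this single-thread result as the
+        # 100 % baseline when reporting speeds for higher thread counts.)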
+        start_time = time.time()
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event, do_yield=False))
+        return results
+
+    started = False
+    ready_cond = threading.Condition()
+    start_cond = threading.Condition()
+    ready = []
+
+    def run():
+        with ready_cond:
+            ready.append(None)
+            ready_cond.notify()
+        with start_cond:
+            while not started:
+                start_cond.wait()
+        results.append(loop(start_time, THROUGHPUT_DURATION,
+                            end_event, do_yield=True))
+
+    threads = []
+    for i in range(nthreads):
+        threads.append(threading.Thread(target=run))
+    for t in threads:
+        t.setDaemon(True)
+        t.start()
+    # We don't want measurements to include thread startup overhead,
+    # so we arrange for timing to start after all threads are ready.
+    with ready_cond:
+        while len(ready) < nthreads:
+            ready_cond.wait()
+    with start_cond:
+        start_time = time.time()
+        started = True
+        start_cond.notify(nthreads)
+    for t in threads:
+        t.join()
+
+    return results
+
+def run_throughput_tests(max_threads):
+    for task in throughput_tasks:
+        print(task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 1
+        baseline_speed = None
+        while nthreads <= max_threads:
+            results = run_throughput_test(func, args, nthreads)
+            # Taking the max duration rather than average gives pessimistic
+            # results rather than optimistic.
+            speed = sum(r[0] for r in results) / max(r[1] for r in results)
+            print("threads=%d: %d" % (nthreads, speed), end="")
+            if baseline_speed is None:
+                print(" iterations/s.")
+                baseline_speed = speed
+            else:
+                print(" ( %d %%)" % (speed / baseline_speed * 100))
+            nthreads += 1
+        print()
+
+
+LAT_END = "END"
+
+def _sendto(sock, s, addr):
+    sock.sendto(s.encode('ascii'), addr)
+
+def _recv(sock, n):
+    return sock.recv(n).decode('ascii')
+
+def latency_client(addr, nb_pings, interval):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    _time = time.time
+    _sleep = time.sleep
+    def _ping():
+        _sendto(sock, "%r\n" % _time(), addr)
+    # The first ping signals the parent process that we are ready.
+    _ping()
+    # We give the parent a bit of time to notice.
+    _sleep(1.0)
+    for i in range(nb_pings):
+        _sleep(interval)
+        _ping()
+    _sendto(sock, LAT_END + "\n", addr)
+
+def run_latency_client(**kwargs):
+    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
+    cmd_line.extend(['--latclient', repr(kwargs)])
+    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
+                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+def run_latency_test(func, args, nthreads):
+    # Create a listening socket to receive the pings. We use UDP which should
+    # be painlessly cross-platform.
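+    # (Each ping received here is a text line holding repr(time.time()) as
+    # measured in the client process; pairing it with the local receive
+    # timestamp below gives one latency sample.)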
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind(("127.0.0.1", 0))
+    addr = sock.getsockname()
+
+    interval = LATENCY_PING_INTERVAL
+    duration = LATENCY_DURATION
+    nb_pings = int(duration / interval)
+
+    results = []
+    threads = []
+    end_event = []
+    start_cond = threading.Condition()
+    started = False
+    if nthreads > 0:
+        # Warm up
+        func(*args)
+
+        results = []
+        loop = TimedLoop(func, args)
+        ready = []
+        ready_cond = threading.Condition()
+
+        def run():
+            with ready_cond:
+                ready.append(None)
+                ready_cond.notify()
+            with start_cond:
+                while not started:
+                    start_cond.wait()
+            loop(start_time, duration * 1.5, end_event, do_yield=False)
+
+        for i in range(nthreads):
+            threads.append(threading.Thread(target=run))
+        for t in threads:
+            t.setDaemon(True)
+            t.start()
+        # Wait for threads to be ready
+        with ready_cond:
+            while len(ready) < nthreads:
+                ready_cond.wait()
+
+    # Run the client and wait for the first ping(s) to arrive before
+    # unblocking the background threads.
+    chunks = []
+    process = run_latency_client(addr=sock.getsockname(),
+                                 nb_pings=nb_pings, interval=interval)
+    s = _recv(sock, 4096)
+    _time = time.time
+
+    with start_cond:
+        start_time = _time()
+        started = True
+        start_cond.notify(nthreads)
+
+    while LAT_END not in s:
+        s = _recv(sock, 4096)
+        t = _time()
+        chunks.append((t, s))
+
+    # Tell the background threads to stop.
+    end_event.append(None)
+    for t in threads:
+        t.join()
+    process.wait()
+
+    for recv_time, chunk in chunks:
+        # NOTE: it is assumed that a line sent by a client wasn't received
+        # in two chunks because the lines are very small.
+        for line in chunk.splitlines():
+            line = line.strip()
+            if line and line != LAT_END:
+                send_time = eval(line)
+                assert isinstance(send_time, float)
+                results.append((send_time, recv_time))
+
+    return results
+
+def run_latency_tests(max_threads):
+    for task in latency_tasks:
+        print("Background CPU task:", task.__doc__)
+        print()
+        func, args = task()
+        nthreads = 0
+        while nthreads <= max_threads:
+            results = run_latency_test(func, args, nthreads)
+            n = len(results)
+            # We print out milliseconds
+            lats = [1000 * (t2 - t1) for (t1, t2) in results]
+            #print(list(map(int, lats)))
+            avg = sum(lats) / n
+            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
+            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")
+            print()
+            #print("  [... from %d samples]" % n)
+            nthreads += 1
+        print()
+
+
+def main():
+    usage = "usage: %prog [-h|--help] [options]"
+    parser = OptionParser(usage=usage)
+    parser.add_option("-t", "--throughput",
+                      action="store_true", dest="throughput", default=False,
+                      help="run throughput tests")
+    parser.add_option("-l", "--latency",
+                      action="store_true", dest="latency", default=False,
+                      help="run latency tests")
+    parser.add_option("-i", "--interval",
+                      action="store", type="int", dest="check_interval", default=None,
+                      help="sys.setcheckinterval() value")
+    parser.add_option("-I", "--switch-interval",
+                      action="store", type="float", dest="switch_interval", default=None,
+                      help="sys.setswitchinterval() value")
+    parser.add_option("-n", "--num-threads",
+                      action="store", type="int", dest="nthreads", default=4,
+                      help="max number of threads in tests")
+
+    # Hidden option to run the pinging client
+    parser.add_option("", "--latclient",
+                      action="store", dest="latclient", default=None,
+                      help=SUPPRESS_HELP)
+
+    options, args = parser.parse_args()
+    if args:
+        parser.error("unexpected arguments")
+
+    if options.latclient:
+        kwargs = eval(options.latclient)
+        latency_client(**kwargs)
+        return
+
+    if not options.throughput and not options.latency:
+        options.throughput = options.latency = True
+    if options.check_interval:
+        sys.setcheckinterval(options.check_interval)
+    if options.switch_interval:
+        sys.setswitchinterval(options.switch_interval)
+
+    print("== %s %s (%s) ==" % (
+        platform.python_implementation(),
+        platform.python_version(),
+        platform.python_build()[0],
+    ))
+    # Processor identification often has repeated spaces
+    cpu = ' '.join(platform.processor().split())
+    print("== %s %s on '%s' ==" % (
+        platform.machine(),
+        platform.system(),
+        cpu,
+    ))
+    print()
+
+    if options.throughput:
+        print("--- Throughput ---")
+        print()
+        run_throughput_tests(options.nthreads)
+
+    if options.latency:
+        print("--- Latency ---")
+        print()
+        run_latency_tests(options.nthreads)
+
+if __name__ == "__main__":
+    main()
--
cgit v1.2.1
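A minimal usage sketch, not part of the patch itself: the invocations below assume an in-tree CPython build whose interpreter is available as ./python, and simply exercise the optparse options defined in main() above (the -I option only has an effect on interpreters that provide sys.setswitchinterval).

    ./python Tools/ccbench/ccbench.py                  # throughput and latency tests, up to 4 threads
    ./python Tools/ccbench/ccbench.py -t -n 8          # throughput tests only, up to 8 threads
    ./python Tools/ccbench/ccbench.py -l -I 0.0005     # latency tests only, 0.5 ms switch interval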