author    Taras <voyn1991@gmail.com>  2017-10-11 17:41:23 +0300
committer Taras <voyn1991@gmail.com>  2017-10-12 11:10:44 +0300
commit    0557983b2ae05adc2f1076d5e670d693c8327ab9 (patch)
tree      0ae50c93937ae6ea62be5b6f9cf7b2617c789f91
parent    a8b25decf1d70e50223ab5c4fe5a122f0a9476ad (diff)
download  kafka-python-0557983b2ae05adc2f1076d5e670d693c8327ab9.tar.gz
Added micro benchmarks specific to the record/ folder to measure the exact speed change after updating to the V2 message format
-rw-r--r--  benchmarks/README                  10
-rw-r--r--  benchmarks/record_batch_compose.py 78
-rw-r--r--  benchmarks/record_batch_read.py    82
3 files changed, 170 insertions, 0 deletions
diff --git a/benchmarks/README b/benchmarks/README
new file mode 100644
index 0000000..369e8b6
--- /dev/null
+++ b/benchmarks/README
@@ -0,0 +1,10 @@
+The `record_batch_*` benchmarks in this section are written using the
+``perf`` library, created by Victor Stinner. For more information on how
+to get reliable benchmark results, please consult
+http://perf.readthedocs.io/en/latest/run_benchmark.html.
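+
+For example, each script can be run directly and will print its timings
+(assuming it is invoked from the repository root):
+
+    python benchmarks/record_batch_compose.py
+    python benchmarks/record_batch_read.py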
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py
new file mode 100644
index 0000000..11320ca
--- /dev/null
+++ b/benchmarks/record_batch_compose.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+import perf
+from kafka.record.memory_records import MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+# With the values above, a v1 record is ~100 bytes, so 100 messages take ~10,000 bytes
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+ buffer = bytearray(length)
+ for i in range(length):
+ buffer[i] = random.randint(0, 255)
+ return bytes(buffer)
+
+
+def prepare():
+    # Cycle over ~2x the messages per batch so that consecutive batches are
+    # built from different data (a JIT could otherwise exploit repetition).
+    return itertools.cycle([
+        (random_bytes(KEY_SIZE),
+         random_bytes(VALUE_SIZE),
+         random.randint(*TIMESTAMP_RANGE))
+        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
+    ])
+
+
+def finalize(results):
+    # Consume the results so that PyPy's JIT cannot optimize the
+    # benchmarked code away.
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    with open(os.devnull, "w") as sink:
+        print(hash_val.hexdigest(), file=sink)
+
+
+def func(loops, magic):
+    # A JIT can optimize the whole loop away if the result is the same on
+    # each run, so feed it randomized input data.
+ precomputed_samples = prepare()
+ results = []
+
+ # Main benchmark code.
+ t0 = perf.perf_counter()
+ for _ in range(loops):
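+        # magic selects the message format version under test (0 or 1);
+        # compression_type=0 disables compression.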
+ batch = MemoryRecordsBuilder(
+ magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+ for _ in range(MESSAGES_PER_BATCH):
+ key, value, timestamp = next(precomputed_samples)
+ size = batch.append(timestamp=timestamp, key=key, value=value)
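+            # append() returns falsy if the message did not fit in the batch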
+ assert size
+ batch.close()
+ results.append(batch.buffer())
+
+ res = perf.perf_counter() - t0
+
+ finalize(results)
+
+ return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_append_v0', func, 0)
+runner.bench_time_func('batch_append_v1', func, 1)
diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py
new file mode 100644
index 0000000..4ded5b0
--- /dev/null
+++ b/benchmarks/record_batch_read.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+import perf
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+BATCH_SAMPLES = 5
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+ buffer = bytearray(length)
+ for i in range(length):
+ buffer[i] = random.randint(0, 255)
+ return bytes(buffer)
+
+
+def prepare(magic):
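+    # Pre-build a handful of serialized batches so the read benchmark
+    # measures only decoding, not batch construction.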
+ samples = []
+ for _ in range(BATCH_SAMPLES):
+ batch = MemoryRecordsBuilder(
+ magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+ for _ in range(MESSAGES_PER_BATCH):
+ size = batch.append(
+ random.randint(*TIMESTAMP_RANGE),
+ random_bytes(KEY_SIZE),
+ random_bytes(VALUE_SIZE))
+ assert size
+ batch.close()
+ samples.append(bytes(batch.buffer()))
+
+    return itertools.cycle(samples)
+
+
+def finalize(results):
+    # Consume the results so that PyPy's JIT cannot optimize the
+    # benchmarked code away.
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    with open(os.devnull, "w") as sink:
+        print(hash_val.hexdigest(), file=sink)
+
+
+def func(loops, magic):
+    # A JIT can optimize the whole loop away if the result is the same on
+    # each run, so feed it randomized input data.
+ precomputed_samples = prepare(magic)
+ results = []
+
+ # Main benchmark code.
+ batch_data = next(precomputed_samples)
+ t0 = perf.perf_counter()
+ for _ in range(loops):
+ records = MemoryRecords(batch_data)
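+        # Walk every batch in the buffer, validating CRCs and reading values.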
+ while records.has_next():
+ batch = records.next_batch()
+ batch.validate_crc()
+ for record in batch:
+ results.append(record.value)
+
+ res = perf.perf_counter() - t0
+ finalize(results)
+
+ return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_read_v0', func, 0)
+runner.bench_time_func('batch_read_v1', func, 1)