diff options
author | Taras <voyn1991@gmail.com> | 2017-10-11 17:41:23 +0300 |
---|---|---|
committer | Taras <voyn1991@gmail.com> | 2017-10-12 11:10:44 +0300 |
commit | 0557983b2ae05adc2f1076d5e670d693c8327ab9 (patch) | |
tree | 0ae50c93937ae6ea62be5b6f9cf7b2617c789f91 | |
parent | a8b25decf1d70e50223ab5c4fe5a122f0a9476ad (diff) | |
download | kafka-python-0557983b2ae05adc2f1076d5e670d693c8327ab9.tar.gz |
Added specific to record/ folder micro benchmarks to get exact speed change after updating to V2 message format
-rw-r--r-- | benchmarks/README | 4 | ||||
-rw-r--r-- | benchmarks/record_batch_compose.py | 73 | ||||
-rw-r--r-- | benchmarks/record_batch_read.py | 78 |
3 files changed, 155 insertions, 0 deletions
diff --git a/benchmarks/README b/benchmarks/README new file mode 100644 index 0000000..369e8b6 --- /dev/null +++ b/benchmarks/README @@ -0,0 +1,4 @@ +The `record_batch_*` benchmarks in this section are written using +``perf`` library, created by Viktor Stinner. For more information on how to get +reliable results of test runs please consult +http://perf.readthedocs.io/en/latest/run_benchmark.html. diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py new file mode 100644 index 0000000..11320ca --- /dev/null +++ b/benchmarks/record_batch_compose.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +from __future__ import print_function +import perf +from kafka.record.memory_records import MemoryRecordsBuilder +import itertools +import random +import hashlib +import os + + +DEFAULT_BATCH_SIZE = 1600 * 1024 +KEY_SIZE = 6 +VALUE_SIZE = 60 +TIMESTAMP_RANGE = [1505824130000, 1505824140000] + +# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages +MESSAGES_PER_BATCH = 100 + + +def random_bytes(length): + buffer = bytearray(length) + for i in range(length): + buffer[i] = random.randint(0, 255) + return bytes(buffer) + + +def prepare(): + return iter(itertools.cycle([ + (random_bytes(KEY_SIZE), + random_bytes(VALUE_SIZE), + random.randint(*TIMESTAMP_RANGE) + ) + for _ in range(int(MESSAGES_PER_BATCH * 1.94)) + ])) + + +def finalize(results): + # Just some strange code to make sure PyPy does execute the main code + # properly, without optimizing it away + hash_val = hashlib.md5() + for buf in results: + hash_val.update(buf) + print(hash_val, file=open(os.devnull, "w")) + + +def func(loops, magic): + # Jit can optimize out the whole function if the result is the same each + # time, so we need some randomized input data ) + precomputed_samples = prepare() + results = [] + + # Main benchmark code. + t0 = perf.perf_counter() + for _ in range(loops): + batch = MemoryRecordsBuilder( + magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0) + for _ in range(MESSAGES_PER_BATCH): + key, value, timestamp = next(precomputed_samples) + size = batch.append(timestamp=timestamp, key=key, value=value) + assert size + batch.close() + results.append(batch.buffer()) + + res = perf.perf_counter() - t0 + + finalize(results) + + return res + + +runner = perf.Runner() +runner.bench_time_func('batch_append_v0', func, 0) +runner.bench_time_func('batch_append_v1', func, 1) diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py new file mode 100644 index 0000000..4ded5b0 --- /dev/null +++ b/benchmarks/record_batch_read.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +from __future__ import print_function +import perf +from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder +import itertools +import random +import hashlib +import os + + +DEFAULT_BATCH_SIZE = 1600 * 1024 +KEY_SIZE = 6 +VALUE_SIZE = 60 +TIMESTAMP_RANGE = [1505824130000, 1505824140000] + +BATCH_SAMPLES = 5 +MESSAGES_PER_BATCH = 100 + + +def random_bytes(length): + buffer = bytearray(length) + for i in range(length): + buffer[i] = random.randint(0, 255) + return bytes(buffer) + + +def prepare(magic): + samples = [] + for _ in range(BATCH_SAMPLES): + batch = MemoryRecordsBuilder( + magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0) + for _ in range(MESSAGES_PER_BATCH): + size = batch.append( + random.randint(*TIMESTAMP_RANGE), + random_bytes(KEY_SIZE), + random_bytes(VALUE_SIZE)) + assert size + batch.close() + samples.append(bytes(batch.buffer())) + + return iter(itertools.cycle(samples)) + + +def finalize(results): + # Just some strange code to make sure PyPy does execute the code above + # properly + hash_val = hashlib.md5() + for buf in results: + hash_val.update(buf) + print(hash_val, file=open(os.devnull, "w")) + + +def func(loops, magic): + # Jit can optimize out the whole function if the result is the same each + # time, so we need some randomized input data ) + precomputed_samples = prepare(magic) + results = [] + + # Main benchmark code. + batch_data = next(precomputed_samples) + t0 = perf.perf_counter() + for _ in range(loops): + records = MemoryRecords(batch_data) + while records.has_next(): + batch = records.next_batch() + batch.validate_crc() + for record in batch: + results.append(record.value) + + res = perf.perf_counter() - t0 + finalize(results) + + return res + + +runner = perf.Runner() +runner.bench_time_func('batch_read_v0', func, 0) +runner.bench_time_func('batch_read_v1', func, 1) |