summaryrefslogtreecommitdiff
path: root/benchmarks/record_batch_read.py
blob: fc01e425ec3a86f9577fea95f00205da280657e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

BATCH_SAMPLES = 5
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare(magic):
    samples = []
    for _ in range(BATCH_SAMPLES):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            size = batch.append(
                random.randint(*TIMESTAMP_RANGE),
                random_bytes(KEY_SIZE),
                random_bytes(VALUE_SIZE),
                headers=[])
            assert size
        batch.close()
        samples.append(bytes(batch.buffer()))

    return iter(itertools.cycle(samples))


def finalize(results):
    # Just some strange code to make sure PyPy does execute the code above
    # properly
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    print(hash_val, file=open(os.devnull, "w"))


def func(loops, magic):
    # Jit can optimize out the whole function if the result is the same each
    # time, so we need some randomized input data )
    precomputed_samples = prepare(magic)
    results = []

    # Main benchmark code.
    batch_data = next(precomputed_samples)
    t0 = perf.perf_counter()
    for _ in range(loops):
        records = MemoryRecords(batch_data)
        while records.has_next():
            batch = records.next_batch()
            batch.validate_crc()
            for record in batch:
                results.append(record.value)

    res = perf.perf_counter() - t0
    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_read_v0', func, 0)
runner.bench_time_func('batch_read_v1', func, 1)
runner.bench_time_func('batch_read_v2', func, 2)