summaryrefslogtreecommitdiff
path: root/test/testutil.py
blob: ec4d70bf67f958242493ce0697235d4d4cd899ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from __future__ import absolute_import

import os
import random
import re
import string
import time


def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
    return _matcher.sub('_', string)


def random_string(length):
    return "".join(random.choice(string.ascii_letters) for i in range(length))


def env_kafka_version():
    """Return the Kafka version set in the OS environment as a tuple.

     Example: '0.8.1.1' --> (0, 8, 1, 1)
    """
    if 'KAFKA_VERSION' not in os.environ:
        return ()
    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))


def assert_message_count(messages, num_messages):
    """Check that we received the expected number of messages with no duplicates."""
    # Make sure we got them all
    assert len(messages) == num_messages
    # Make sure there are no duplicates
    # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers,
    # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes.
    unique_messages = {(m.key, m.value) for m in messages}
    assert len(unique_messages) == num_messages


class Timer(object):
    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.interval = self.end - self.start