blob: 77a6673fa5805071abff896abdef13ff530a92ac (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
from __future__ import absolute_import
import os
import random
import string
import time
def random_string(length):
    """Return a string of ``length`` randomly chosen ASCII letters.

    Characters are drawn (with replacement) from ``string.ascii_letters``,
    i.e. upper- and lower-case a-z. A ``length`` of zero or less yields
    an empty string.
    """
    alphabet = string.ascii_letters
    letters = [random.choice(alphabet) for _ in range(length)]
    return "".join(letters)
def env_kafka_version():
    """Return the Kafka version set in the OS environment as a tuple.

    Example: '0.8.1.1' --> (0, 8, 1, 1)

    Returns:
        A tuple of ints parsed from the ``KAFKA_VERSION`` environment
        variable, or an empty tuple when the variable is unset, empty,
        or contains only whitespace.

    Raises:
        ValueError: if any dot-separated component is not an integer.
    """
    # An exported-but-empty variable (KAFKA_VERSION="") is treated the same
    # as an unset one instead of failing inside int('').
    raw_version = os.environ.get('KAFKA_VERSION', '').strip()
    if not raw_version:
        return ()
    return tuple(int(part) for part in raw_version.split('.'))
def assert_message_count(messages, num_messages):
    """Check that we received the expected number of messages with no duplicates.

    Args:
        messages: a sized collection of objects exposing hashable ``key``
            and ``value`` attributes.
        num_messages: the exact number of distinct messages expected.

    Raises:
        AssertionError: if the number of messages differs from
            ``num_messages`` or if any (key, value) pair occurs more than
            once. The error message reports the counts involved.
    """
    # Make sure we got them all
    received = len(messages)
    assert received == num_messages, (
        'Expected %s messages but received %s' % (num_messages, received))

    # Make sure there are no duplicates
    # Note: Currently duplicates are identified only using key/value. Other
    # attributes like topic, partition, headers, timestamp, etc. are ignored...
    # this could be changed if necessary, but will be more tolerant of dupes.
    unique_messages = {(m.key, m.value) for m in messages}
    unique_count = len(unique_messages)
    assert unique_count == num_messages, (
        'Expected %s unique messages but found %s (%s duplicates by key/value)'
        % (num_messages, unique_count, received - unique_count))
class Timer(object):
    """Context manager that records how long its ``with`` body took.

    Attributes set during use:
        start: ``time.time()`` value captured on entering the block.
        end: ``time.time()`` value captured on leaving the block.
        interval: ``end - start``, the elapsed wall-clock seconds.

    Exceptions raised inside the block are not suppressed.
    """

    def __enter__(self):
        """Record the start timestamp and return the timer itself."""
        self.start = time.time()
        return self

    def __exit__(self, *args):
        """Record the end timestamp and compute the elapsed interval."""
        finished = time.time()
        self.end = finished
        self.interval = finished - self.start
|