1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
import sys
from mock import MagicMock, patch
from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
class TestMultiProcessConsumer(unittest.TestCase):
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
def test_partition_list(self):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
class TestSimpleConsumer(unittest.TestCase):
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
topic='topic', partitions=[0, 1],
auto_commit=False)
def failed_payloads(payload):
return FailedPayloadsError(payload)
client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
# This should not raise an exception
consumer.get_messages(5)
def test_simple_consumer_leader_change(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
topic='topic', partitions=[0, 1],
auto_commit=False)
# Mock so that only the first request gets a valid response
def not_leader(request):
return FetchResponsePayload(request.topic, request.partition,
NotLeaderForPartitionError.errno, -1, ())
client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
# This should not raise an exception
consumer.get_messages(20)
# client should have updated metadata
self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
def test_simple_consumer_unknown_topic_partition(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
topic='topic', partitions=[0, 1],
auto_commit=False)
# Mock so that only the first request gets a valid response
def unknown_topic_partition(request):
return FetchResponsePayload(request.topic, request.partition,
UnknownTopicOrPartitionError.errno, -1, ())
client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
# This should not raise an exception
with self.assertRaises(UnknownTopicOrPartitionError):
consumer.get_messages(20)
def test_simple_consumer_commit_does_not_raise(self):
client = MagicMock()
client.get_partition_ids_for_topic.return_value = [0, 1]
def mock_offset_fetch_request(group, payloads, **kwargs):
return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]
client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
def mock_offset_commit_request(group, payloads, **kwargs):
raise FailedPayloadsError(payloads[0])
client.send_offset_commit_request.side_effect = mock_offset_commit_request
consumer = SimpleConsumer(client, group='foobar',
topic='topic', partitions=[0, 1],
auto_commit=False)
# Mock internal commit check
consumer.count_since_commit = 10
# This should not raise an exception
self.assertFalse(consumer.commit(partitions=[0, 1]))
def test_simple_consumer_reset_partition_offset(self):
client = MagicMock()
def mock_offset_request(payloads, **kwargs):
raise FailedPayloadsError(payloads[0])
client.send_offset_request.side_effect = mock_offset_request
consumer = SimpleConsumer(client, group='foobar',
topic='topic', partitions=[0, 1],
auto_commit=False)
# This should not raise an exception
self.assertEqual(consumer.reset_partition_offset(0), None)
@staticmethod
def fail_requests_factory(error_factory):
# Mock so that only the first request gets a valid response
def fail_requests(payloads, **kwargs):
responses = [
FetchResponsePayload(payloads[0].topic, payloads[0].partition, 0, 0,
[OffsetAndMessage(
payloads[0].offset + i,
"msg %d" % (payloads[0].offset + i))
for i in range(10)]),
]
for failure in payloads[1:]:
responses.append(error_factory(failure))
return responses
return fail_requests
|