summaryrefslogtreecommitdiff
path: root/test/test_consumer.py
blob: edcc2d8c7fd7c49231ef14e83958f6e86bd7cbfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sys

from mock import MagicMock, patch
from . import unittest
import pytest

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.errors import (
    FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
    UnknownTopicOrPartitionError)
from kafka.structs import (
    FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)


class TestKafkaConsumer:
    """Constructor-validation and subscription checks for ``KafkaConsumer``."""

    def test_session_timeout_larger_than_request_timeout_raises(self):
        """session_timeout_ms=50000 with request_timeout_ms=40000 must be rejected."""
        config = dict(
            bootstrap_servers='localhost:9092',
            api_version=(0, 9),
            group_id='foo',
            session_timeout_ms=50000,
            request_timeout_ms=40000,
        )
        with pytest.raises(KafkaConfigurationError):
            KafkaConsumer(**config)

    def test_fetch_max_wait_larger_than_request_timeout_raises(self):
        """fetch_max_wait_ms=50000 with request_timeout_ms=40000 must be rejected."""
        config = dict(
            bootstrap_servers='localhost:9092',
            fetch_max_wait_ms=50000,
            request_timeout_ms=40000,
        )
        with pytest.raises(KafkaConfigurationError):
            KafkaConsumer(**config)

    def test_request_timeout_larger_than_connections_max_idle_ms_raises(self):
        """request_timeout_ms=50000 with connections_max_idle_ms=40000 must be rejected."""
        config = dict(
            bootstrap_servers='localhost:9092',
            api_version=(0, 9),
            request_timeout_ms=50000,
            connections_max_idle_ms=40000,
        )
        with pytest.raises(KafkaConfigurationError):
            KafkaConsumer(**config)

    def test_subscription_copy(self):
        """Check that subscription() returns a new set on every call.

        Adding a topic to a returned set must not show up in what the
        consumer reports afterwards.
        """
        expected_topics = set(['foo'])
        consumer = KafkaConsumer('foo', api_version=(0, 10))

        snapshot = consumer.subscription()
        # Two calls must not hand out the same set object.
        assert snapshot is not consumer.subscription()
        assert snapshot == expected_topics

        # Mutate the caller-side copy only.
        snapshot.add('fizz')
        assert consumer.subscription() == expected_topics


class TestMultiProcessConsumer(unittest.TestCase):
    """Constructor checks for ``MultiProcessConsumer`` using a mocked client."""

    @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
    def test_partition_list(self):
        """Explicit partitions reach fetch_last_known_offsets unchanged.

        Also checks that the client is never asked for the topic's
        partition ids when the caller supplied them.
        """
        requested = (0,)
        mock_client = MagicMock()
        offsets_patcher = patch.object(MultiProcessConsumer,
                                       'fetch_last_known_offsets')
        with offsets_patcher as fetch_mock:
            MultiProcessConsumer(mock_client, 'testing-group', 'testing-topic',
                                 partitions=requested)
            # call_args[0] holds the positional arguments of the last call.
            positional_args = fetch_mock.call_args[0]
            self.assertEqual(positional_args, (requested,))
        lookup_count = mock_client.get_partition_ids_for_topic.call_count  # pylint: disable=no-member
        self.assertEqual(lookup_count, 0)


class TestSimpleConsumer(unittest.TestCase):
    """Error-handling tests for ``SimpleConsumer`` driven by a ``MagicMock`` client."""

    def test_non_integer_partitions(self):
        """String partition ids make the constructor fail with AssertionError."""
        with self.assertRaises(AssertionError):
            SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])

    def test_simple_consumer_failed_payloads(self):
        """get_messages() must not raise when a fetch response holds FailedPayloadsError."""
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        def failed_payloads(payload):
            # Returned, not raised: fail_requests_factory places this object
            # in the response list in lieu of a fetch response.
            return FailedPayloadsError(payload)

        client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)

        # This should not raise an exception
        consumer.get_messages(5)

    def test_simple_consumer_leader_change(self):
        """NotLeaderForPartition responses are tolerated and trigger metadata calls."""
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Every payload after the first is answered with a
        # NotLeaderForPartition error code (see fail_requests_factory).
        def not_leader(request):
            return FetchResponsePayload(request.topic, request.partition,
                                        NotLeaderForPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)

        # This should not raise an exception
        consumer.get_messages(20)

        # client should have updated metadata
        self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
        self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)

    def test_simple_consumer_unknown_topic_partition(self):
        """An UnknownTopicOrPartition error code makes get_messages() raise."""
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Every payload after the first is answered with an
        # UnknownTopicOrPartition error code (see fail_requests_factory).
        def unknown_topic_partition(request):
            return FetchResponsePayload(request.topic, request.partition,
                                        UnknownTopicOrPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)

        # Unlike the other error cases, this one must raise
        with self.assertRaises(UnknownTopicOrPartitionError):
            consumer.get_messages(20)

    def test_simple_consumer_commit_does_not_raise(self):
        """commit() returns a falsy value instead of raising when the commit request fails."""
        client = MagicMock()
        client.get_partition_ids_for_topic.return_value = [0, 1]

        def mock_offset_fetch_request(group, payloads, **kwargs):
            return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]

        client.send_offset_fetch_request.side_effect = mock_offset_fetch_request

        def mock_offset_commit_request(group, payloads, **kwargs):
            raise FailedPayloadsError(payloads[0])

        client.send_offset_commit_request.side_effect = mock_offset_commit_request

        consumer = SimpleConsumer(client, group='foobar',
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Mock internal commit check
        consumer.count_since_commit = 10

        # This should not raise an exception
        self.assertFalse(consumer.commit(partitions=[0, 1]))

    def test_simple_consumer_reset_partition_offset(self):
        """reset_partition_offset() returns None instead of raising when the offset request fails."""
        client = MagicMock()

        def mock_offset_request(payloads, **kwargs):
            raise FailedPayloadsError(payloads[0])

        client.send_offset_request.side_effect = mock_offset_request

        consumer = SimpleConsumer(client, group='foobar',
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # This should not raise an exception; assertIsNone checks identity
        # with None rather than mere equality.
        self.assertIsNone(consumer.reset_partition_offset(0))

    @staticmethod
    def fail_requests_factory(error_factory):
        """Build a ``send_fetch_request`` side effect where only the first payload succeeds.

        Args:
            error_factory: callable taking one request payload and returning
                the object to put in the response list for that payload.

        Returns:
            A function ``fail_requests(payloads, **kwargs)``. It returns a
            list whose first item is a ``FetchResponsePayload`` for
            ``payloads[0]`` with error code 0 and ten ``OffsetAndMessage``
            entries at consecutive offsets starting from that payload's
            offset, followed by ``error_factory(p)`` for each remaining
            payload.
        """
        def fail_requests(payloads, **kwargs):
            first = payloads[0]
            messages = [
                OffsetAndMessage(first.offset + i,
                                 "msg %d" % (first.offset + i))
                for i in range(10)
            ]
            responses = [
                FetchResponsePayload(first.topic, first.partition, 0, 0,
                                     messages),
            ]
            # Every payload after the first gets the caller-supplied failure.
            responses.extend(error_factory(failure) for failure in payloads[1:])
            return responses
        return fail_requests