summary | refs | log | tree | commit | diff
path: root/test/test_consumer.py
blob: e6642922d5e70396755f72f3bd90ba7b8699cb3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import sys

from mock import MagicMock, patch
from . import unittest

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
    KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
    FailedPayloadsError, OffsetAndMessage,
    NotLeaderForPartitionError, UnknownTopicOrPartitionError
)


class TestKafkaConsumer(unittest.TestCase):
    """Argument validation performed when constructing a ``SimpleConsumer``."""

    def test_non_integer_partitions(self):
        # A partition id supplied as a string (instead of an int) must make
        # the constructor fail with AssertionError.
        string_partitions = ['0']
        with self.assertRaises(AssertionError):
            SimpleConsumer(MagicMock(), 'group', 'topic',
                           partitions=string_partitions)


class TestMultiProcessConsumer(unittest.TestCase):
    """Checks how ``MultiProcessConsumer`` treats an explicit partition list."""

    @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
    def test_partition_list(self):
        # With partitions given explicitly, the consumer must pass exactly
        # that value to fetch_last_known_offsets and must not ask the client
        # for the topic's partition ids.
        explicit_partitions = (0,)
        mock_client = MagicMock()

        offsets_patcher = patch.object(MultiProcessConsumer,
                                       'fetch_last_known_offsets')
        with offsets_patcher as mock_fetch_offsets:
            MultiProcessConsumer(mock_client, 'testing-group', 'testing-topic',
                                 partitions=explicit_partitions)
            positional_args = mock_fetch_offsets.call_args[0]
            self.assertEqual(positional_args, (explicit_partitions,))

        partition_lookup = mock_client.get_partition_ids_for_topic  # pylint: disable=no-member
        self.assertEqual(partition_lookup.call_count, 0)

class TestSimpleConsumer(unittest.TestCase):
    """Error-path tests for ``SimpleConsumer`` driven by a ``MagicMock`` client."""

    def test_simple_consumer_failed_payloads(self):
        # Every fetch payload after the first is answered with a
        # FailedPayloadsError instance placed in the response list.
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        def failed_payloads(payload):
            return FailedPayloadsError(payload)

        client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)

        # This should not raise an exception
        consumer.get_messages(5)

    def test_simple_consumer_leader_change(self):
        # Every fetch payload after the first is answered with a
        # NotLeaderForPartitionError error code.
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Mock so that only the first request gets a valid response
        def not_leader(request):
            return FetchResponsePayload(request.topic, request.partition,
                                        NotLeaderForPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)

        # This should not raise an exception
        consumer.get_messages(20)

        # client should have updated metadata
        self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
        self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)

    def test_simple_consumer_unknown_topic_partition(self):
        # Every fetch payload after the first is answered with an
        # UnknownTopicOrPartitionError error code.
        client = MagicMock()
        consumer = SimpleConsumer(client, group=None,
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Mock so that only the first request gets a valid response
        def unknown_topic_partition(request):
            return FetchResponsePayload(request.topic, request.partition,
                                        UnknownTopicOrPartitionError.errno, -1, ())

        client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)

        # Unlike the cases above, this error must reach the caller as an
        # exception
        with self.assertRaises(UnknownTopicOrPartitionError):
            consumer.get_messages(20)

    def test_simple_consumer_commit_does_not_raise(self):
        # The offset-commit request raises FailedPayloadsError; commit() is
        # expected to return a falsy value rather than propagate it.
        client = MagicMock()
        client.get_partition_ids_for_topic.return_value = [0, 1]

        def mock_offset_fetch_request(group, payloads, **kwargs):
            return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]

        client.send_offset_fetch_request.side_effect = mock_offset_fetch_request

        def mock_offset_commit_request(group, payloads, **kwargs):
            raise FailedPayloadsError(payloads[0])

        client.send_offset_commit_request.side_effect = mock_offset_commit_request

        consumer = SimpleConsumer(client, group='foobar',
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # Mock internal commit check
        # NOTE(review): presumably commit() does nothing while this counter
        # is zero -- confirm against SimpleConsumer.commit.
        consumer.count_since_commit = 10

        # This should not raise an exception
        self.assertFalse(consumer.commit(partitions=[0, 1]))

    def test_simple_consumer_reset_partition_offset(self):
        # The offset request raises FailedPayloadsError;
        # reset_partition_offset() is expected to return None instead of
        # propagating it.
        client = MagicMock()

        def mock_offset_request(payloads, **kwargs):
            raise FailedPayloadsError(payloads[0])

        client.send_offset_request.side_effect = mock_offset_request

        consumer = SimpleConsumer(client, group='foobar',
                                  topic='topic', partitions=[0, 1],
                                  auto_commit=False)

        # This should not raise an exception
        self.assertIsNone(consumer.reset_partition_offset(0))

    @staticmethod
    def fail_requests_factory(error_factory):
        """Build a ``send_fetch_request`` side effect where only the first payload succeeds.

        The returned callable takes the list of fetch payloads (extra keyword
        arguments are accepted and ignored) and returns a list of responses:

        * for ``payloads[0]``, a ``FetchResponsePayload`` with error code 0
          holding ten ``OffsetAndMessage`` entries whose offsets start at the
          payload's ``offset`` and whose values are ``"msg <offset>"``;
        * for every remaining payload, whatever ``error_factory(payload)``
          returns.

        ``error_factory`` is a one-argument callable producing the failure
        response (or exception instance) for a single payload.
        """
        # Mock so that only the first request gets a valid response
        def fail_requests(payloads, **kwargs):
            first = payloads[0]
            messages = [
                OffsetAndMessage(first.offset + i, "msg %d" % (first.offset + i))
                for i in range(10)
            ]
            responses = [
                FetchResponsePayload(first.topic, first.partition, 0, 0, messages),
            ]
            responses.extend(error_factory(failure) for failure in payloads[1:])
            return responses
        return fail_requests