summaryrefslogtreecommitdiff
path: root/kafka/transaction.py
blob: 10c2ebd0e032aba84c7e5cdf8aede1a6ec5f3fe7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
Transactional commit and rollback semantics for consumer.
"""
from logging import getLogger

from kafka.common import check_error, OffsetCommitRequest, OffsetOutOfRangeError


class KafkaTransaction(object):
    """
    Provides transactional commit/rollback semantics around a `SimpleConsumer`.

    Usage assumes that `auto_commit` is disabled, that messages are consumed in
    batches, and that the consuming process will record its own successful
    processing of each message. Both the commit and rollback operations respect
    a "high-water mark" to ensure that the last unsuccessfully processed
    message will be retried.

    Example:

        consumer = SimpleConsumer(client, group, topic, auto_commit=False)
        consumer.provide_partition_info()

        while some_condition:
            with KafkaTransaction(consumer) as transaction:
                messages = consumer.get_messages(count, block=False)

                for partition, message in messages:
                    if can_process(message.value):
                        transaction.mark(partition, message.offset)
                    else:
                        break

                if not transaction:
                    sleep(delay)


    These semantics allow for deferred message processing (e.g. if `can_process`
    compares message time to clock time) and for repeated processing of the last
    unsuccessful message (until some external error is resolved).
    """

    def __init__(self, consumer):
        """
        :param consumer: an instance of `SimpleConsumer`
        """
        self.consumer = consumer
        # Both of these are populated by `__enter__`; `None` means no
        # transaction has been started yet.
        self.initial_offsets = None
        self.high_water_mark = None
        self.logger = getLogger("kafka.transaction")

    def mark(self, partition, offset):
        """
        Set the high-water mark in the current transaction.

        Records that the message at `offset` in `partition` was processed
        successfully, so a commit will advance that partition to `offset + 1`.
        The mark never moves backwards within a transaction.

        In order to know the current partition, it is helpful to initialize
        the consumer to provide partition info via:

            consumer.provide_partition_info()

        :param partition: partition the processed message came from
        :param offset: offset of the successfully processed message
        """
        # The committed offset is the *next* offset to read, hence `+ 1`.
        max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))

        self.logger.debug("Setting high-water mark to: %s",
                          {partition: max_offset})

        self.high_water_mark[partition] = max_offset

    def __nonzero__(self):
        """
        Return whether any operations were marked in the transaction.
        """
        return bool(self.high_water_mark)

    # Python 3 consults `__bool__`, not `__nonzero__`. Without this alias every
    # instance is truthy there and `if not transaction:` never triggers.
    __bool__ = __nonzero__

    def __enter__(self):
        """
        Start a new transaction:

         -  Record the initial offsets for rollback
         -  Reset the high-water mark
        """
        # Copy, because the consumer mutates its own `offsets` while fetching.
        self.initial_offsets = dict(self.consumer.offsets)
        self.high_water_mark = dict()

        self.logger.debug("Starting transaction at: %s", self.initial_offsets)

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        End a transaction.

         -  If there was no exception, commit up to the current high-water mark.
         -  If there was an offset out of range error, attempt to find the
            correct initial offset; the error is suppressed (returns True).
         -  If there was any other error, roll back to the initial offsets and
            let the exception propagate.
        """
        if exc_type is None:
            self.commit()
        elif isinstance(exc_value, OffsetOutOfRangeError):
            self.handle_out_of_range()
            return True
        else:
            self.rollback()

    def commit(self):
        """
        Commit this transaction:

         -  If the high-water mark has moved, commit up to the high-water mark
            and position the consumer there. Partitions that were never marked
            are positioned back at their initial offsets, so any messages that
            were fetched from them but not processed are retried.
         -  Otherwise, reset the consumer to the initial offsets.
        """
        if self.high_water_mark:
            self.logger.info("Committing transaction: %s", self.high_water_mark)
            self.commit_partition_offsets(self.high_water_mark)
            # Fetching has already advanced the consumer on every partition it
            # read from. Only marked partitions were actually processed, so
            # every other partition must rewind to its starting offset.
            positions = dict(self.initial_offsets)
            positions.update(self.high_water_mark)
            self.update_consumer_offsets(positions)
        else:
            self.update_consumer_offsets(self.initial_offsets)

    def rollback(self):
        """
        Rollback this transaction:

         -  Position the consumer at the initial offsets.
        """
        self.logger.info("Rolling back transaction: %s", self.initial_offsets)
        self.update_consumer_offsets(self.initial_offsets)

    def commit_partition_offsets(self, partition_offsets):
        """
        Commit explicit partition/offset pairs.

        :param partition_offsets: mapping of partition -> offset to commit for
            the consumer's topic and group
        """
        self.logger.debug("Committing partition offsets: %s", partition_offsets)

        commit_requests = [
            OffsetCommitRequest(self.consumer.topic, partition, offset, None)
            for partition, offset in partition_offsets.items()
        ]
        commit_responses = self.consumer.client.send_offset_commit_request(
            self.consumer.group,
            commit_requests,
        )
        for commit_response in commit_responses:
            check_error(commit_response)

    def update_consumer_offsets(self, partition_offsets):
        """
        Update consumer offsets to explicit positions.

        :param partition_offsets: mapping of partition -> offset to position
            the consumer at; partitions not present are left unchanged
        """
        self.logger.debug("Updating consumer offsets to: %s", partition_offsets)

        for partition, offset in partition_offsets.items():
            self.consumer.offsets[partition] = offset

        # consumer keeps other offset states beyond its `offsets` dictionary,
        # a relative seek with zero delta forces the consumer to reset to the
        # current value of the `offsets` dictionary
        self.consumer.seek(0, 1)

    def handle_out_of_range(self):
        """
        Handle out of range condition by seeking to the beginning of valid
        ranges.

        This assumes that an out of range doesn't happen by seeking past the end
        of valid ranges -- which is far less likely.
        """
        self.logger.info("Seeking beginning of partition on out of range error")
        self.consumer.seek(0, 0)