summaryrefslogtreecommitdiff
path: root/kafka/context.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/context.py')
-rw-r--r--kafka/context.py178
1 files changed, 0 insertions, 178 deletions
diff --git a/kafka/context.py b/kafka/context.py
deleted file mode 100644
index 1ebc71d..0000000
--- a/kafka/context.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""
-Context manager to commit/rollback consumer offsets.
-"""
-from __future__ import absolute_import
-
-from logging import getLogger
-
-from kafka.errors import check_error, OffsetOutOfRangeError
-from kafka.structs import OffsetCommitRequestPayload
-
-
-class OffsetCommitContext(object):
- """
- Provides commit/rollback semantics around a `SimpleConsumer`.
-
- Usage assumes that `auto_commit` is disabled, that messages are consumed in
- batches, and that the consuming process will record its own successful
- processing of each message. Both the commit and rollback operations respect
- a "high-water mark" to ensure that last unsuccessfully processed message
- will be retried.
-
- Example:
-
- .. code:: python
-
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
- consumer.provide_partition_info()
- consumer.fetch_last_known_offsets()
-
- while some_condition:
- with OffsetCommitContext(consumer) as context:
- messages = consumer.get_messages(count, block=False)
-
- for partition, message in messages:
- if can_process(message):
- context.mark(partition, message.offset)
- else:
- break
-
- if not context:
- sleep(delay)
-
-
- These semantics allow for deferred message processing (e.g. if `can_process`
- compares message time to clock time) and for repeated processing of the last
- unsuccessful message (until some external error is resolved).
- """
-
- def __init__(self, consumer):
- """
- :param consumer: an instance of `SimpleConsumer`
- """
- self.consumer = consumer
- self.initial_offsets = None
- self.high_water_mark = None
- self.logger = getLogger("kafka.context")
-
- def mark(self, partition, offset):
- """
- Set the high-water mark in the current context.
-
- In order to know the current partition, it is helpful to initialize
- the consumer to provide partition info via:
-
- .. code:: python
-
- consumer.provide_partition_info()
-
- """
- max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
-
- self.logger.debug("Setting high-water mark to: %s",
- {partition: max_offset})
-
- self.high_water_mark[partition] = max_offset
-
- def __nonzero__(self):
- """
- Return whether any operations were marked in the context.
- """
- return bool(self.high_water_mark)
-
- def __enter__(self):
- """
- Start a new context:
-
- - Record the initial offsets for rollback
- - Reset the high-water mark
- """
- self.initial_offsets = dict(self.consumer.offsets)
- self.high_water_mark = dict()
-
- self.logger.debug("Starting context at: %s", self.initial_offsets)
-
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- """
- End a context.
-
- - If there was no exception, commit up to the current high-water mark.
- - If there was an offset of range error, attempt to find the correct
- initial offset.
- - If there was any other error, roll back to the initial offsets.
- """
- if exc_type is None:
- self.commit()
- elif isinstance(exc_value, OffsetOutOfRangeError):
- self.handle_out_of_range()
- return True
- else:
- self.rollback()
-
- def commit(self):
- """
- Commit this context's offsets:
-
- - If the high-water mark has moved, commit up to and position the
- consumer at the high-water mark.
- - Otherwise, reset to the consumer to the initial offsets.
- """
- if self.high_water_mark:
- self.logger.info("Committing offsets: %s", self.high_water_mark)
- self.commit_partition_offsets(self.high_water_mark)
- self.update_consumer_offsets(self.high_water_mark)
- else:
- self.update_consumer_offsets(self.initial_offsets)
-
- def rollback(self):
- """
- Rollback this context:
-
- - Position the consumer at the initial offsets.
- """
- self.logger.info("Rolling back context: %s", self.initial_offsets)
- self.update_consumer_offsets(self.initial_offsets)
-
- def commit_partition_offsets(self, partition_offsets):
- """
- Commit explicit partition/offset pairs.
- """
- self.logger.debug("Committing partition offsets: %s", partition_offsets)
-
- commit_requests = [
- OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None)
- for partition, offset in partition_offsets.items()
- ]
- commit_responses = self.consumer.client.send_offset_commit_request(
- self.consumer.group,
- commit_requests,
- )
- for commit_response in commit_responses:
- check_error(commit_response)
-
- def update_consumer_offsets(self, partition_offsets):
- """
- Update consumer offsets to explicit positions.
- """
- self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
-
- for partition, offset in partition_offsets.items():
- self.consumer.offsets[partition] = offset
-
- # consumer keeps other offset states beyond its `offsets` dictionary,
- # a relative seek with zero delta forces the consumer to reset to the
- # current value of the `offsets` dictionary
- self.consumer.seek(0, 1)
-
- def handle_out_of_range(self):
- """
- Handle out of range condition by seeking to the beginning of valid
- ranges.
-
- This assumes that an out of range doesn't happen by seeking past the end
- of valid ranges -- which is far less likely.
- """
- self.logger.info("Seeking beginning of partition on out of range error")
- self.consumer.seek(0, 0)