author	Dana Powers <dana.powers@rd.io>	2014-09-14 23:37:07 -0700
committer	Dana Powers <dana.powers@rd.io>	2014-12-15 12:42:55 -0800
commit	d5ec0a60212fb37f77f48d9ed757135364d6aed5 (patch)
tree	82b0a06e585b5e2a46b846de7cfbc3fae2161f80
parent	93dd705ea7ce59e3f0a96b4d716bcbc59b8cb5a1 (diff)
download	kafka-python-d5ec0a60212fb37f77f48d9ed757135364d6aed5.tar.gz
Support setting offsets in set_topic_partitions(); reorganize offsets initialization
-rw-r--r--	kafka/consumer/new.py	278
1 file changed, 151 insertions, 127 deletions
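
This change flattens the consumer's offset bookkeeping from nested dict[topic][partition] maps to flat dicts keyed by (topic, partition) tuples, and allows starting offsets to be passed directly to set_topic_partitions(). A minimal usage sketch of the new call patterns, mirroring the docstring examples in the diff below (constructor arguments omitted, as in the docstring; the import path follows this commit's module layout):

    from kafka.consumer.new import KafkaConsumer

    kafka = KafkaConsumer()

    # Consume topic1 (all partitions) and topic2, partition 2
    kafka.set_topic_partitions("topic1", ("topic2", 2))

    # Start topic1-0 at offset 123 and topic2-1 at offset 456,
    # using either (topic, partition, offset) tuples ...
    kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

    # ... or a dict keyed by (topic, partition)
    kafka.set_topic_partitions({("topic1", 0): 123, ("topic2", 1): 456})
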
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 39db60c..edf7843 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -1,4 +1,4 @@
-from collections import defaultdict, namedtuple
+from collections import namedtuple
from copy import deepcopy
import logging
import sys
@@ -11,8 +11,7 @@ from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
- FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError,
- TopicAndPartition
+ FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
logger = logging.getLogger(__name__)
@@ -125,30 +124,12 @@ class KafkaConsumer(object):
self.set_topic_partitions(*topics)
self._msg_iter = None
- # Setup offsets
- self._offsets = OffsetsStruct(fetch=defaultdict(dict),
- commit=defaultdict(dict),
- highwater= defaultdict(dict),
- task_done=defaultdict(dict))
-
- # If we have a consumer group, try to fetch stored offsets
- if self._config['group_id']:
- self._fetch_stored_offsets()
- else:
- self._auto_reset_offsets()
-
- # highwater marks (received from server on fetch response)
- # and task_done (set locally by user)
- # should always get initialized to None
- self._reset_highwater_offsets()
- self._reset_task_done_offsets()
-
- def _fetch_stored_offsets(self):
+ def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
- for topic, partition in self._topics:
+ for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
self._config['group_id'],
- [OffsetFetchRequest(topic, partition)],
+ [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
@@ -159,29 +140,20 @@ class KafkaConsumer(object):
# -1 offset signals no commit is currently stored
if resp.offset == -1:
- self._offsets.commit[topic][partition] = None
- self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
+ self._offsets.commit[topic_partition] = None
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
- self._offsets.commit[topic][partition] = resp.offset
- self._offsets.fetch[topic][partition] = resp.offset
-
- def _auto_reset_offsets(self):
- logger.info("Consumer auto-resetting offsets")
- for topic, partition in self._topics:
-
- self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
- self._offsets.commit[topic][partition] = None
+ self._offsets.commit[topic_partition] = resp.offset
def _reset_highwater_offsets(self):
- for topic, partition in self._topics:
- self._offsets.highwater[topic][partition] = None
+ for topic_partition in self._topics:
+ self._offsets.highwater[topic_partition] = None
def _reset_task_done_offsets(self):
- for topic, partition in self._topics:
- self._offsets.task_done[topic][partition] = None
+ for topic_partition in self._topics:
+ self._offsets.task_done[topic_partition] = None
def __repr__(self):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
@@ -224,12 +196,16 @@ class KafkaConsumer(object):
return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
- topic = message.topic
- partition = message.partition
+ """
+ Mark a fetched message as consumed.
+ Offsets for messages marked as "task_done" will be stored back
+ to the kafka cluster for this consumer group on commit()
+ """
+ topic_partition = (message.topic, message.partition)
offset = message.offset
# Warn on non-contiguous offsets
- prev_done = self._offsets.task_done[topic][partition]
+ prev_done = self._offsets.task_done[topic_partition]
if prev_done is not None and offset != (prev_done + 1):
logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
offset, prev_done)
@@ -237,12 +213,12 @@ class KafkaConsumer(object):
# Warn on smaller offsets than previous commit
# "commit" offsets are actually the offset of the next # message to fetch.
# so task_done should be compared with (commit - 1)
- prev_done = (self._offsets.commit[topic][partition] - 1)
+ prev_done = (self._offsets.commit[topic_partition] - 1)
if prev_done is not None and (offset <= prev_done):
logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
offset, prev_done)
- self._offsets.task_done[topic][partition] = offset
+ self._offsets.task_done[topic_partition] = offset
def should_auto_commit(self):
if not self._config['auto_commit_enable']:
@@ -267,24 +243,23 @@ class KafkaConsumer(object):
offsets = self._offsets.task_done
commits = []
- for topic, partitions in offsets.iteritems():
- for partition, task_done in partitions.iteritems():
+ for topic_partition, task_done_offset in offsets.iteritems():
- # Skip if None
- if task_done is None:
- continue
+ # Skip if None
+ if task_done_offset is None:
+ continue
- # Commit offsets as the next offset to fetch
- # which is consistent with the Java Client
- # task_done is marked by messages consumed,
- # so add one to mark the next message for fetching
- commit_offset = (task_done + 1)
+ # Commit offsets as the next offset to fetch
+ # which is consistent with the Java Client
+ # task_done is marked by messages consumed,
+ # so add one to mark the next message for fetching
+ commit_offset = (task_done_offset + 1)
- # Skip if no change from previous committed
- if commit_offset == self._offsets.commit[topic][partition]:
- continue
+ # Skip if no change from previous committed
+ if commit_offset == self._offsets.commit[topic_partition]:
+ continue
- commits.append(OffsetCommitRequest(topic, partition, commit_offset, metadata))
+ commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
@@ -294,8 +269,9 @@ class KafkaConsumer(object):
for r in resps:
check_error(r)
- task_done = self._offsets.task_done[r.topic][r.partition]
- self._offsets.commit[r.topic][r.partition] = (task_done + 1)
+ topic_partition = (r.topic, r.partition)
+ task_done = self._offsets.task_done[topic_partition]
+ self._offsets.commit[topic_partition] = (task_done + 1)
if self._config['auto_commit_enable']:
self._set_next_auto_commit_time()
@@ -335,82 +311,128 @@ class KafkaConsumer(object):
def set_topic_partitions(self, *topics):
"""
Set the topic/partitions to consume
+ Optionally specify offsets to start from
Accepts types:
- str - topic name, will consume all available partitions
- TopicAndPartition namedtuple - will consume topic/partition
- tuple - will consume (topic, partition)
- dict - will consume { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
+ str: topic name (will consume all available partitions)
+ tuple: (topic, partition)
+ dict: { topic: partition }
+ { topic: [partition list] }
+ { topic: (partition tuple,) }
+
+ Optionally, offsets can be specified directly:
+ tuple: (topic, partition, offset)
+ dict: { (topic, partition): offset, ... }
Ex:
kafka = KafkaConsumer()
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+
+ # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+ # using tuples --
+ kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+
+ # using dict --
+ kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
"""
self._topics = []
self._client.load_metadata_for_topics()
+ # Setup offsets
+ self._offsets = OffsetsStruct(fetch=dict(),
+ commit=dict(),
+ highwater=dict(),
+ task_done=dict())
+
# Handle different topic types
for arg in topics:
# Topic name str -- all partitions
if isinstance(arg, six.string_types):
- for partition in self.client.get_partition_ids_for_topic(arg):
- self._topics.append(TopicAndPartition(arg, partition))
-
- # TopicAndPartition namedtuple
- elif isinstance(arg, TopicAndPartition):
- self._topics.append(arg)
+ topic = arg
+ for partition in self._client.get_partition_ids_for_topic(arg):
+ self._consume_topic_partition(topic, partition)
- # (topic, partition) tuple
+ # (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- if not isinstance(arg[0], six.string_types):
- raise KafkaConfigurationError('Unknown topic type in (topic, '
- 'partition) tuple (expected string)')
- if not isinstance(arg[1], int):
- raise KafkaConfigurationError('Unknown partition type in (topic, '
- 'partition) tuple (expected int)')
- self._topics.append(TopicAndPartition(*arg))
+ (topic, partition) = arg[0:2]
+ if len(arg) == 3:
+ offset = arg[2]
+ self._offsets.fetch[(topic, partition)] = offset
+ self._consume_topic_partition(topic, partition)
# { topic: partitions, ... } dict
elif isinstance(arg, dict):
- for topic in arg:
-
- if not isinstance(topic, six.string_types):
- raise KafkaConfigurationError('Unknown topic type in {topic: '
- 'partitions} dict (expected string)')
- # partition can either be a single partition int
- if isinstance(arg[topic], int):
- self._topics.append(TopicAndPartition(topic, arg[topic]))
-
- # or a list/tuple of partition ints
- elif isinstance(arg[topic], (list, tuple)):
- for partition in arg[topic]:
- if not isinstance(arg[topic], int):
- raise KafkaConfigurationError('Unknown partition type in {topic: '
- 'partitions} dict (expected list '
- 'or tuple of ints)')
- self._topics.append(TopicAndPartition(topic, partition))
- else:
- raise KafkaConfigurationError('Unknown topic type (dict key must be '
- 'int or list/tuple of ints)')
+ for key, value in arg.iteritems():
+
+ # key can be string (a topic)
+ if isinstance(key, six.string_types):
+
+ # topic: partition
+ if isinstance(value, int):
+ self._consume_topic_partition(key, value)
+
+ # topic: [ partition1, partition2, ... ]
+ elif isinstance(value, (list, tuple)):
+ for partition in value:
+ self._consume_topic_partition(key, partition)
+ else:
+ raise KafkaConfigurationError('Unknown topic type (dict key must be '
+ 'int or list/tuple of ints)')
+
+ # (topic, partition): offset
+ elif isinstance(key, tuple):
+ self._consume_topic_partition(*key)
+ self._offsets.fetch[key] = value
+
else:
- raise KafkaConfigurationError('Unknown topic type (topic must be '
- 'string, TopicAndPartition, '
- '(topic,partition) tuple, or {topic: '
- 'partitions} dict)')
-
- # Verify that all topic/partitions exist in metadata
- for topic, partition in self._topics:
- if topic not in self.client.topic_partitions:
- raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
- if partition not in self.client.get_partition_ids_for_topic(topic):
- raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
- "in broker metadata" % (partition, topic))
- logger.info("Configuring consumer to fetch topic '%s'", topic)
+ raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
+
+ # If we have a consumer group, try to fetch stored offsets
+ if self._config['group_id']:
+ self._get_commit_offsets()
+
+ # Update missing fetch/commit offsets
+ for topic_partition in self._topics:
+
+ # Commit offsets default is None
+ if topic_partition not in self._offsets.commit:
+ self._offsets.commit[topic_partition] = None
+
+ # Skip if we already have a fetch offset from user args
+ if topic_partition not in self._offsets.fetch:
+
+ # Fetch offsets default is (1) commit
+ if self._offsets.commit[topic_partition] is not None:
+ self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
+
+ # or (2) auto reset
+ else:
+ self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+
+ # highwater marks (received from server on fetch response)
+ # and task_done (set locally by user)
+ # should always get initialized to None
+ self._reset_highwater_offsets()
+ self._reset_task_done_offsets()
+
+ def _consume_topic_partition(self, topic, partition):
+ if not isinstance(topic, six.string_types):
+ raise KafkaConfigurationError('Unknown topic type (%s) '
+ '-- expected string' % type(topic))
+ if not isinstance(partition, int):
+ raise KafkaConfigurationError('Unknown partition type (%s) '
+ '-- expected int' % type(partition))
+
+ if topic not in self._client.topic_partitions:
+ raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+ if partition not in self._client.get_partition_ids_for_topic(topic):
+ raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+ "in broker metadata" % (partition, topic))
+ logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
+ self._topics.append((topic, partition))
def fetch_messages(self):
@@ -420,9 +442,8 @@ class KafkaConsumer(object):
fetches = []
offsets = self._offsets.fetch
- for topic, partitions in offsets.iteritems():
- for partition, offset in partitions.iteritems():
- fetches.append(FetchRequest(topic, partition, offset, max_bytes))
+ for topic_partition, offset in offsets.iteritems():
+ fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
# client.send_fetch_request will collect topic/partition requests by leader
# and send each group as a single FetchRequest to the correct broker
@@ -437,48 +458,51 @@ class KafkaConsumer(object):
return
for resp in responses:
- topic = resp.topic
- partition = resp.partition
+ topic_partition = (resp.topic, resp.partition)
try:
check_error(resp)
except OffsetOutOfRangeError:
logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
'(Highwatermark: %d)',
- topic, partition, offsets[topic][partition],
- resp.highwaterMark)
+ resp.topic, resp.partition,
+ offsets[topic_partition], resp.highwaterMark)
# Reset offset
- self._offsets.fetch[topic][partition] = self._reset_partition_offset(topic, partition)
+ self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
continue
except NotLeaderForPartitionError:
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
- topic, partition)
+ resp.topic, resp.partition)
self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
- logger.warning("RequestTimedOutError for %s - %d", topic, partition)
+ logger.warning("RequestTimedOutError for %s - %d",
+ resp.topic, resp.partition)
continue
# Track server highwater mark
- self._offsets.highwater[topic][partition] = resp.highwaterMark
+ self._offsets.highwater[topic_partition] = resp.highwaterMark
# Yield each message
# Kafka-python could raise an exception during iteration
# we are not catching -- user will need to address
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
- msg = KafkaMessage(topic, partition, offset, message.key,
+ msg = KafkaMessage(resp.topic,
+ resp.partition,
+ offset, message.key,
self._config['deserializer_class'](message.value))
# Only increment fetch offset if we safely got the message and deserialized
- self._offsets.fetch[topic][partition] = offset + 1
+ self._offsets.fetch[topic_partition] = offset + 1
# Then yield to user
yield msg
- def _reset_partition_offset(self, topic, partition):
+ def _reset_partition_offset(self, topic_partition):
+ (topic, partition) = topic_partition
LATEST = -1
EARLIEST = -2
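
LATEST (-1) and EARLIEST (-2) are the special "time" values of the Kafka offset API: they ask the broker for the next offset that will be produced, or for the oldest offset still retained, respectively. A sketch of how such a reset request looks with this client's send_offset_request(), assuming an existing KafkaClient instance named client and a known topic/partition -- illustrative only, not the verbatim remainder of _reset_partition_offset(), which the diff truncates here:

    from kafka.common import OffsetRequest

    # Ask the broker for a single offset at the special LATEST (-1)
    # position; pass EARLIEST (-2) instead for the oldest retained offset.
    (resp,) = client.send_offset_request(
        [OffsetRequest(topic, partition, -1, 1)])
    next_offset = resp.offsets[0]  # offset the consumer should fetch next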