author     Dana Powers <dana.powers@rd.io>   2016-01-09 16:52:01 -0800
committer  Dana Powers <dana.powers@rd.io>   2016-01-09 16:52:01 -0800
commit     a94d8fa1cb670b65a4815a05cda6f774f555c632 (patch)
tree       7676637b69d9346519d5c0a1465dc63a2e6fd15e
parent     cc22d1bab82fd234f2a47d347152a321aaa0b53e (diff)
download   kafka-python-kafka-2978.tar.gz
KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync (kafka-2978)
-rw-r--r--  kafka/consumer/fetcher.py             86
-rw-r--r--  kafka/consumer/group.py                4
-rw-r--r--  kafka/consumer/subscription_state.py  34
-rw-r--r--  kafka/coordinator/consumer.py          3
4 files changed, 53 insertions, 74 deletions
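Background for the diffs below: before this patch each assigned partition tracked two offsets, `fetched` (how far FetchRequests had been issued) and `consumed` (the offset exposed to the user), and new fetches were only sent while the two agreed. A minimal standalone sketch of how two counters can wedge (the exact trigger sequence here is illustrative, not taken from the JIRA report):

    # Pre-patch model: two separately tracked offsets per partition.
    class OldPartitionState(object):
        def __init__(self):
            self.fetched = None   # offset up to which fetches were issued
            self.consumed = None  # offset exposed to the user

    def should_fetch(state):
        # pre-patch rule from _create_fetch_requests: only fetch when
        # previously fetched data has been fully consumed
        return state.consumed == state.fetched

    state = OldPartitionState()
    state.fetched = state.consumed = 50
    state.fetched = 60        # a fetch for offsets 50..59 is in flight
    state.consumed = 55       # meanwhile the consumed position moves
    # If the code path that rewinds `fetched` back to `consumed` is ever
    # skipped, the counters never re-converge and no FetchRequest is sent
    # again for this partition:
    assert not should_fetch(state)   # the consumer stops fetching

The patch removes the second counter entirely: a single `position` both drives FetchRequests and is exposed to the user, so there is nothing left to re-synchronize.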
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1593018..dfbb0d6 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -209,11 +209,11 @@ class Fetcher(six.Iterator):
log.debug("Ignoring fetched records for %s since it is no"
" longer fetchable", partition)
continue
- consumed = self._subscriptions.assignment[partition].consumed
- # ignore partition if its consumed offset != offset in FetchResponse
+ position = self._subscriptions.assignment[partition].position
+ # ignore partition if the current position != offset in FetchResponse
# e.g. after seek()
- if consumed is not None and offset == consumed:
- current_out_of_range_partitions[partition] = offset
+ if position is not None and offset == position:
+ current_out_of_range_partitions[partition] = position
self._offset_out_of_range_partitions.clear()
if current_out_of_range_partitions:
@@ -290,31 +290,30 @@ class Fetcher(six.Iterator):
" since it is no longer assigned", tp)
continue
- # note that the consumed position should always be available
+ # note that the position should always be available
# as long as the partition is still assigned
- consumed = self._subscriptions.assignment[tp].consumed
+ position = self._subscriptions.assignment[tp].position
if not self._subscriptions.is_fetchable(tp):
- # this can happen when a partition consumption paused before
+ # this can happen when a partition is paused before
# fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition"
" %s since it is no longer fetchable", tp)
- # we also need to reset the fetch positions to pretend we did
- # not fetch this partition in the previous request at all
- self._subscriptions.assignment[tp].fetched = consumed
- elif fetch_offset == consumed:
+ elif fetch_offset == position:
next_offset = messages[-1][0] + 1
- log.debug("Returning fetched records for assigned partition %s"
- " and update consumed position to %s", tp, next_offset)
- self._subscriptions.assignment[tp].consumed = next_offset
+ log.debug("Returning fetched records at offset %d for assigned"
+ " partition %s and update position to %s", position,
+ tp, next_offset)
+ self._subscriptions.assignment[tp].position = next_offset
for record in self._unpack_message_set(tp, messages):
drained[tp].append(record)
else:
# these records aren't next in line based on the last consumed
# position; ignore them, they must be from an obsolete request
- log.debug("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
+ log.debug("Ignoring fetched records for %s at offset %s since"
+ " the current position is %d", tp, fetch_offset,
+ position)
return dict(drained)
def _unpack_message_set(self, tp, messages):
@@ -351,20 +350,16 @@ class Fetcher(six.Iterator):
# note that the consumed position should always be available
# as long as the partition is still assigned
- consumed = self._subscriptions.assignment[tp].consumed
+ position = self._subscriptions.assignment[tp].position
if not self._subscriptions.is_fetchable(tp):
# this can happen when a partition's consumption is paused before
# fetched records are returned
log.warning("Not returning fetched records for assigned partition"
" %s since it is no longer fetchable", tp)
- # we also need to reset the fetch positions to pretend we did
- # not fetch this partition in the previous request at all
- self._subscriptions.assignment[tp].fetched = consumed
-
- elif fetch_offset == consumed:
+ elif fetch_offset == position:
for msg in self._unpack_message_set(tp, messages):
- self._subscriptions.assignment[tp].consumed = msg.offset + 1
+ self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else:
# these records aren't next in line based on the last consumed
# position; ignore them, they must be from an obsolete request
@@ -494,19 +489,15 @@ class Fetcher(six.Iterator):
# if there is a leader and no in-flight requests,
# issue a new fetch but only fetch data for partitions whose
# previously fetched data has been consumed
- fetched = self._subscriptions.assignment[partition].fetched
- consumed = self._subscriptions.assignment[partition].consumed
- if consumed == fetched:
- partition_info = (
- partition.partition,
- fetched,
- self.config['max_partition_fetch_bytes']
- )
- fetchable[node_id][partition.topic].append(partition_info)
- else:
- log.debug("Skipping FetchRequest to %s because previously"
- " fetched offsets (%s) have not been fully"
- " consumed yet (%s)", node_id, fetched, consumed)
+ position = self._subscriptions.assignment[partition].position
+ partition_info = (
+ partition.partition,
+ position,
+ self.config['max_partition_fetch_bytes']
+ )
+ fetchable[node_id][partition.topic].append(partition_info)
+ log.debug("Adding fetch request for partition %d at offset %d",
+ partition, position)
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
@@ -541,15 +532,12 @@ class Fetcher(six.Iterator):
# we are interested in this fetch only if the beginning
# offset matches the current consumed position
- consumed = self._subscriptions.assignment[tp].consumed
- if consumed is None:
- continue
- elif consumed != fetch_offset:
- # the fetched position has gotten out of sync with the
- # consumed position (which might happen when a
- # rebalance occurs with a fetch in-flight), so we need
- # to reset the fetch position so the next fetch is right
- self._subscriptions.assignment[tp].fetched = consumed
+ position = self._subscriptions.assignment[tp].position
+ if position is None or position != fetch_offset:
+ log.debug("Discarding fetch response for partition %s"
+ " since its offset %d does not match the"
+ " expected offset %d", tp, fetch_offset,
+ position)
continue
partial = None
@@ -557,9 +545,11 @@ class Fetcher(six.Iterator):
partial = messages.pop()
if messages:
- last_offset, _, _ = messages[-1]
- self._subscriptions.assignment[tp].fetched = last_offset + 1
+ log.debug("Adding fetched record for partition %s with"
+ " offset %d to buffered record list", tp,
+ position)
self._records.append((fetch_offset, tp, messages))
+ #last_offset, _, _ = messages[-1]
#self.sensors.records_fetch_lag.record(highwater - last_offset)
elif partial:
# we did not read a single message from a non-empty buffer
# because that message's size is larger than the fetch size
@@ -581,7 +571,7 @@ class Fetcher(six.Iterator):
else:
self._offset_out_of_range_partitions[tp] = fetch_offset
log.info("Fetch offset %s is out of range, resetting offset",
- self._subscriptions.assignment[tp].fetched)
+ fetch_offset)
elif error_type is Errors.TopicAuthorizationFailedError:
log.warn("Not authorized to read from topic %s.", tp.topic)
self._unauthorized_topics.add(tp.topic)
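With a single position, the fetcher no longer rewinds anything on a stale response; it simply drops responses whose starting offset no longer matches. A self-contained sketch of that check (the `TopicPartition` and state types here are stand-ins, not the library's own):

    from collections import namedtuple

    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])

    class PartitionState(object):
        def __init__(self, position):
            self.position = position

    def keep_fetch_response(assignment, tp, fetch_offset):
        # use a response only if its starting offset matches the current
        # position; anything else came from an obsolete request (e.g. one
        # issued before a seek() or rebalance) and is safe to discard
        position = assignment[tp].position
        return position is not None and position == fetch_offset

    tp = TopicPartition('my-topic', 0)
    assignment = {tp: PartitionState(position=42)}
    assert keep_fetch_response(assignment, tp, 42)      # in sync: use it
    assert not keep_fetch_response(assignment, tp, 40)  # stale: drop it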
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9ce1438..4930ba1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -452,10 +452,10 @@ class KafkaConsumer(six.Iterator):
"""
assert self._subscription.is_assigned(partition)
- offset = self._subscription.assignment[partition].consumed
+ offset = self._subscription.assignment[partition].position
if offset is None:
self._update_fetch_positions(partition)
- offset = self._subscription.assignment[partition].consumed
+ offset = self._subscription.assignment[partition].position
return offset
def pause(self, *partitions):
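The group.py change is internal only: `KafkaConsumer.position()` keeps its public behavior of returning the offset of the next record to be fetched, now read from the unified attribute. Typical usage, assuming a running broker; the broker address and topic name are placeholders:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.seek(tp, 10)
    # after seek(10), the next record fetched for this partition is
    # offset 10, and position() reports exactly that
    assert consumer.position(tp) == 10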
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index c60f192..9b52ffb 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -276,7 +276,7 @@ class SubscriptionState(object):
all_consumed = {}
for partition, state in six.iteritems(self.assignment):
if state.has_valid_position:
- all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+ all_consumed[partition] = OffsetAndMetadata(state.position, '')
return all_consumed
def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -332,41 +332,29 @@ class SubscriptionState(object):
class TopicPartitionState(object):
def __init__(self):
self.committed = None # last committed position
- self.has_valid_position = False # whether we have valid consumed and fetched positions
+ self.has_valid_position = False # whether we have a valid position
self.paused = False # whether this partition has been paused by the user
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
- self._consumed = None # offset exposed to the user
- self._fetched = None # current fetch position
+ self._position = None # offset exposed to the user
- def _set_fetched(self, offset):
- assert self.has_valid_position, 'Valid consumed/fetch position required'
- self._fetched = offset
+ def _set_position(self, offset):
+ assert self.has_valid_position, 'Valid position required'
+ self._position = offset
- def _get_fetched(self):
- return self._fetched
+ def _get_position(self):
+ return self._position
- fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
-
- def _set_consumed(self, offset):
- assert self.has_valid_position, 'Valid consumed/fetch position required'
- self._consumed = offset
-
- def _get_consumed(self):
- return self._consumed
-
- consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+ position = property(_get_position, _set_position, None, "last position")
def await_reset(self, strategy):
self.awaiting_reset = True
self.reset_strategy = strategy
- self._consumed = None
- self._fetched = None
+ self._position = None
self.has_valid_position = False
def seek(self, offset):
- self._consumed = offset
- self._fetched = offset
+ self._position = offset
self.awaiting_reset = False
self.reset_strategy = None
self.has_valid_position = True
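The resulting lifecycle in `TopicPartitionState` is small enough to restate standalone: `await_reset()` invalidates the position, `seek()` (also used internally once a reset resolves) re-establishes it, and the property setter refuses updates in between. A runnable sketch of just that state machine (the strategy value is illustrative):

    class TopicPartitionState(object):
        def __init__(self):
            self.awaiting_reset = False
            self.reset_strategy = None
            self.has_valid_position = False
            self._position = None

        def _set_position(self, offset):
            # advancing is only legal once a position has been established
            assert self.has_valid_position, 'Valid position required'
            self._position = offset

        def _get_position(self):
            return self._position

        position = property(_get_position, _set_position, None, "last position")

        def await_reset(self, strategy):
            # invalidate the position until the offset reset completes
            self.awaiting_reset = True
            self.reset_strategy = strategy
            self._position = None
            self.has_valid_position = False

        def seek(self, offset):
            # (re)establish a valid position
            self._position = offset
            self.awaiting_reset = False
            self.reset_strategy = None
            self.has_valid_position = True

    s = TopicPartitionState()
    s.seek(0)
    s.position = 5            # normal advancement as records are returned
    s.await_reset('earliest')
    assert not s.has_valid_position   # no fetches until the reset resolves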
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 48d5e14..d728624 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -167,7 +167,8 @@ class ConsumerCoordinator(BaseCoordinator):
old_partitions_per_topic = self._partitions_per_topic
self._partitions_per_topic = {}
for topic in self._subscription.group_subscription():
- self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
+ partitions = self._cluster.partitions_for_topic(topic) or []
+ self._partitions_per_topic[topic] = set(partitions)
if self._partitions_per_topic != old_partitions_per_topic:
return True
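The coordinator hunk is a separate hardening: `partitions_for_topic()` returns None for a topic with no cached metadata, and `set(None)` raises TypeError. The guard coerces None to an empty collection before building the snapshot. A standalone sketch with a fake cluster object (the class and topic names are illustrative):

    class FakeCluster(object):
        # stand-in for the cluster metadata cache; None mimics a topic
        # whose metadata has not been fetched yet
        def partitions_for_topic(self, topic):
            return {0, 1} if topic == 'known-topic' else None

    def snapshot(cluster, topics):
        snap = {}
        for topic in topics:
            partitions = cluster.partitions_for_topic(topic) or []
            snap[topic] = set(partitions)  # set(None) would raise TypeError
        return snap

    assert snapshot(FakeCluster(), ['known-topic', 'new-topic']) == \
        {'known-topic': {0, 1}, 'new-topic': set()}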