author    Dana Powers <dana.powers@gmail.com>  2019-09-15 16:30:54 -0700
committer Dana Powers <dana.powers@gmail.com>  2019-09-28 16:33:01 -0700
commit    5a653346a55d8ce3780833cbbd96735e40e74927
tree      1b65c6d16c3b2767ae47411ef2d733c06446a65b
parent    5381591bac7f1322e7a54e4be65d1a54e2898732

Wrap consumer.poll() for KafkaConsumer iteration
kafka/consumer/fetcher.py | 10
kafka/consumer/group.py   | 57
2 files changed, 60 insertions(+), 7 deletions(-)
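
For context, a sketch of the two consumption styles this patch ties together: explicit poll() calls and the iterator interface, which (with the new code path enabled) becomes a thin wrapper around poll(). The broker address and topic name below are illustrative placeholders, not part of the commit:

    from kafka import KafkaConsumer

    # 'localhost:9092' and 'my-topic' are placeholders.
    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

    # Style 1: explicit poll() -- returns a {TopicPartition: [records]} map.
    batch = consumer.poll(timeout_ms=1000)
    for tp, records in batch.items():
        for record in records:
            print(record.offset, record.value)

    # Style 2: iteration. After this patch, with legacy_iterator=False,
    # __next__() drives poll() internally instead of a separate fetch path.
    for record in consumer:
        print(record.offset, record.value)
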
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 36e269f..17c818f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -292,7 +292,7 @@ class Fetcher(six.Iterator):
raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
- def fetched_records(self, max_records=None):
+ def fetched_records(self, max_records=None, update_offsets=True):
"""Returns previously fetched records and updates consumed offsets.
Arguments:
@@ -330,10 +330,11 @@ class Fetcher(six.Iterator):
else:
records_remaining -= self._append(drained,
self._next_partition_records,
- records_remaining)
+ records_remaining,
+ update_offsets)
return dict(drained), bool(self._completed_fetches)
- def _append(self, drained, part, max_records):
+ def _append(self, drained, part, max_records, update_offsets):
if not part:
return 0
@@ -366,7 +367,8 @@ class Fetcher(six.Iterator):
for record in part_records:
drained[tp].append(record)
- self._subscriptions.assignment[tp].position = next_offset
+ if update_offsets:
+ self._subscriptions.assignment[tp].position = next_offset
return len(part_records)
else:
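
The fetcher change above is the enabling piece: with update_offsets=False, fetched_records() hands records to the caller without advancing the consumer's position, so the caller can advance it one record at a time. A simplified standalone model of the guarded update (names mirror the diff; the positions dict is a stand-in for the SubscriptionState bookkeeping):

    # Simplified model of the guarded update in Fetcher._append (illustrative).
    def drain(part_records, positions, tp, next_offset, update_offsets=True):
        drained = list(part_records)
        if update_offsets:
            # Batch semantics: mark the whole span consumed in one step.
            positions[tp] = next_offset
        # With update_offsets=False the caller advances positions itself,
        # per record, so abandoning iteration mid-batch cannot skip records.
        return drained
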
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f9d0fb9..7b59567 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -302,7 +302,8 @@ class KafkaConsumer(six.Iterator):
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
- 'sasl_oauth_token_provider': None
+ 'sasl_oauth_token_provider': None,
+ 'legacy_iterator': True, # experimental feature
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
@@ -660,7 +661,7 @@ class KafkaConsumer(six.Iterator):
# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
- records, partial = self._fetcher.fetched_records(max_records)
+ records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
if records:
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
@@ -680,7 +681,7 @@ class KafkaConsumer(six.Iterator):
if self._coordinator.need_rejoin():
return {}
- records, _ = self._fetcher.fetched_records(max_records)
+ records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
return records
def position(self, partition):
@@ -743,6 +744,9 @@ class KafkaConsumer(six.Iterator):
for partition in partitions:
log.debug("Pausing partition %s", partition)
self._subscription.pause(partition)
+ # Because the iterator checks is_fetchable() on each iteration
+ # we expect pauses to get handled automatically and therefore
+ # we do not need to reset the full iterator (forcing a full refetch)
def paused(self):
"""Get the partitions that were previously paused using
@@ -790,6 +794,8 @@ class KafkaConsumer(six.Iterator):
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def seek_to_beginning(self, *partitions):
"""Seek to the oldest available offset for partitions.
@@ -814,6 +820,8 @@ class KafkaConsumer(six.Iterator):
for tp in partitions:
log.debug("Seeking to beginning of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def seek_to_end(self, *partitions):
"""Seek to the most recent available offset for partitions.
@@ -838,6 +846,8 @@ class KafkaConsumer(six.Iterator):
for tp in partitions:
log.debug("Seeking to end of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern.
@@ -913,6 +923,8 @@ class KafkaConsumer(six.Iterator):
self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def metrics(self, raw=False):
"""Get metrics on consumer performance.
@@ -1075,6 +1087,25 @@ class KafkaConsumer(six.Iterator):
# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+ def _message_generator_v2(self):
+ timeout_ms = 1000 * (self._consumer_timeout - time.time())
+ record_map = self.poll(timeout_ms=timeout_ms)
+ for tp, records in six.iteritems(record_map):
+ # Generators are stateful, and it is possible that the tp / records
+ # here may become stale during iteration -- i.e., we seek to a
+ # different offset, pause consumption, or lose assignment.
+ for record in records:
+ # is_fetchable(tp) should handle assignment changes and offset
+ # resets; for all other changes (e.g., seeks) we'll rely on the
+ # outer function destroying the existing iterator/generator
+ # via self._iterator = None
+ if not self._subscription.is_fetchable(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+ break
+ self._subscription.assignment[tp].position = record.offset + 1
+ yield record
+
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
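
The per-record bookkeeping in _message_generator_v2 above can be modeled standalone; is_fetchable and positions below are illustrative stand-ins for SubscriptionState.is_fetchable() and the per-partition position field:

    # Standalone model of the v2 generator's yield loop (illustrative only).
    def yield_records(record_map, is_fetchable, positions):
        for tp, records in record_map.items():
            for record in records:
                if not is_fetchable(tp):
                    # Partition paused or assignment lost mid-batch:
                    # stop yielding its records and move on.
                    break
                # Advance the position before yielding, so a consumer that
                # stops right after this record never sees it again.
                positions[tp] = record.offset + 1
                yield record
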
@@ -1127,6 +1158,26 @@ class KafkaConsumer(six.Iterator):
return self
def __next__(self):
+ # Now that the heartbeat thread runs in the background
+ # there should be no reason to maintain a separate iterator
+ # but we'll keep it available for a few releases just in case
+ if self.config['legacy_iterator']:
+ return self.next_v1()
+ else:
+ return self.next_v2()
+
+ def next_v2(self):
+ self._set_consumer_timeout()
+ while time.time() < self._consumer_timeout:
+ if not self._iterator:
+ self._iterator = self._message_generator_v2()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise StopIteration()
+
+ def next_v1(self):
if not self._iterator:
self._iterator = self._message_generator()
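
Putting it together: opting in to the poll()-based iterator uses the experimental flag added above. A minimal usage sketch; the topic, broker, and timeout values are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                          # placeholder topic
        bootstrap_servers='localhost:9092',  # placeholder broker
        consumer_timeout_ms=5000,            # iteration raises StopIteration after 5s idle
        legacy_iterator=False,               # opt in to the new next_v2() path
    )
    for record in consumer:
        print(record.topic, record.partition, record.offset)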