summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-28 19:19:29 -0700
committerGitHub <noreply@github.com>2019-09-28 19:19:29 -0700
commit5d1d42429e07f4aa2959b488ea76efb6d0bafc79 (patch)
tree6a3b0701b6c80d11bec9d8277757dc589561fb6a
parenta9f513cf9978b8b9f26ad04bba1d33a9ae6d1b99 (diff)
downloadkafka-python-5d1d42429e07f4aa2959b488ea76efb6d0bafc79.tar.gz
Wrap consumer.poll() for KafkaConsumer iteration (#1902)
-rw-r--r--kafka/consumer/fetcher.py10
-rw-r--r--kafka/consumer/group.py69
-rw-r--r--kafka/coordinator/base.py6
3 files changed, 74 insertions, 11 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 36e269f..17c818f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -292,7 +292,7 @@ class Fetcher(six.Iterator):
raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
- def fetched_records(self, max_records=None):
+ def fetched_records(self, max_records=None, update_offsets=True):
"""Returns previously fetched records and updates consumed offsets.
Arguments:
@@ -330,10 +330,11 @@ class Fetcher(six.Iterator):
else:
records_remaining -= self._append(drained,
self._next_partition_records,
- records_remaining)
+ records_remaining,
+ update_offsets)
return dict(drained), bool(self._completed_fetches)
- def _append(self, drained, part, max_records):
+ def _append(self, drained, part, max_records, update_offsets):
if not part:
return 0
@@ -366,7 +367,8 @@ class Fetcher(six.Iterator):
for record in part_records:
drained[tp].append(record)
- self._subscriptions.assignment[tp].position = next_offset
+ if update_offsets:
+ self._subscriptions.assignment[tp].position = next_offset
return len(part_records)
else:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f9d0fb9..77b0b96 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -302,7 +302,8 @@ class KafkaConsumer(six.Iterator):
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
- 'sasl_oauth_token_provider': None
+ 'sasl_oauth_token_provider': None,
+ 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
@@ -597,7 +598,7 @@ class KafkaConsumer(six.Iterator):
partitions = cluster.partitions_for_topic(topic)
return partitions
- def poll(self, timeout_ms=0, max_records=None):
+ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -621,6 +622,12 @@ class KafkaConsumer(six.Iterator):
dict: Topic to list of records since the last fetch for the
subscribed list of topics and partitions.
"""
+ # Note: update_offsets is an internal-use only argument. It is used to
+ # support the python iterator interface, which wraps consumer.poll()
+ # and requires that the partition offsets tracked by the fetcher are not
+ # updated until the iterator returns each record to the user. As such,
+ # the argument is not documented, and library users should not rely on
+ # it: it may change or be removed in the future.
assert timeout_ms >= 0, 'Timeout must not be negative'
if max_records is None:
max_records = self.config['max_poll_records']
@@ -631,7 +638,7 @@ class KafkaConsumer(six.Iterator):
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining, max_records)
+ records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
if records:
return records
@@ -641,7 +648,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
- def _poll_once(self, timeout_ms, max_records):
+ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
"""Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -660,7 +667,7 @@ class KafkaConsumer(six.Iterator):
# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
- records, partial = self._fetcher.fetched_records(max_records)
+ records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
if records:
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
@@ -680,7 +687,7 @@ class KafkaConsumer(six.Iterator):
if self._coordinator.need_rejoin():
return {}
- records, _ = self._fetcher.fetched_records(max_records)
+ records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records
def position(self, partition):
@@ -743,6 +750,9 @@ class KafkaConsumer(six.Iterator):
for partition in partitions:
log.debug("Pausing partition %s", partition)
self._subscription.pause(partition)
+ # Because the iterator checks is_fetchable() on each iteration
+ # we expect pauses to get handled automatically and therefore
+ # we do not need to reset the full iterator (forcing a full refetch)
def paused(self):
"""Get the partitions that were previously paused using
@@ -790,6 +800,8 @@ class KafkaConsumer(six.Iterator):
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
log.debug("Seeking to offset %s for partition %s", offset, partition)
self._subscription.assignment[partition].seek(offset)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def seek_to_beginning(self, *partitions):
"""Seek to the oldest available offset for partitions.
@@ -814,6 +826,8 @@ class KafkaConsumer(six.Iterator):
for tp in partitions:
log.debug("Seeking to beginning of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def seek_to_end(self, *partitions):
"""Seek to the most recent available offset for partitions.
@@ -838,6 +852,8 @@ class KafkaConsumer(six.Iterator):
for tp in partitions:
log.debug("Seeking to end of partition %s", tp)
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern.
@@ -913,6 +929,8 @@ class KafkaConsumer(six.Iterator):
self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ if not self.config['legacy_iterator']:
+ self._iterator = None
def metrics(self, raw=False):
"""Get metrics on consumer performance.
@@ -1075,6 +1093,25 @@ class KafkaConsumer(six.Iterator):
# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+    def _message_generator_v2(self):
+        """Yield the records returned by a single poll(), one at a time.
+
+        poll() is called with update_offsets=False, so the position of a
+        partition is only advanced (to record.offset + 1) immediately
+        before the corresponding record is yielded to the caller.
+        """
+        # The clock is read again here, after the caller last compared it
+        # against self._consumer_timeout, so the difference may already be
+        # negative. poll() asserts timeout_ms >= 0, so clamp to zero (a
+        # non-blocking poll) instead of failing the assertion.
+        timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
+        record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
+        for tp, records in six.iteritems(record_map):
+            # Generators are stateful, and it is possible that the tp / records
+            # here may become stale during iteration -- i.e., we seek to a
+            # different offset, pause consumption, or lose assignment.
+            for record in records:
+                # is_fetchable(tp) should handle assignment changes and offset
+                # resets; for all other changes (e.g., seeks) we'll rely on the
+                # outer function destroying the existing iterator/generator
+                # via self._iterator = None
+                if not self._subscription.is_fetchable(tp):
+                    log.debug("Not returning fetched records for partition %s"
+                              " since it is no longer fetchable", tp)
+                    break
+                self._subscription.assignment[tp].position = record.offset + 1
+                yield record
+
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
@@ -1127,6 +1164,26 @@ class KafkaConsumer(six.Iterator):
return self
def __next__(self):
+ # Dispatch: next_v1() when config['legacy_iterator'] is set, else next_v2().
+ # Now that the heartbeat thread runs in the background
+ # there should be no reason to maintain a separate iterator
+ # but we'll keep it available for a few releases just in case
+ if self.config['legacy_iterator']:
+ return self.next_v1()
+ else:
+ return self.next_v2()
+
+ def next_v2(self):
+ # Return the next record from the generator built by
+ # _message_generator_v2(), creating that generator when none is active.
+ # NOTE(review): indentation is lost in this rendering; the final
+ # `raise StopIteration()` presumably sits after the while loop, i.e. it
+ # fires once time.time() reaches self._consumer_timeout -- confirm
+ # against the repository.
+ self._set_consumer_timeout()
+ while time.time() < self._consumer_timeout:
+ if not self._iterator:
+ self._iterator = self._message_generator_v2()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ # The current generator is exhausted; drop the reference to it.
+ self._iterator = None
+ raise StopIteration()
+
+ def next_v1(self):
if not self._iterator:
self._iterator = self._message_generator()
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 421360e..5cdbdcf 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -321,10 +321,14 @@ class BaseCoordinator(object):
self.heartbeat.poll()
def time_to_next_heartbeat(self):
+ """Returns seconds (float) remaining before next heartbeat should be sent
+
+ Note: Returns float('inf') if the group has not been joined
+ """
with self._lock:
# if we have not joined the group, we don't need to send heartbeats
if self.state is MemberState.UNJOINED:
- return sys.maxsize
+ return float('inf')
return self.heartbeat.time_to_next_heartbeat()
def _handle_join_success(self, member_assignment_bytes):