author | Dana Powers <dana.powers@gmail.com> | 2019-09-15 16:30:54 -0700
committer | Dana Powers <dana.powers@gmail.com> | 2019-09-28 16:33:01 -0700
commit | 5a653346a55d8ce3780833cbbd96735e40e74927 (patch)
tree | 1b65c6d16c3b2767ae47411ef2d733c06446a65b
parent | 5381591bac7f1322e7a54e4be65d1a54e2898732 (diff)
download | kafka-python-5a653346a55d8ce3780833cbbd96735e40e74927.tar.gz
Wrap consumer.poll() for KafkaConsumer iteration
-rw-r--r-- | kafka/consumer/fetcher.py | 10
-rw-r--r-- | kafka/consumer/group.py | 57
2 files changed, 60 insertions, 7 deletions
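For orientation, here is a minimal usage sketch of the feature this commit introduces (the topic name and broker address are illustrative placeholders, not part of the commit): with the new, experimental legacy_iterator config set to False, iterating a KafkaConsumer is implemented by wrapping consumer.poll() instead of the old fetcher-level message generator.

    from kafka import KafkaConsumer

    # 'my-topic' and 'localhost:9092' are illustrative placeholders.
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=10000,
                             legacy_iterator=False)  # opt in to the poll()-based iterator

    # With legacy_iterator=False, each loop step goes through __next__ -> next_v2,
    # which lazily builds _message_generator_v2() and drives consumer.poll() internally.
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)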
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 36e269f..17c818f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -292,7 +292,7 @@ class Fetcher(six.Iterator):
             raise Errors.KafkaTimeoutError(
                 "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
 
-    def fetched_records(self, max_records=None):
+    def fetched_records(self, max_records=None, update_offsets=True):
         """Returns previously fetched records and updates consumed offsets.
 
         Arguments:
@@ -330,10 +330,11 @@ class Fetcher(six.Iterator):
             else:
                 records_remaining -= self._append(drained,
                                                   self._next_partition_records,
-                                                  records_remaining)
+                                                  records_remaining,
+                                                  update_offsets)
         return dict(drained), bool(self._completed_fetches)
 
-    def _append(self, drained, part, max_records):
+    def _append(self, drained, part, max_records, update_offsets):
         if not part:
             return 0
 
@@ -366,7 +367,8 @@ class Fetcher(six.Iterator):
                 for record in part_records:
                     drained[tp].append(record)
 
-                self._subscriptions.assignment[tp].position = next_offset
+                if update_offsets:
+                    self._subscriptions.assignment[tp].position = next_offset
                 return len(part_records)
             else:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f9d0fb9..7b59567 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -302,7 +302,8 @@ class KafkaConsumer(six.Iterator):
         'sasl_plain_password': None,
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
-        'sasl_oauth_token_provider': None
+        'sasl_oauth_token_provider': None,
+        'legacy_iterator': True,  # experimental feature
     }
     DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
 
@@ -660,7 +661,7 @@ class KafkaConsumer(six.Iterator):
 
         # If data is available already, e.g. from a previous network client
        # poll() call to commit, then just return it immediately
-        records, partial = self._fetcher.fetched_records(max_records)
+        records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
         if records:
             # Before returning the fetched records, we can send off the
             # next round of fetches and avoid block waiting for their
@@ -680,7 +681,7 @@ class KafkaConsumer(six.Iterator):
         if self._coordinator.need_rejoin():
             return {}
 
-        records, _ = self._fetcher.fetched_records(max_records)
+        records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
         return records
 
     def position(self, partition):
@@ -743,6 +744,9 @@ class KafkaConsumer(six.Iterator):
         for partition in partitions:
             log.debug("Pausing partition %s", partition)
             self._subscription.pause(partition)
+        # Because the iterator checks is_fetchable() on each iteration
+        # we expect pauses to get handled automatically and therefore
+        # we do not need to reset the full iterator (forcing a full refetch)
 
     def paused(self):
         """Get the partitions that were previously paused using
@@ -790,6 +794,8 @@ class KafkaConsumer(six.Iterator):
         assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
         log.debug("Seeking to offset %s for partition %s", offset, partition)
         self._subscription.assignment[partition].seek(offset)
+        if not self.config['legacy_iterator']:
+            self._iterator = None
 
     def seek_to_beginning(self, *partitions):
         """Seek to the oldest available offset for partitions.
@@ -814,6 +820,8 @@ class KafkaConsumer(six.Iterator):
         for tp in partitions:
             log.debug("Seeking to beginning of partition %s", tp)
             self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
+        if not self.config['legacy_iterator']:
+            self._iterator = None
 
     def seek_to_end(self, *partitions):
         """Seek to the most recent available offset for partitions.
@@ -838,6 +846,8 @@ class KafkaConsumer(six.Iterator):
         for tp in partitions:
             log.debug("Seeking to end of partition %s", tp)
             self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+        if not self.config['legacy_iterator']:
+            self._iterator = None
 
     def subscribe(self, topics=(), pattern=None, listener=None):
         """Subscribe to a list of topics, or a topic regex pattern.
@@ -913,6 +923,8 @@ class KafkaConsumer(six.Iterator):
         self._client.cluster.need_all_topic_metadata = False
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
+        if not self.config['legacy_iterator']:
+            self._iterator = None
 
     def metrics(self, raw=False):
         """Get metrics on consumer performance.
@@ -1075,6 +1087,25 @@ class KafkaConsumer(six.Iterator):
         # Then, do any offset lookups in case some positions are not known
         self._fetcher.update_fetch_positions(partitions)
 
+    def _message_generator_v2(self):
+        timeout_ms = 1000 * (self._consumer_timeout - time.time())
+        record_map = self.poll(timeout_ms=timeout_ms)
+        for tp, records in six.iteritems(record_map):
+            # Generators are stateful, and it is possible that the tp / records
+            # here may become stale during iteration -- i.e., we seek to a
+            # different offset, pause consumption, or lose assignment.
+            for record in records:
+                # is_fetchable(tp) should handle assignment changes and offset
+                # resets; for all other changes (e.g., seeks) we'll rely on the
+                # outer function destroying the existing iterator/generator
+                # via self._iterator = None
+                if not self._subscription.is_fetchable(tp):
+                    log.debug("Not returning fetched records for partition %s"
+                              " since it is no longer fetchable", tp)
+                    break
+                self._subscription.assignment[tp].position = record.offset + 1
+                yield record
+
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
@@ -1127,6 +1158,26 @@ class KafkaConsumer(six.Iterator):
         return self
 
     def __next__(self):
+        # Now that the heartbeat thread runs in the background
+        # there should be no reason to maintain a separate iterator
+        # but we'll keep it available for a few releases just in case
+        if self.config['legacy_iterator']:
+            return self.next_v1()
+        else:
+            return self.next_v2()
+
+    def next_v2(self):
+        self._set_consumer_timeout()
+        while time.time() < self._consumer_timeout:
+            if not self._iterator:
+                self._iterator = self._message_generator_v2()
+            try:
+                return next(self._iterator)
+            except StopIteration:
+                self._iterator = None
+        raise StopIteration()
+
+    def next_v1(self):
         if not self._iterator:
             self._iterator = self._message_generator()
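One consequence of the per-record position update in _message_generator_v2 (position = record.offset + 1 for each yielded record): breaking out of iteration leaves the consumer's position at the last record the loop actually processed, even if the underlying poll() fetched a larger batch. A hedged sketch, assuming the non-legacy iterator and illustrative topic/broker names:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             legacy_iterator=False,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    tp = TopicPartition('my-topic', 0)  # illustrative manual assignment
    consumer.assign([tp])

    for message in consumer:
        print(message.value)
        if message.offset >= 99:
            break

    # _message_generator_v2 advanced the position to record.offset + 1 for each
    # yielded record, so this reflects only what the loop actually consumed.
    print(consumer.position(tp))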
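Similarly, under the non-legacy path, seek(), seek_to_beginning(), seek_to_end(), and unsubscribe() set self._iterator = None, so the next iteration step rebuilds the generator from a fresh poll(), while pause() needs no reset because the generator re-checks is_fetchable() for every record. A sketch under the same illustrative assumptions:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             legacy_iterator=False,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    tp = TopicPartition('my-topic', 0)  # illustrative manual assignment
    consumer.assign([tp])

    for message in consumer:
        if message.offset < 500:
            # seek() discards self._iterator (non-legacy path), so the next loop
            # step builds a new _message_generator_v2() that polls from offset 500.
            consumer.seek(tp, 500)
        else:
            # pause() does not reset the iterator; the generator skips records for
            # a paused partition because is_fetchable(tp) returns False.
            consumer.pause(tp)
            break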