author     melissacrawford396 <melissacrawford396@gmail.com>  2017-01-11 20:19:38 -0500
committer  Dana Powers <dana.powers@gmail.com>  2017-01-11 17:19:38 -0800
commit     83081befc1a9da3c02f78e092698ddca0f41a0f9 (patch)
tree       6ded62dffce4bdca3605b6691ebc8b61b72d6157 /kafka/consumer/group.py
parent     cb06a6b125d798b3d60ba105f2f86bbcd1a1357a (diff)
download   kafka-python-83081befc1a9da3c02f78e092698ddca0f41a0f9.tar.gz
Spelling and grammar changes (#923)
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--  kafka/consumer/group.py | 172
1 file changed, 85 insertions(+), 87 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 2562cfb..d1d6185 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -42,12 +42,12 @@ class KafkaConsumer(six.Iterator):
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
specified, will default to localhost:9092.
- client_id (str): a name for this client. This string is passed in
+ client_id (str): A name for this client. This string is passed in
each request to servers and can be used to identify specific
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
- group_id (str or None): name of the consumer group to join for dynamic
+ group_id (str or None): The name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
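For reference, a minimal sketch of a consumer using these two settings; the broker address, topic, and names below are illustrative, not part of this diff:

    from kafka import KafkaConsumer

    # client_id tags server-side log entries from this process;
    # group_id enables dynamic partition assignment and offset commits.
    consumer = KafkaConsumer(
        'my-topic',                       # hypothetical topic
        bootstrap_servers='localhost:9092',
        client_id='inventory-worker-1',   # hypothetical client name
        group_id='inventory-consumers')   # hypothetical group name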
@@ -85,20 +85,20 @@ class KafkaConsumer(six.Iterator):
OffsetOutOfRange errors: 'earliest' will move to the oldest
available message, 'latest' will move to the most recent. Any
other value will raise the exception. Default: 'latest'.
- enable_auto_commit (bool): If true the consumer's offset will be
+ enable_auto_commit (bool): If True, the consumer's offset will be
periodically committed in the background. Default: True.
- auto_commit_interval_ms (int): milliseconds between automatic
+ auto_commit_interval_ms (int): Number of milliseconds between automatic
offset commits, if enable_auto_commit is True. Default: 5000.
- default_offset_commit_callback (callable): called as
+ default_offset_commit_callback (callable): Called as
callback(offsets, response) response will be either an Exception
- or a OffsetCommitResponse struct. This callback can be used to
+ or an OffsetCommitResponse struct. This callback can be used to
trigger custom actions when a commit request completes.
check_crcs (bool): Automatically check the CRC32 of the records
consumed. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
metadata_max_age_ms (int): The period of time in milliseconds after
- which we force a refresh of metadata even if we haven't seen any
+ which we force a refresh of metadata, even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
partition_assignment_strategy (list): List of objects to use to
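To illustrate how these options fit together, a sketch of an auto-committing consumer with a commit callback of the documented callback(offsets, response) shape; the group name is hypothetical:

    from kafka import KafkaConsumer

    def on_commit(offsets, response):
        # response is either an Exception or an OffsetCommitResponse struct
        if isinstance(response, Exception):
            print('commit failed: %s' % response)

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,     # commit every 5 seconds
        default_offset_commit_callback=on_commit)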
@@ -115,7 +115,7 @@ class KafkaConsumer(six.Iterator):
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
- using Kafka's group managementment facilities. Default: 30000
+ using Kafka's group management facilities. Default: 30000
max_poll_records (int): The maximum number of records returned in a
single call to poll().
receive_buffer_bytes (int): The size of the TCP receive buffer
@@ -139,27 +139,27 @@ class KafkaConsumer(six.Iterator):
set this option to True. Default: False.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
- ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
- ssl_check_hostname (bool): flag to configure whether ssl handshake
+ ssl_check_hostname (bool): Flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname.
- default: true.
- ssl_cafile (str): optional filename of ca file to use in certificate
- verification. default: none.
- ssl_certfile (str): optional filename of file in pem format containing
+ Default: True.
+ ssl_cafile (str): Optional filename of ca file to use in certificate
+ verification. Default: None.
+ ssl_certfile (str): Optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to
- establish the certificate's authenticity. default: none.
- ssl_keyfile (str): optional filename containing the client private key.
- default: none.
- ssl_password (str): optional password to be used when loading the
- certificate chain. default: None.
- ssl_crlfile (str): optional filename containing the CRL to check for
+ establish the certificate's authenticity. Default: None.
+ ssl_keyfile (str): Optional filename containing the client private key.
+ Default: None.
+ ssl_password (str): Optional password to be used when loading the
+ certificate chain. Default: None.
+ ssl_crlfile (str): Optional filename containing the CRL to check for
certificate expiration. By default, no CRL check is done. When
providing a file, only the leaf certificate will be checked against
this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
- default: none.
- api_version (tuple): specify which kafka API version to use.
+ Default: None.
+ api_version (tuple): Specify which kafka API version to use.
If set to None, the client will attempt to infer the broker version
by probing various APIs. Default: None
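As an illustration of the ssl_* options above, an SSL consumer might be configured like this sketch; the host and certificate paths are placeholders:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        bootstrap_servers='broker.example.com:9093',  # placeholder host
        security_protocol='SSL',
        ssl_check_hostname=True,
        ssl_cafile='/path/to/ca.pem',         # placeholder paths
        ssl_certfile='/path/to/client.pem',
        ssl_keyfile='/path/to/client.key',
        ssl_password='key-password')          # only if the key is encrypted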
Examples:
@@ -189,12 +189,12 @@ class KafkaConsumer(six.Iterator):
(such as offsets) should be exposed to the consumer. If set to True
the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+ Default: True
- sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ sasl_mechanism (str): String specifying the sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
- sasl_plain_username (str): username for sasl PLAIN authentication.
+ sasl_plain_username (str): Username for sasl PLAIN authentication.
Default: None
- sasl_plain_password (str): password for sasl PLAIN authentication.
+ sasl_plain_password (str): Password for sasl PLAIN authentication.
Default: None
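And a PLAIN-mechanism sketch for the sasl_* options; the credentials are placeholders, and PLAIN is normally paired with SSL so they are not sent in cleartext:

    consumer = KafkaConsumer(
        bootstrap_servers='broker.example.com:9094',  # placeholder host
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username='alice',      # placeholder credentials
        sasl_plain_password='secret')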
Note:
@@ -239,7 +239,7 @@ class KafkaConsumer(six.Iterator):
'ssl_password': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
- 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+ 'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
@@ -275,7 +275,7 @@ class KafkaConsumer(six.Iterator):
self._metrics = Metrics(metric_config, reporters)
# TODO _metrics likely needs to be passed to KafkaClient, etc.
- # api_version was previously a str. accept old format for now
+ # api_version was previously a str. Accept old format for now
if isinstance(self.config['api_version'], str):
str_version = self.config['api_version']
if str_version == 'auto':
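Both accepted formats side by side; the tuple form is preferred, the string form is the legacy spelling this code still parses:

    # Preferred tuple form
    consumer = KafkaConsumer(api_version=(0, 10))

    # Legacy string form, accepted for backwards compatibility
    consumer = KafkaConsumer(api_version='0.10')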
@@ -310,10 +310,10 @@ class KafkaConsumer(six.Iterator):
"""Manually assign a list of TopicPartitions to this consumer.
Arguments:
- partitions (list of TopicPartition): assignment for this instance.
+ partitions (list of TopicPartition): Assignment for this instance.
Raises:
- IllegalStateError: if consumer has already called subscribe()
+ IllegalStateError: If consumer has already called subscribe()
Warning:
It is not possible to use both manual partition assignment with
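A usage sketch for manual assignment; the topic and partition numbers are illustrative:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    # Manual assignment bypasses the group coordinator entirely
    consumer.assign([TopicPartition('my-topic', 0),
                     TopicPartition('my-topic', 1)])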
@@ -339,7 +339,7 @@ class KafkaConsumer(six.Iterator):
simply return the same partitions that were previously assigned.
If topics were subscribed using subscribe(), then this will give the
set of topic partitions currently assigned to the consumer (which may
- be none if the assignment hasn't happened yet, or if the partitions are
+ be None if the assignment hasn't happened yet, or if the partitions are
in the process of being reassigned).
Returns:
@@ -367,7 +367,7 @@ class KafkaConsumer(six.Iterator):
log.debug("The KafkaConsumer has closed.")
def commit_async(self, offsets=None, callback=None):
- """Commit offsets to kafka asynchronously, optionally firing callback
+ """Commit offsets to kafka asynchronously, optionally firing callback.
This commits offsets only to Kafka. The offsets committed using this API
will be used on the first fetch after every rebalance and also on
@@ -381,10 +381,10 @@ class KafkaConsumer(six.Iterator):
Arguments:
offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
- to commit with the configured group_id. Defaults to current
+ to commit with the configured group_id. Defaults to currently
consumed offsets for all subscribed partitions.
- callback (callable, optional): called as callback(offsets, response)
- with response as either an Exception or a OffsetCommitResponse
+ callback (callable, optional): Called as callback(offsets, response)
+ with response as either an Exception or an OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
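A sketch of a non-blocking commit using that callback shape:

    def log_commit(offsets, response):
        # Called with either an Exception or an OffsetCommitResponse struct
        if isinstance(response, Exception):
            print('async commit failed: %s' % response)

    # Commits the currently consumed offsets without blocking
    future = consumer.commit_async(callback=log_commit)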
@@ -401,7 +401,7 @@ class KafkaConsumer(six.Iterator):
return future
def commit(self, offsets=None):
- """Commit offsets to kafka, blocking until success or error
+ """Commit offsets to kafka, blocking until success or error.
This commits offsets only to Kafka. The offsets committed using this API
will be used on the first fetch after every rebalance and also on
@@ -413,11 +413,11 @@ class KafkaConsumer(six.Iterator):
Blocks until either the commit succeeds or an unrecoverable error is
encountered (in which case it is thrown to the caller).
- Currently only supports kafka-topic offset storage (not zookeeper)
+ Currently only supports kafka-topic offset storage (not zookeeper).
Arguments:
offsets (dict, optional): {TopicPartition: OffsetAndMetadata} dict
- to commit with the configured group_id. Defaults to current
+ to commit with the configured group_id. Defaults to currently
consumed offsets for all subscribed partitions.
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
@@ -427,7 +427,7 @@ class KafkaConsumer(six.Iterator):
self._coordinator.commit_offsets_sync(offsets)
def committed(self, partition):
- """Get the last committed offset for the given partition
+ """Get the last committed offset for the given partition.
This offset will be used as the position for the consumer
in the event of a failure.
@@ -437,7 +437,7 @@ class KafkaConsumer(six.Iterator):
initialized its cache of committed offsets.
Arguments:
- partition (TopicPartition): the partition to check
+ partition (TopicPartition): The partition to check.
Returns:
The last committed offset, or None if there was no prior commit.
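For example, comparing the last committed offset against the current position gives a rough count of uncommitted records; the partition is illustrative:

    from kafka import TopicPartition

    tp = TopicPartition('my-topic', 0)
    last_committed = consumer.committed(tp)   # None if no prior commit
    if last_committed is not None:
        print('uncommitted: %d' % (consumer.position(tp) - last_committed))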
@@ -480,10 +480,10 @@ class KafkaConsumer(six.Iterator):
"""Get metadata about the partitions for a given topic.
Arguments:
- topic (str): topic to check
+ topic (str): Topic to check.
Returns:
- set: partition ids
+ set: Partition ids
"""
return self._client.cluster.partitions_for_topic(topic)
@@ -499,7 +499,7 @@ class KafkaConsumer(six.Iterator):
Incompatible with iterator interface -- use one or the other, not both.
Arguments:
- timeout_ms (int, optional): milliseconds spent waiting in poll if
+ timeout_ms (int, optional): Milliseconds spent waiting in poll if
data is not available in the buffer. If 0, returns immediately
with any records that are available currently in the buffer,
else returns empty. Must not be negative. Default: 0
@@ -508,14 +508,14 @@ class KafkaConsumer(six.Iterator):
max_poll_records.
Returns:
- dict: topic to list of records since the last fetch for the
- subscribed list of topics and partitions
+ dict: Topic to list of records since the last fetch for the
+ subscribed list of topics and partitions.
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
if max_records is None:
max_records = self.config['max_poll_records']
- # poll for new data until the timeout expires
+ # Poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
while True:
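A typical caller's poll loop might look like this sketch; note the returned dict is keyed by TopicPartition:

    while True:
        # Wait up to one second for new records
        batch = consumer.poll(timeout_ms=1000, max_records=500)
        for tp, records in batch.items():
            for record in records:
                print(tp, record.offset, record.value)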
@@ -530,15 +530,14 @@ class KafkaConsumer(six.Iterator):
return {}
def _poll_once(self, timeout_ms, max_records):
- """
- Do one round of polling. In addition to checking for new data, this does
+ """Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
Arguments:
- timeout_ms (int): The maximum time in milliseconds to block
+ timeout_ms (int): The maximum time in milliseconds to block.
Returns:
- dict: map of topic to list of records (may be empty)
+ dict: Map of topic to list of records (may be empty).
"""
if self._use_consumer_group():
self._coordinator.ensure_coordinator_known()
@@ -548,16 +547,16 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
- # fetch positions if we have partitions we're subscribed to that we
+ # Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
- # if data is available already, e.g. from a previous network client
+ # If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
records, partial = self._fetcher.fetched_records(max_records)
if records:
- # before returning the fetched records, we can send off the
+ # Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
@@ -565,7 +564,7 @@ class KafkaConsumer(six.Iterator):
self._fetcher.send_fetches()
return records
- # send any new fetches (won't resend pending fetches)
+ # Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
self._client.poll(timeout_ms=timeout_ms, sleep=True)
@@ -576,10 +575,10 @@ class KafkaConsumer(six.Iterator):
"""Get the offset of the next record that will be fetched
Arguments:
- partition (TopicPartition): partition to check
+ partition (TopicPartition): Partition to check
Returns:
- int: offset
+ int: Offset
"""
if not isinstance(partition, TopicPartition):
raise TypeError('partition must be a TopicPartition namedtuple')
@@ -591,7 +590,7 @@ class KafkaConsumer(six.Iterator):
return offset
def highwater(self, partition):
- """Last known highwater offset for a partition
+ """Last known highwater offset for a partition.
A highwater offset is the offset that will be assigned to the next
message that is produced. It may be useful for calculating lag, by
@@ -604,10 +603,10 @@ class KafkaConsumer(six.Iterator):
yet.
Arguments:
- partition (TopicPartition): partition to check
+ partition (TopicPartition): Partition to check
Returns:
- int or None: offset if available
+ int or None: Offset if available
"""
if not isinstance(partition, TopicPartition):
raise TypeError('partition must be a TopicPartition namedtuple')
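Taken together, position() and highwater() give a simple per-partition lag estimate, as in this sketch:

    tp = TopicPartition('my-topic', 0)    # illustrative partition
    hw = consumer.highwater(tp)           # may be None before any fetch
    if hw is not None:
        print('lag for %s: %d' % (tp, hw - consumer.position(tp)))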
@@ -623,7 +622,7 @@ class KafkaConsumer(six.Iterator):
group rebalance when automatic assignment is used.
Arguments:
- *partitions (TopicPartition): partitions to pause
+ *partitions (TopicPartition): Partitions to pause.
"""
if not all([isinstance(p, TopicPartition) for p in partitions]):
raise TypeError('partitions must be TopicPartition namedtuples')
@@ -643,7 +642,7 @@ class KafkaConsumer(six.Iterator):
"""Resume fetching from the specified (paused) partitions.
Arguments:
- *partitions (TopicPartition): partitions to resume
+ *partitions (TopicPartition): Partitions to resume.
"""
if not all([isinstance(p, TopicPartition) for p in partitions]):
raise TypeError('partitions must be TopicPartition namedtuples')
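A flow-control sketch with pause()/resume(), e.g. while a slow downstream stage catches up:

    tp = TopicPartition('my-topic', 0)    # illustrative partition
    consumer.pause(tp)     # poll() stops returning records for tp
    # ... drain the slow downstream buffer here ...
    consumer.resume(tp)    # fetching picks up where it left off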
@@ -661,11 +660,11 @@ class KafkaConsumer(six.Iterator):
to reset the fetch offsets.
Arguments:
- partition (TopicPartition): partition for seek operation
- offset (int): message offset in partition
+ partition (TopicPartition): Partition for seek operation
+ offset (int): Message offset in partition
Raises:
- AssertionError: if offset is not an int >= 0; or if partition is not
+ AssertionError: If offset is not an int >= 0; or if partition is not
currently assigned.
"""
if not isinstance(partition, TopicPartition):
@@ -679,12 +678,12 @@ class KafkaConsumer(six.Iterator):
"""Seek to the oldest available offset for partitions.
Arguments:
- *partitions: optionally provide specific TopicPartitions, otherwise
- default to all assigned partitions
+ *partitions: Optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions.
Raises:
- AssertionError: if any partition is not currently assigned, or if
- no partitions are assigned
+ AssertionError: If any partition is not currently assigned, or if
+ no partitions are assigned.
"""
if not all([isinstance(p, TopicPartition) for p in partitions]):
raise TypeError('partitions must be TopicPartition namedtuples')
@@ -703,12 +702,12 @@ class KafkaConsumer(six.Iterator):
"""Seek to the most recent available offset for partitions.
Arguments:
- *partitions: optionally provide specific TopicPartitions, otherwise
- default to all assigned partitions
+ *partitions: Optionally provide specific TopicPartitions, otherwise
+ default to all assigned partitions.
Raises:
- AssertionError: if any partition is not currently assigned, or if
- no partitions are assigned
+ AssertionError: If any partition is not currently assigned, or if
+ no partitions are assigned.
"""
if not all([isinstance(p, TopicPartition) for p in partitions]):
raise TypeError('partitions must be TopicPartition namedtuples')
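The three seek variants in one sketch; the offset and partition are illustrative:

    tp = TopicPartition('my-topic', 0)
    consumer.seek(tp, 42)             # jump to an absolute offset
    consumer.seek_to_beginning(tp)    # oldest available offset for tp
    consumer.seek_to_end()            # latest offset, all assigned partitions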
@@ -724,13 +723,13 @@ class KafkaConsumer(six.Iterator):
self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
def subscribe(self, topics=(), pattern=None, listener=None):
- """Subscribe to a list of topics, or a topic regex pattern
+ """Subscribe to a list of topics, or a topic regex pattern.
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
- This method is incompatible with assign()
+ This method is incompatible with assign().
Arguments:
topics (list): List of topics for subscription.
@@ -759,16 +758,16 @@ class KafkaConsumer(six.Iterator):
through this interface are from topics subscribed in this call.
Raises:
- IllegalStateError: if called after previously calling assign()
- AssertionError: if neither topics or pattern is provided
- TypeError: if listener is not a ConsumerRebalanceListener
+ IllegalStateError: If called after previously calling assign().
+ AssertionError: If neither topics or pattern is provided.
+ TypeError: If listener is not a ConsumerRebalanceListener.
"""
# SubscriptionState handles error checking
self._subscription.subscribe(topics=topics,
pattern=pattern,
listener=listener)
- # regex will need all topic metadata
+ # Regex will need all topic metadata
if pattern is not None:
self._client.cluster.need_all_topic_metadata = True
self._client.set_topics([])
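Both subscription styles in a sketch; the topic names and regex are illustrative:

    # Explicit topic list; replaces any prior subscription
    consumer.subscribe(topics=['my-topic', 'other-topic'])

    # Or by pattern; matching topics are picked up as metadata refreshes
    consumer.subscribe(pattern='^metrics-.*')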
@@ -821,25 +820,24 @@ class KafkaConsumer(six.Iterator):
return True
def _update_fetch_positions(self, partitions):
- """
- Set the fetch position to the committed position (if there is one)
+ """Set the fetch position to the committed position (if there is one)
or reset it using the offset reset policy the user has configured.
Arguments:
partitions (List[TopicPartition]): The partitions that need
- updating fetch positions
+ updating fetch positions.
Raises:
NoOffsetForPartitionError: If no offset is stored for a given
- partition and no offset reset policy is defined
+ partition and no offset reset policy is defined.
"""
if (self.config['api_version'] >= (0, 8, 1)
and self.config['group_id'] is not None):
- # refresh commits for all assigned partitions
+ # Refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()
- # then do any offset lookups in case some positions are not known
+ # Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
@@ -854,7 +852,7 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
- # fetch offsets for any subscribed partitions that we arent tracking yet
+ # Fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
@@ -889,9 +887,9 @@ class KafkaConsumer(six.Iterator):
log.debug("internal iterator timeout - breaking for poll")
break
- # an else block on a for loop only executes if there was no break
+ # An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
- # and we assume that it is safe to init_fetches when fetcher is done
+ # We assume that it is safe to init_fetches when fetcher is done
# i.e., there are no more records stored internally
else:
self._fetcher.send_fetches()
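For completeness, the iterator interface these internals support; it is incompatible with explicit poll() calls on the same instance:

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=10000)  # stop iterating after 10s idle
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)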
@@ -933,7 +931,7 @@ class KafkaConsumer(six.Iterator):
self._consumer_timeout = time.time() + (
self.config['consumer_timeout_ms'] / 1000.0)
- # old KafkaConsumer methods are deprecated
+ # Old KafkaConsumer methods are deprecated
def configure(self, **configs):
raise NotImplementedError(
'deprecated -- initialize a new consumer')