diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-10 12:49:47 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-10 12:49:47 -0800 |
commit | 7ee73df4c4bb3c69ac38accc30ff68bc6d64d594 (patch) | |
tree | da3bac91a190733f11f94ee006bd9579b5ff143a | |
parent | 2d4b0d1f789bba5637ce746b37821ccd9c5eef8d (diff) | |
download | kafka-python-7ee73df4c4bb3c69ac38accc30ff68bc6d64d594.tar.gz |
Remove old kafka.consumer.kafka module
-rw-r--r-- | kafka/consumer/kafka.py | 771 |
1 files changed, 0 insertions, 771 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py deleted file mode 100644 index 29ddd0e..0000000 --- a/kafka/consumer/kafka.py +++ /dev/null @@ -1,771 +0,0 @@ -from __future__ import absolute_import - -from collections import namedtuple -from copy import deepcopy -import logging -import random -import sys -import time - -import six - -from kafka import SimpleClient -from kafka.common import ( - OffsetFetchRequestPayload, OffsetCommitRequestPayload, - OffsetRequestPayload, FetchRequestPayload, - check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, - OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, - FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError -) - -logger = logging.getLogger(__name__) - -OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"]) - -DEFAULT_CONSUMER_CONFIG = { - 'client_id': __name__, - 'group_id': None, - 'bootstrap_servers': [], - 'socket_timeout_ms': 30 * 1000, - 'fetch_message_max_bytes': 1024 * 1024, - 'auto_offset_reset': 'largest', - 'fetch_min_bytes': 1, - 'fetch_wait_max_ms': 100, - 'refresh_leader_backoff_ms': 200, - 'deserializer_class': lambda msg: msg, - 'auto_commit_enable': False, - 'auto_commit_interval_ms': 60 * 1000, - 'auto_commit_interval_messages': None, - 'consumer_timeout_ms': -1, - - # Currently unused - 'socket_receive_buffer_bytes': 64 * 1024, - 'num_consumer_fetchers': 1, - 'default_fetcher_backoff_ms': 1000, - 'queued_max_message_chunks': 10, - 'rebalance_max_retries': 4, - 'rebalance_backoff_ms': 2000, -} - -DEPRECATED_CONFIG_KEYS = { - 'metadata_broker_list': 'bootstrap_servers', -} - -class KafkaConsumer(object): - """A simpler kafka consumer""" - DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG) - - def __init__(self, *topics, **configs): - self.configure(**configs) - self.set_topic_partitions(*topics) - - def configure(self, **configs): - """Configure the consumer instance - - Configuration settings can be passed to constructor, - otherwise defaults will be used: - - Keyword Arguments: - bootstrap_servers (list): List of initial broker nodes the consumer - should contact to bootstrap initial cluster metadata. This does - not have to be the full node list. It just needs to have at - least one broker that will respond to a Metadata API Request. - client_id (str): a unique name for this client. Defaults to - 'kafka.consumer.kafka'. - group_id (str): the name of the consumer group to join, - Offsets are fetched / committed to this group name. - fetch_message_max_bytes (int, optional): Maximum bytes for each - topic/partition fetch request. Defaults to 1024*1024. - fetch_min_bytes (int, optional): Minimum amount of data the server - should return for a fetch request, otherwise wait up to - fetch_wait_max_ms for more data to accumulate. Defaults to 1. - fetch_wait_max_ms (int, optional): Maximum time for the server to - block waiting for fetch_min_bytes messages to accumulate. - Defaults to 100. - refresh_leader_backoff_ms (int, optional): Milliseconds to backoff - when refreshing metadata on errors (subject to random jitter). - Defaults to 200. - socket_timeout_ms (int, optional): TCP socket timeout in - milliseconds. Defaults to 30*1000. - auto_offset_reset (str, optional): A policy for resetting offsets on - OffsetOutOfRange errors. 'smallest' will move to the oldest - available message, 'largest' will move to the most recent. Any - ofther value will raise the exception. Defaults to 'largest'. - deserializer_class (callable, optional): Any callable that takes a - raw message value and returns a deserialized value. Defaults to - lambda msg: msg. - auto_commit_enable (bool, optional): Enabling auto-commit will cause - the KafkaConsumer to periodically commit offsets without an - explicit call to commit(). Defaults to False. - auto_commit_interval_ms (int, optional): If auto_commit_enabled, - the milliseconds between automatic offset commits. Defaults to - 60 * 1000. - auto_commit_interval_messages (int, optional): If - auto_commit_enabled, a number of messages consumed between - automatic offset commits. Defaults to None (disabled). - consumer_timeout_ms (int, optional): number of millisecond to throw - a timeout exception to the consumer if no message is available - for consumption. Defaults to -1 (dont throw exception). - - Configuration parameters are described in more detail at - http://kafka.apache.org/documentation.html#highlevelconsumerapi - """ - configs = self._deprecate_configs(**configs) - self._config = {} - for key in self.DEFAULT_CONFIG: - self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key]) - - if configs: - raise KafkaConfigurationError('Unknown configuration key(s): ' + - str(list(configs.keys()))) - - if self._config['auto_commit_enable']: - if not self._config['group_id']: - raise KafkaConfigurationError( - 'KafkaConsumer configured to auto-commit ' - 'without required consumer group (group_id)' - ) - - # Check auto-commit configuration - if self._config['auto_commit_enable']: - logger.info("Configuring consumer to auto-commit offsets") - self._reset_auto_commit() - - if not self._config['bootstrap_servers']: - raise KafkaConfigurationError( - 'bootstrap_servers required to configure KafkaConsumer' - ) - - self._client = SimpleClient( - self._config['bootstrap_servers'], - client_id=self._config['client_id'], - timeout=(self._config['socket_timeout_ms'] / 1000.0) - ) - - def set_topic_partitions(self, *topics): - """ - Set the topic/partitions to consume - Optionally specify offsets to start from - - Accepts types: - - * str (utf-8): topic name (will consume all available partitions) - * tuple: (topic, partition) - * dict: - - { topic: partition } - - { topic: [partition list] } - - { topic: (partition tuple,) } - - Optionally, offsets can be specified directly: - - * tuple: (topic, partition, offset) - * dict: { (topic, partition): offset, ... } - - Example: - - .. code:: python - - kafka = KafkaConsumer() - - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - - # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45)) - - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 }) - - """ - self._topics = [] - self._client.load_metadata_for_topics() - - # Setup offsets - self._offsets = OffsetsStruct(fetch=dict(), - commit=dict(), - highwater=dict(), - task_done=dict()) - - # Handle different topic types - for arg in topics: - - # Topic name str -- all partitions - if isinstance(arg, (six.string_types, six.binary_type)): - topic = arg - - for partition in self._client.get_partition_ids_for_topic(topic): - self._consume_topic_partition(topic, partition) - - # (topic, partition [, offset]) tuple - elif isinstance(arg, tuple): - topic = arg[0] - partition = arg[1] - self._consume_topic_partition(topic, partition) - if len(arg) == 3: - offset = arg[2] - self._offsets.fetch[(topic, partition)] = offset - - # { topic: partitions, ... } dict - elif isinstance(arg, dict): - for key, value in six.iteritems(arg): - - # key can be string (a topic) - if isinstance(key, (six.string_types, six.binary_type)): - topic = key - - # topic: partition - if isinstance(value, int): - self._consume_topic_partition(topic, value) - - # topic: [ partition1, partition2, ... ] - elif isinstance(value, (list, tuple)): - for partition in value: - self._consume_topic_partition(topic, partition) - else: - raise KafkaConfigurationError( - 'Unknown topic type ' - '(dict key must be int or list/tuple of ints)' - ) - - # (topic, partition): offset - elif isinstance(key, tuple): - topic = key[0] - partition = key[1] - self._consume_topic_partition(topic, partition) - self._offsets.fetch[(topic, partition)] = value - - else: - raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg)) - - # If we have a consumer group, try to fetch stored offsets - if self._config['group_id']: - self._get_commit_offsets() - - # Update missing fetch/commit offsets - for topic_partition in self._topics: - - # Commit offsets default is None - if topic_partition not in self._offsets.commit: - self._offsets.commit[topic_partition] = None - - # Skip if we already have a fetch offset from user args - if topic_partition not in self._offsets.fetch: - - # Fetch offsets default is (1) commit - if self._offsets.commit[topic_partition] is not None: - self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition] - - # or (2) auto reset - else: - self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition) - - # highwater marks (received from server on fetch response) - # and task_done (set locally by user) - # should always get initialized to None - self._reset_highwater_offsets() - self._reset_task_done_offsets() - - # Reset message iterator in case we were in the middle of one - self._reset_message_iterator() - - def close(self): - """Close this consumer's underlying client.""" - self._client.close() - - def next(self): - """Return the next available message - - Blocks indefinitely unless consumer_timeout_ms > 0 - - Returns: - a single KafkaMessage from the message iterator - - Raises: - ConsumerTimeout after consumer_timeout_ms and no message - - Note: - This is also the method called internally during iteration - - """ - self._set_consumer_timeout_start() - while True: - - try: - return six.next(self._get_message_iterator()) - - # Handle batch completion - except StopIteration: - self._reset_message_iterator() - - self._check_consumer_timeout() - - def fetch_messages(self): - """Sends FetchRequests for all topic/partitions set for consumption - - Returns: - Generator that yields KafkaMessage structs - after deserializing with the configured `deserializer_class` - - Note: - Refreshes metadata on errors, and resets fetch offset on - OffsetOutOfRange, per the configured `auto_offset_reset` policy - - See Also: - Key KafkaConsumer configuration parameters: - * `fetch_message_max_bytes` - * `fetch_max_wait_ms` - * `fetch_min_bytes` - * `deserializer_class` - * `auto_offset_reset` - - """ - - max_bytes = self._config['fetch_message_max_bytes'] - max_wait_time = self._config['fetch_wait_max_ms'] - min_bytes = self._config['fetch_min_bytes'] - - if not self._topics: - raise KafkaConfigurationError('No topics or partitions configured') - - if not self._offsets.fetch: - raise KafkaConfigurationError( - 'No fetch offsets found when calling fetch_messages' - ) - - fetches = [FetchRequestPayload(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) - for (topic, partition) in self._topics] - - # send_fetch_request will batch topic/partition requests by leader - responses = self._client.send_fetch_request( - fetches, - max_wait_time=max_wait_time, - min_bytes=min_bytes, - fail_on_error=False - ) - - for resp in responses: - - if isinstance(resp, FailedPayloadsError): - logger.warning('FailedPayloadsError attempting to fetch data') - self._refresh_metadata_on_error() - continue - - topic = resp.topic - partition = resp.partition - try: - check_error(resp) - except OffsetOutOfRangeError: - logger.warning('OffsetOutOfRange: topic %s, partition %d, ' - 'offset %d (Highwatermark: %d)', - topic, partition, - self._offsets.fetch[(topic, partition)], - resp.highwaterMark) - # Reset offset - self._offsets.fetch[(topic, partition)] = ( - self._reset_partition_offset((topic, partition)) - ) - continue - - except NotLeaderForPartitionError: - logger.warning("NotLeaderForPartitionError for %s - %d. " - "Metadata may be out of date", - topic, partition) - self._refresh_metadata_on_error() - continue - - except RequestTimedOutError: - logger.warning("RequestTimedOutError for %s - %d", - topic, partition) - continue - - # Track server highwater mark - self._offsets.highwater[(topic, partition)] = resp.highwaterMark - - # Yield each message - # Kafka-python could raise an exception during iteration - # we are not catching -- user will need to address - for (offset, message) in resp.messages: - # deserializer_class could raise an exception here - val = self._config['deserializer_class'](message.value) - msg = KafkaMessage(topic, partition, offset, message.key, val) - - # in some cases the server will return earlier messages - # than we requested. skip them per kafka spec - if offset < self._offsets.fetch[(topic, partition)]: - logger.debug('message offset less than fetched offset ' - 'skipping: %s', msg) - continue - # Only increment fetch offset - # if we safely got the message and deserialized - self._offsets.fetch[(topic, partition)] = offset + 1 - - # Then yield to user - yield msg - - def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets): - """Request available fetch offsets for a single topic/partition - - Keyword Arguments: - topic (str): topic for offset request - partition (int): partition for offset request - request_time_ms (int): Used to ask for all messages before a - certain time (ms). There are two special values. - Specify -1 to receive the latest offset (i.e. the offset of the - next coming message) and -2 to receive the earliest available - offset. Note that because offsets are pulled in descending - order, asking for the earliest offset will always return you a - single element. - max_num_offsets (int): Maximum offsets to include in the OffsetResponse - - Returns: - a list of offsets in the OffsetResponse submitted for the provided - topic / partition. See: - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI - """ - reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] - - (resp,) = self._client.send_offset_request(reqs) - - check_error(resp) - - # Just for sanity.. - # probably unnecessary - assert resp.topic == topic - assert resp.partition == partition - - return resp.offsets - - def offsets(self, group=None): - """Get internal consumer offset values - - Keyword Arguments: - group: Either "fetch", "commit", "task_done", or "highwater". - If no group specified, returns all groups. - - Returns: - A copy of internal offsets struct - """ - if not group: - return { - 'fetch': self.offsets('fetch'), - 'commit': self.offsets('commit'), - 'task_done': self.offsets('task_done'), - 'highwater': self.offsets('highwater') - } - else: - return dict(deepcopy(getattr(self._offsets, group))) - - def task_done(self, message): - """Mark a fetched message as consumed. - - Offsets for messages marked as "task_done" will be stored back - to the kafka cluster for this consumer group on commit() - - Arguments: - message (KafkaMessage): the message to mark as complete - - Returns: - True, unless the topic-partition for this message has not - been configured for the consumer. In normal operation, this - should not happen. But see github issue 364. - """ - topic_partition = (message.topic, message.partition) - if topic_partition not in self._topics: - logger.warning('Unrecognized topic/partition in task_done message: ' - '{0}:{1}'.format(*topic_partition)) - return False - - offset = message.offset - - # Warn on non-contiguous offsets - prev_done = self._offsets.task_done[topic_partition] - if prev_done is not None and offset != (prev_done + 1): - logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1', - offset, prev_done) - - # Warn on smaller offsets than previous commit - # "commit" offsets are actually the offset of the next message to fetch. - prev_commit = self._offsets.commit[topic_partition] - if prev_commit is not None and ((offset + 1) <= prev_commit): - logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d', - offset, prev_commit) - - self._offsets.task_done[topic_partition] = offset - - # Check for auto-commit - if self._does_auto_commit_messages(): - self._incr_auto_commit_message_count() - - if self._should_auto_commit(): - self.commit() - - return True - - def commit(self): - """Store consumed message offsets (marked via task_done()) - to kafka cluster for this consumer_group. - - Returns: - True on success, or False if no offsets were found for commit - - Note: - this functionality requires server version >=0.8.1.1 - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI - """ - if not self._config['group_id']: - logger.warning('Cannot commit without a group_id!') - raise KafkaConfigurationError( - 'Attempted to commit offsets ' - 'without a configured consumer group (group_id)' - ) - - # API supports storing metadata with each commit - # but for now it is unused - metadata = b'' - - offsets = self._offsets.task_done - commits = [] - for topic_partition, task_done_offset in six.iteritems(offsets): - - # Skip if None - if task_done_offset is None: - continue - - # Commit offsets as the next offset to fetch - # which is consistent with the Java Client - # task_done is marked by messages consumed, - # so add one to mark the next message for fetching - commit_offset = (task_done_offset + 1) - - # Skip if no change from previous committed - if commit_offset == self._offsets.commit[topic_partition]: - continue - - commits.append( - OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], - commit_offset, metadata) - ) - - if commits: - logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request( - self._config['group_id'], commits, - fail_on_error=False - ) - - for r in resps: - check_error(r) - topic_partition = (r.topic, r.partition) - task_done = self._offsets.task_done[topic_partition] - self._offsets.commit[topic_partition] = (task_done + 1) - - if self._config['auto_commit_enable']: - self._reset_auto_commit() - - return True - - else: - logger.info('No new offsets found to commit in group %s', self._config['group_id']) - return False - - # - # Topic/partition management private methods - # - - def _consume_topic_partition(self, topic, partition): - if not isinstance(partition, int): - raise KafkaConfigurationError('Unknown partition type (%s) ' - '-- expected int' % type(partition)) - - if topic not in self._client.topic_partitions: - raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) - if partition not in self._client.get_partition_ids_for_topic(topic): - raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s " - "in broker metadata" % (partition, topic)) - logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition) - self._topics.append((topic, partition)) - - def _refresh_metadata_on_error(self): - refresh_ms = self._config['refresh_leader_backoff_ms'] - jitter_pct = 0.20 - sleep_ms = random.randint( - int((1.0 - 0.5 * jitter_pct) * refresh_ms), - int((1.0 + 0.5 * jitter_pct) * refresh_ms) - ) - while True: - logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) - time.sleep(sleep_ms / 1000.0) - try: - self._client.load_metadata_for_topics() - except KafkaUnavailableError: - logger.warning("Unable to refresh topic metadata... cluster unavailable") - self._check_consumer_timeout() - else: - logger.info("Topic metadata refreshed") - return - - # - # Offset-managment private methods - # - - def _get_commit_offsets(self): - logger.info("Consumer fetching stored offsets") - for topic_partition in self._topics: - (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], - [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], - fail_on_error=False) - try: - check_error(resp) - # API spec says server wont set an error here - # but 0.8.1.1 does actually... - except UnknownTopicOrPartitionError: - pass - - # -1 offset signals no commit is currently stored - if resp.offset == -1: - self._offsets.commit[topic_partition] = None - - # Otherwise we committed the stored offset - # and need to fetch the next one - else: - self._offsets.commit[topic_partition] = resp.offset - - def _reset_highwater_offsets(self): - for topic_partition in self._topics: - self._offsets.highwater[topic_partition] = None - - def _reset_task_done_offsets(self): - for topic_partition in self._topics: - self._offsets.task_done[topic_partition] = None - - def _reset_partition_offset(self, topic_partition): - (topic, partition) = topic_partition - LATEST = -1 - EARLIEST = -2 - - request_time_ms = None - if self._config['auto_offset_reset'] == 'largest': - request_time_ms = LATEST - elif self._config['auto_offset_reset'] == 'smallest': - request_time_ms = EARLIEST - else: - - # Let's raise an reasonable exception type if user calls - # outside of an exception context - if sys.exc_info() == (None, None, None): - raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' - 'valid auto_offset_reset setting ' - '(largest|smallest)') - - # Otherwise we should re-raise the upstream exception - # b/c it typically includes additional data about - # the request that triggered it, and we do not want to drop that - raise # pylint: disable=E0704 - - (offset, ) = self.get_partition_offsets(topic, partition, - request_time_ms, max_num_offsets=1) - return offset - - # - # Consumer Timeout private methods - # - - def _set_consumer_timeout_start(self): - self._consumer_timeout = False - if self._config['consumer_timeout_ms'] >= 0: - self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0) - - def _check_consumer_timeout(self): - if self._consumer_timeout and time.time() > self._consumer_timeout: - raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms']) - - # - # Autocommit private methods - # - - def _should_auto_commit(self): - if self._does_auto_commit_ms(): - if time.time() >= self._next_commit_time: - return True - - if self._does_auto_commit_messages(): - if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: - return True - - return False - - def _reset_auto_commit(self): - self._uncommitted_message_count = 0 - self._next_commit_time = None - if self._does_auto_commit_ms(): - self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) - - def _incr_auto_commit_message_count(self, n=1): - self._uncommitted_message_count += n - - def _does_auto_commit_ms(self): - if not self._config['auto_commit_enable']: - return False - - conf = self._config['auto_commit_interval_ms'] - if conf is not None and conf > 0: - return True - return False - - def _does_auto_commit_messages(self): - if not self._config['auto_commit_enable']: - return False - - conf = self._config['auto_commit_interval_messages'] - if conf is not None and conf > 0: - return True - return False - - # - # Message iterator private methods - # - - def __iter__(self): - return self - - def __next__(self): - return self.next() - - def _get_message_iterator(self): - # Fetch a new batch if needed - if self._msg_iter is None: - self._msg_iter = self.fetch_messages() - - return self._msg_iter - - def _reset_message_iterator(self): - self._msg_iter = None - - # - # python private methods - # - - def __repr__(self): - return '<{0} topics=({1})>'.format( - self.__class__.__name__, - '|'.join(["%s-%d" % topic_partition - for topic_partition in self._topics]) - ) - - # - # other private methods - # - - def _deprecate_configs(self, **configs): - for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS): - if old in configs: - logger.warning('Deprecated Kafka Consumer configuration: %s. ' - 'Please use %s instead.', old, new) - old_value = configs.pop(old) - if new not in configs: - configs[new] = old_value - return configs |