-from __future__ import absolute_import
-from collections import namedtuple
-from copy import deepcopy
-import logging
-import random
-import sys
-import time
-import six
-from kafka import SimpleClient
-from kafka.common import (
- OffsetFetchRequestPayload, OffsetCommitRequestPayload,
- OffsetRequestPayload, FetchRequestPayload,
- check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
- FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
logger = logging.getLogger(__name__)

# Offset bookkeeping for the consumer. Each field is a dict keyed by a
# (topic, partition) tuple:
#   fetch     -- offset of the next message to request from the broker
#   highwater -- last high-water mark reported in a fetch response
#   commit    -- last offset stored for the consumer group (next to fetch)
#   task_done -- offset of the last message the user marked as processed
OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])

# Default value for every configuration key accepted by
# KafkaConsumer.configure(); any key not listed here is rejected.
DEFAULT_CONSUMER_CONFIG = {
    'client_id': __name__,
    'group_id': None,
    'bootstrap_servers': [],
    'socket_timeout_ms': 30 * 1000,
    'fetch_message_max_bytes': 1024 * 1024,
    'auto_offset_reset': 'largest',
    'fetch_min_bytes': 1,
    'fetch_wait_max_ms': 100,
    'refresh_leader_backoff_ms': 200,
    'deserializer_class': lambda msg: msg,
    'auto_commit_enable': False,
    'auto_commit_interval_ms': 60 * 1000,
    'auto_commit_interval_messages': None,
    'consumer_timeout_ms': -1,

    # Currently unused
    'socket_receive_buffer_bytes': 64 * 1024,
    'num_consumer_fetchers': 1,
    'default_fetcher_backoff_ms': 1000,
    'queued_max_message_chunks': 10,
    'rebalance_max_retries': 4,
    'rebalance_backoff_ms': 2000,
}

# Maps a deprecated configuration key to the key that replaces it.
DEPRECATED_CONFIG_KEYS = {
    'metadata_broker_list': 'bootstrap_servers',
}


class KafkaConsumer(object):
    """A simpler kafka consumer.

    Iterating over an instance yields KafkaMessage structs fetched from
    the configured topic/partitions. Offsets are tracked locally in an
    OffsetsStruct and are only stored on the cluster by commit().
    """

    # Private copy so that changes to the class defaults never leak into
    # the module-level dict (and vice versa).
    DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG)

    # Deadline used by _check_consumer_timeout(). next() resets it on
    # every call; the class-level default keeps the check safe when
    # fetch_messages() is driven directly without going through next().
    _consumer_timeout = False

    def __init__(self, *topics, **configs):
        """Configure the consumer and select the topic/partitions.

        Arguments:
            *topics: forwarded to set_topic_partitions()
            **configs: forwarded to configure()
        """
        self.configure(**configs)
        self.set_topic_partitions(*topics)

    def configure(self, **configs):
        """Configure the consumer instance

        Configuration settings can be passed to constructor,
        otherwise defaults will be used:

        Keyword Arguments:
            bootstrap_servers (list): List of initial broker nodes the consumer
                should contact to bootstrap initial cluster metadata.  This does
                not have to be the full node list.  It just needs to have at
                least one broker that will respond to a Metadata API Request.
            client_id (str): a unique name for this client.  Defaults to
                this module's name ('kafka.consumer.kafka').
            group_id (str): the name of the consumer group to join,
                Offsets are fetched / committed to this group name.
            fetch_message_max_bytes (int, optional): Maximum bytes for each
                topic/partition fetch request.  Defaults to 1024*1024.
            fetch_min_bytes (int, optional): Minimum amount of data the server
                should return for a fetch request, otherwise wait up to
                fetch_wait_max_ms for more data to accumulate.  Defaults to 1.
            fetch_wait_max_ms (int, optional): Maximum time for the server to
                block waiting for fetch_min_bytes messages to accumulate.
                Defaults to 100.
            refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
                when refreshing metadata on errors (subject to random jitter).
                Defaults to 200.
            socket_timeout_ms (int, optional): TCP socket timeout in
                milliseconds.  Defaults to 30*1000.
            auto_offset_reset (str, optional): A policy for resetting offsets on
                OffsetOutOfRange errors. 'smallest' will move to the oldest
                available message, 'largest' will move to the most recent.  Any
                other value will raise the exception.  Defaults to 'largest'.
            deserializer_class (callable, optional):  Any callable that takes a
                raw message value and returns a deserialized value.  Defaults to
                lambda msg: msg.
            auto_commit_enable (bool, optional): Enabling auto-commit will cause
                the KafkaConsumer to periodically commit offsets without an
                explicit call to commit().  Defaults to False.
            auto_commit_interval_ms (int, optional):  If auto_commit_enabled,
                the milliseconds between automatic offset commits.  Defaults to
                60 * 1000.
            auto_commit_interval_messages (int, optional): If
                auto_commit_enabled, a number of messages consumed between
                automatic offset commits.  Defaults to None (disabled).
            consumer_timeout_ms (int, optional): number of milliseconds after
                which a timeout exception is raised to the consumer if no
                message is available for consumption.  Defaults to -1
                (don't throw exception).

        Raises:
            KafkaConfigurationError: on unknown configuration keys, when
                auto-commit is enabled without a group_id, or when no
                bootstrap_servers are given.

        The deprecated key 'metadata_broker_list' is still accepted and is
        mapped to 'bootstrap_servers' with a warning.
        """
        configs = self._deprecate_configs(**configs)
        self._config = {}
        for key in self.DEFAULT_CONFIG:
            self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key])

        # Anything still left in configs is a key we do not know about
        if configs:
            raise KafkaConfigurationError('Unknown configuration key(s): ' +
                                          str(list(configs.keys())))

        # Check auto-commit configuration
        if self._config['auto_commit_enable']:
            if not self._config['group_id']:
                raise KafkaConfigurationError(
                    'KafkaConsumer configured to auto-commit '
                    'without required consumer group (group_id)'
                )
            logger.info("Configuring consumer to auto-commit offsets")
            self._reset_auto_commit()

        if not self._config['bootstrap_servers']:
            raise KafkaConfigurationError(
                'bootstrap_servers required to configure KafkaConsumer'
            )

        self._client = SimpleClient(
            self._config['bootstrap_servers'],
            client_id=self._config['client_id'],
            # the client takes its timeout in seconds
            timeout=(self._config['socket_timeout_ms'] / 1000.0)
        )

    def set_topic_partitions(self, *topics):
        """
        Set the topic/partitions to consume
        Optionally specify offsets to start from

        Accepts types:

        * str (utf-8): topic name (will consume all available partitions)
        * tuple: (topic, partition)
        * dict:
            - { topic: partition }
            - { topic: [partition list] }
            - { topic: (partition tuple,) }

        Optionally, offsets can be specified directly:

        * tuple: (topic, partition, offset)
        * dict:  { (topic, partition): offset, ... }

        Example:

        .. code:: python

            kafka = KafkaConsumer()

            # Consume topic1-all; topic2-partition2; topic3-partition0
            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
            # using tuples --
            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))

            # using dict --
            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })

        Raises:
            KafkaConfigurationError: if an argument, dict key or dict value
                has an unsupported type.
            UnknownTopicOrPartitionError: if a topic or partition is not
                present in the broker metadata.
        """
        self._topics = []
        self._client.load_metadata_for_topics()

        # Setup offsets
        self._offsets = OffsetsStruct(fetch=dict(),
                                      commit=dict(),
                                      highwater=dict(),
                                      task_done=dict())

        # Handle different topic types
        for arg in topics:

            # Topic name str -- all partitions
            if isinstance(arg, (six.string_types, six.binary_type)):
                topic = arg
                for partition in self._client.get_partition_ids_for_topic(topic):
                    self._consume_topic_partition(topic, partition)

            # (topic, partition [, offset]) tuple
            elif isinstance(arg, tuple):
                topic = arg[0]
                partition = arg[1]
                self._consume_topic_partition(topic, partition)
                if len(arg) == 3:
                    offset = arg[2]
                    self._offsets.fetch[(topic, partition)] = offset

            # { topic: partitions, ... } dict
            elif isinstance(arg, dict):
                for key, value in six.iteritems(arg):

                    # key can be string (a topic)
                    if isinstance(key, (six.string_types, six.binary_type)):
                        topic = key

                        # topic: partition
                        if isinstance(value, int):
                            self._consume_topic_partition(topic, value)

                        # topic: [ partition1, partition2, ... ]
                        elif isinstance(value, (list, tuple)):
                            for partition in value:
                                self._consume_topic_partition(topic, partition)
                        else:
                            raise KafkaConfigurationError(
                                'Unknown topic type '
                                '(dict value must be int or list/tuple of ints)'
                            )

                    # (topic, partition): offset
                    elif isinstance(key, tuple):
                        topic = key[0]
                        partition = key[1]
                        self._consume_topic_partition(topic, partition)
                        self._offsets.fetch[(topic, partition)] = value

                    # Reject anything else instead of silently dropping it
                    else:
                        raise KafkaConfigurationError(
                            'Unknown topic type (%s) -- dict keys must be a '
                            'topic name or a (topic, partition) tuple' % type(key)
                        )

            else:
                raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))

        # If we have a consumer group, try to fetch stored offsets
        if self._config['group_id']:
            self._get_commit_offsets()

        # Update missing fetch/commit offsets
        for topic_partition in self._topics:

            # Commit offsets default is None
            if topic_partition not in self._offsets.commit:
                self._offsets.commit[topic_partition] = None

            # Skip if we already have a fetch offset from user args
            if topic_partition not in self._offsets.fetch:

                # Fetch offsets default is (1) commit
                if self._offsets.commit[topic_partition] is not None:
                    self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]

                # or (2) auto reset
                else:
                    self._offsets.fetch[topic_partition] = (
                        self._reset_partition_offset(topic_partition)
                    )

        # highwater marks (received from server on fetch response)
        # and task_done (set locally by user)
        # should always get initialized to None
        self._reset_highwater_offsets()
        self._reset_task_done_offsets()

        # Reset message iterator in case we were in the middle of one
        self._reset_message_iterator()

    def close(self):
        """Close this consumer's underlying client."""
        self._client.close()

    def next(self):
        """Return the next available message

        Blocks indefinitely unless consumer_timeout_ms >= 0

        Returns:
            a single KafkaMessage from the message iterator

        Raises:
            ConsumerTimeout after consumer_timeout_ms and no message

        Note:
            This is also the method called internally during iteration
        """
        self._set_consumer_timeout_start()
        while True:
            try:
                # `next` here is the builtin: method names defined in the
                # class body are not visible as bare names inside methods.
                return next(self._get_message_iterator())

            # Handle batch completion: drop the exhausted batch so the
            # next loop iteration issues a fresh fetch.
            except StopIteration:
                self._reset_message_iterator()

            self._check_consumer_timeout()

    def fetch_messages(self):
        """Sends FetchRequests for all topic/partitions set for consumption

        Returns:
            Generator that yields KafkaMessage structs
            after deserializing with the configured `deserializer_class`

        Raises:
            KafkaConfigurationError: if no topic/partitions or no fetch
                offsets are configured (raised on first iteration, since
                this is a generator).

        Note:
            Refreshes metadata on errors, and resets fetch offset on
            OffsetOutOfRange, per the configured `auto_offset_reset` policy

        See Also:
            Key KafkaConsumer configuration parameters:
            * `fetch_message_max_bytes`
            * `fetch_wait_max_ms`
            * `fetch_min_bytes`
            * `deserializer_class`
            * `auto_offset_reset`
        """
        max_bytes = self._config['fetch_message_max_bytes']
        max_wait_time = self._config['fetch_wait_max_ms']
        min_bytes = self._config['fetch_min_bytes']

        if not self._topics:
            raise KafkaConfigurationError('No topics or partitions configured')

        if not self._offsets.fetch:
            raise KafkaConfigurationError(
                'No fetch offsets found when calling fetch_messages'
            )

        fetches = [FetchRequestPayload(topic, partition,
                                       self._offsets.fetch[(topic, partition)],
                                       max_bytes)
                   for (topic, partition) in self._topics]

        # send_fetch_request will batch topic/partition requests by leader
        responses = self._client.send_fetch_request(
            fetches,
            max_wait_time=max_wait_time,
            min_bytes=min_bytes,
            fail_on_error=False
        )

        for resp in responses:

            # With fail_on_error=False failures come back as response items
            if isinstance(resp, FailedPayloadsError):
                logger.warning('FailedPayloadsError attempting to fetch data')
                self._refresh_metadata_on_error()
                continue

            topic = resp.topic
            partition = resp.partition
            try:
                check_error(resp)
            except OffsetOutOfRangeError:
                logger.warning('OffsetOutOfRange: topic %s, partition %d, '
                               'offset %d (Highwatermark: %d)',
                               topic, partition,
                               self._offsets.fetch[(topic, partition)],
                               resp.highwaterMark)
                # Reset offset
                self._offsets.fetch[(topic, partition)] = (
                    self._reset_partition_offset((topic, partition))
                )
                continue

            except NotLeaderForPartitionError:
                logger.warning("NotLeaderForPartitionError for %s - %d. "
                               "Metadata may be out of date",
                               topic, partition)
                self._refresh_metadata_on_error()
                continue

            except RequestTimedOutError:
                logger.warning("RequestTimedOutError for %s - %d",
                               topic, partition)
                continue

            # Track server highwater mark
            self._offsets.highwater[(topic, partition)] = resp.highwaterMark

            # Yield each message
            # Kafka-python could raise an exception during iteration
            # we are not catching -- user will need to address
            for (offset, message) in resp.messages:
                # deserializer_class could raise an exception here
                val = self._config['deserializer_class'](message.value)
                msg = KafkaMessage(topic, partition, offset, message.key, val)

                # in some cases the server will return earlier messages
                # than we requested. skip them per kafka spec
                if offset < self._offsets.fetch[(topic, partition)]:
                    logger.debug('message offset less than fetched offset '
                                 'skipping: %s', msg)
                    continue
                # Only increment fetch offset
                # if we safely got the message and deserialized
                self._offsets.fetch[(topic, partition)] = offset + 1

                # Then yield to user
                yield msg

    def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
        """Request available fetch offsets for a single topic/partition

        Keyword Arguments:
            topic (str): topic for offset request
            partition (int): partition for offset request
            request_time_ms (int): Used to ask for all messages before a
                certain time (ms). There are two special values.
                Specify -1 to receive the latest offset (i.e. the offset of the
                next coming message) and -2 to receive the earliest available
                offset. Note that because offsets are pulled in descending
                order, asking for the earliest offset will always return you a
                single element.
            max_num_offsets (int): Maximum offsets to include in the OffsetResponse

        Returns:
            a list of offsets in the OffsetResponse submitted for the provided
            topic / partition (see the Offset API section of the Kafka
            protocol guide).

        Raises:
            Whatever check_error() raises for an error response.
        """
        reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)]

        (resp,) = self._client.send_offset_request(reqs)

        check_error(resp)

        # Just for sanity..
        # probably unnecessary
        assert resp.topic == topic
        assert resp.partition == partition

        return resp.offsets

    def offsets(self, group=None):
        """Get internal consumer offset values

        Keyword Arguments:
            group: Either "fetch", "commit", "task_done", or "highwater".
                If no group specified, returns all groups.

        Returns:
            A copy of internal offsets struct: a dict keyed by
            (topic, partition) for a single group, or a dict of such dicts
            keyed by group name when no group is given.
        """
        if not group:
            return {
                'fetch': self.offsets('fetch'),
                'commit': self.offsets('commit'),
                'task_done': self.offsets('task_done'),
                'highwater': self.offsets('highwater')
            }
        else:
            # deepcopy so callers cannot mutate the consumer's bookkeeping
            return dict(deepcopy(getattr(self._offsets, group)))

    def task_done(self, message):
        """Mark a fetched message as consumed.

        Offsets for messages marked as "task_done" will be stored back
        to the kafka cluster for this consumer group on commit()

        Arguments:
            message (KafkaMessage): the message to mark as complete

        Returns:
            True, unless the topic-partition for this message has not
            been configured for the consumer. In normal operation, this
            should not happen. But see github issue 364.
        """
        topic_partition = (message.topic, message.partition)
        if topic_partition not in self._topics:
            logger.warning('Unrecognized topic/partition in task_done message: '
                           '%s:%s', message.topic, message.partition)
            return False

        offset = message.offset

        # Warn on non-contiguous offsets
        prev_done = self._offsets.task_done[topic_partition]
        if prev_done is not None and offset != (prev_done + 1):
            logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
                           offset, prev_done)

        # Warn on smaller offsets than previous commit
        # "commit" offsets are actually the offset of the next message to fetch.
        prev_commit = self._offsets.commit[topic_partition]
        if prev_commit is not None and ((offset + 1) <= prev_commit):
            logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
                           offset, prev_commit)

        self._offsets.task_done[topic_partition] = offset

        # Check for auto-commit
        if self._does_auto_commit_messages():
            self._incr_auto_commit_message_count()

        if self._should_auto_commit():
            self.commit()

        return True

    def commit(self):
        """Store consumed message offsets (marked via task_done())
        to kafka cluster for this consumer_group.

        Returns:
            True on success, or False if no offsets were found for commit

        Raises:
            KafkaConfigurationError: if no group_id is configured.

        Note:
            this functionality requires a broker that implements the
            offset commit/fetch API (presumably Kafka 0.8.1.1 or later --
            confirm against the Kafka protocol guide).
        """
        if not self._config['group_id']:
            logger.warning('Cannot commit without a group_id!')
            raise KafkaConfigurationError(
                'Attempted to commit offsets '
                'without a configured consumer group (group_id)'
            )

        # API supports storing metadata with each commit
        # but for now it is unused
        metadata = b''

        offsets = self._offsets.task_done
        commits = []
        for topic_partition, task_done_offset in six.iteritems(offsets):

            # Skip if None
            if task_done_offset is None:
                continue

            # Commit offsets as the next offset to fetch
            # which is consistent with the Java Client
            # task_done is marked by messages consumed,
            # so add one to mark the next message for fetching
            commit_offset = (task_done_offset + 1)

            # Skip if no change from previous committed
            if commit_offset == self._offsets.commit[topic_partition]:
                continue

            commits.append(
                OffsetCommitRequestPayload(topic_partition[0], topic_partition[1],
                                           commit_offset, metadata)
            )

        if commits:
            logger.info('committing consumer offsets to group %s', self._config['group_id'])
            resps = self._client.send_offset_commit_request(
                self._config['group_id'], commits,
                fail_on_error=False
            )

            for r in resps:
                check_error(r)
                topic_partition = (r.topic, r.partition)
                task_done = self._offsets.task_done[topic_partition]
                self._offsets.commit[topic_partition] = (task_done + 1)

            if self._config['auto_commit_enable']:
                self._reset_auto_commit()

            return True

        else:
            logger.info('No new offsets found to commit in group %s', self._config['group_id'])
            return False

    #
    # Topic/partition management private methods
    #

    def _consume_topic_partition(self, topic, partition):
        """Validate a topic/partition against broker metadata and register it.

        Raises:
            KafkaConfigurationError: if partition is not an int.
            UnknownTopicOrPartitionError: if the topic or partition is not
                present in the client's metadata.
        """
        if not isinstance(partition, int):
            raise KafkaConfigurationError('Unknown partition type (%s) '
                                          '-- expected int' % type(partition))

        if topic not in self._client.topic_partitions:
            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
        if partition not in self._client.get_partition_ids_for_topic(topic):
            raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
                                               "in broker metadata" % (partition, topic))
        logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
        self._topics.append((topic, partition))

    def _refresh_metadata_on_error(self):
        """Back off, then reload topic metadata, retrying until it succeeds.

        Sleeps refresh_leader_backoff_ms (+/- 10% jitter, drawn once per
        call) before every attempt. While the cluster is unavailable the
        consumer timeout is checked after each failed attempt, so this may
        raise ConsumerTimeout.
        """
        refresh_ms = self._config['refresh_leader_backoff_ms']
        jitter_pct = 0.20
        sleep_ms = random.randint(
            int((1.0 - 0.5 * jitter_pct) * refresh_ms),
            int((1.0 + 0.5 * jitter_pct) * refresh_ms)
        )
        while True:
            logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
            time.sleep(sleep_ms / 1000.0)
            try:
                self._client.load_metadata_for_topics()
            except KafkaUnavailableError:
                logger.warning("Unable to refresh topic metadata... cluster unavailable")
                self._check_consumer_timeout()
            else:
                logger.info("Topic metadata refreshed")
                return

    #
    # Offset-management private methods
    #

    def _get_commit_offsets(self):
        """Load the group's stored offset for every configured partition.

        Stores None in the commit table when the broker reports -1.
        """
        logger.info("Consumer fetching stored offsets")
        for topic_partition in self._topics:
            (resp,) = self._client.send_offset_fetch_request(
                self._config['group_id'],
                [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])],
                fail_on_error=False)
            try:
                check_error(resp)
            # API spec says server wont set an error here
            # but does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored
            if resp.offset == -1:
                self._offsets.commit[topic_partition] = None

            # Otherwise keep the stored value as-is; set_topic_partitions()
            # uses it directly as the next fetch offset.
            else:
                self._offsets.commit[topic_partition] = resp.offset

    def _reset_highwater_offsets(self):
        """Set the high-water mark of every configured partition to None."""
        for topic_partition in self._topics:
            self._offsets.highwater[topic_partition] = None

    def _reset_task_done_offsets(self):
        """Set the task_done offset of every configured partition to None."""
        for topic_partition in self._topics:
            self._offsets.task_done[topic_partition] = None

    def _reset_partition_offset(self, topic_partition):
        """Look up a fresh fetch offset per the auto_offset_reset policy.

        Arguments:
            topic_partition (tuple): (topic, partition)

        Returns:
            The latest ('largest') or earliest ('smallest') offset reported
            by the broker for the partition.

        Raises:
            OffsetOutOfRangeError: if auto_offset_reset is neither value
                and no exception is currently being handled; otherwise the
                exception being handled is re-raised.
        """
        (topic, partition) = topic_partition
        # Special request_time_ms values, see get_partition_offsets()
        LATEST = -1
        EARLIEST = -2

        request_time_ms = None
        if self._config['auto_offset_reset'] == 'largest':
            request_time_ms = LATEST
        elif self._config['auto_offset_reset'] == 'smallest':
            request_time_ms = EARLIEST
        else:

            # Let's raise an reasonable exception type if user calls
            # outside of an exception context
            if sys.exc_info() == (None, None, None):
                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
                                            'valid auto_offset_reset setting '
                                            '(largest|smallest)')

            # Otherwise we should re-raise the upstream exception
            # b/c it typically includes additional data about
            # the request that triggered it, and we do not want to drop that
            raise  # pylint: disable=E0704

        (offset, ) = self.get_partition_offsets(topic, partition,
                                                request_time_ms, max_num_offsets=1)
        return offset

    #
    # Consumer Timeout private methods
    #

    def _set_consumer_timeout_start(self):
        """Arm the timeout deadline, or disable it (False) when
        consumer_timeout_ms is negative."""
        self._consumer_timeout = False
        if self._config['consumer_timeout_ms'] >= 0:
            self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)

    def _check_consumer_timeout(self):
        """Raise ConsumerTimeout if an armed deadline has passed."""
        if self._consumer_timeout and time.time() > self._consumer_timeout:
            raise ConsumerTimeout('Consumer timed out after %d ms'
                                  % self._config['consumer_timeout_ms'])

    #
    # Autocommit private methods
    #

    def _should_auto_commit(self):
        """Return True when either auto-commit trigger (elapsed time or
        message count) has been reached."""
        if self._does_auto_commit_ms():
            if time.time() >= self._next_commit_time:
                return True

        if self._does_auto_commit_messages():
            if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
                return True

        return False

    def _reset_auto_commit(self):
        """Zero the uncommitted-message counter and re-arm the commit timer."""
        self._uncommitted_message_count = 0
        self._next_commit_time = None
        if self._does_auto_commit_ms():
            self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)

    def _incr_auto_commit_message_count(self, n=1):
        """Add n to the count of messages marked done since the last commit."""
        self._uncommitted_message_count += n

    def _does_auto_commit_ms(self):
        """Return True if time-based auto-commit is active."""
        if not self._config['auto_commit_enable']:
            return False

        conf = self._config['auto_commit_interval_ms']
        if conf is not None and conf > 0:
            return True
        return False

    def _does_auto_commit_messages(self):
        """Return True if message-count-based auto-commit is active."""
        if not self._config['auto_commit_enable']:
            return False

        conf = self._config['auto_commit_interval_messages']
        if conf is not None and conf > 0:
            return True
        return False

    #
    # Message iterator private methods
    #

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol; delegates to the public next()
        return self.next()

    def _get_message_iterator(self):
        """Return the current batch generator, starting a new fetch if
        there is none."""
        # Fetch a new batch if needed
        if self._msg_iter is None:
            self._msg_iter = self.fetch_messages()

        return self._msg_iter

    def _reset_message_iterator(self):
        """Drop the current batch so the next read triggers a new fetch."""
        self._msg_iter = None

    #
    # python private methods
    #

    def __repr__(self):
        return '<{0} topics=({1})>'.format(
            self.__class__.__name__,
            '|'.join(["%s-%d" % topic_partition
                      for topic_partition in self._topics])
        )

    #
    # other private methods
    #

    def _deprecate_configs(self, **configs):
        """Rename deprecated configuration keys, warning about each one.

        If both the old and the new key are supplied, the new key's value
        wins and the old one is discarded.

        Returns:
            dict: the configs with deprecated keys replaced.
        """
        for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
            if old in configs:
                logger.warning('Deprecated Kafka Consumer configuration: %s. '
                               'Please use %s instead.', old, new)
                old_value = configs.pop(old)
                if new not in configs:
                    configs[new] = old_value
        return configs