diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-13 19:34:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:54 -0800 |
commit | 53763510c77b252b103ed2a1b7cdba8e527ba7f6 (patch) | |
tree | d54d8fca001d13883eec497f65d5f1e84769dc41 | |
parent | b264d8f51751f9fc81cfe8e0fef0606dd877a8db (diff) | |
download | kafka-python-53763510c77b252b103ed2a1b7cdba8e527ba7f6.tar.gz |
Use configure() to check and set configuration keys
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | kafka/consumer/new.py | 126 |
2 files changed, 71 insertions, 59 deletions
diff --git a/kafka/common.py b/kafka/common.py index 2e817cb..e4b3b1b 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -197,6 +197,10 @@ class UnsupportedCodecError(KafkaError): pass +class KafkaConfigurationError(KafkaError): + pass + + kafka_errors = { -1 : UnknownError, 0 : NoError, diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index b579bfc..a192b58 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -9,13 +9,37 @@ from kafka.common import ( OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, - FailedPayloadsError, KafkaUnavailableError + FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) logger = logging.getLogger(__name__) OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"]) +DEFAULT_CONSUMER_CONFIG = { + 'client_id': __name__, + 'group_id': None, + 'metadata_broker_list': None, + 'socket_timeout_ms': 30 * 1000, + 'fetch_message_max_bytes': 1024 * 1024, + 'auto_offset_reset': 'largest', + 'fetch_min_bytes': 1, + 'fetch_wait_max_ms': 100, + 'refresh_leader_backoff_ms': 200, + 'deserializer_class': lambda msg: msg, + 'auto_commit_enable': False, + 'auto_commit_interval_ms': 60 * 1000, + 'consumer_timeout_ms': -1, + + # Currently unused + 'socket_receive_buffer_bytes': 64 * 1024, + 'num_consumer_fetchers': 1, + 'default_fetcher_backoff_ms': 1000, + 'queued_max_message_chunks': 10, + 'rebalance_max_retries': 4, + 'rebalance_backoff_ms': 2000, +} + class KafkaConsumer(object): """ @@ -93,52 +117,17 @@ class KafkaConsumer(object): http://kafka.apache.org/documentation.html#highlevelconsumerapi """ - DEFAULT_CONSUMER_CONFIG = { - 'client_id': __name__, - 'group_id': None, - 'metadata_broker_list': None, - 'socket_timeout_ms': 30 * 1000, - 'fetch_message_max_bytes': 1024 * 1024, - 'auto_offset_reset': 'largest', - 'fetch_min_bytes': 1, - 'fetch_wait_max_ms': 100, - 'refresh_leader_backoff_ms': 200, - 'deserializer_class': lambda msg: msg, - 'auto_commit_enable': False, - 'auto_commit_interval_ms': 60 * 1000, - 'consumer_timeout_ms': -1, - - # Currently unused - 'socket_receive_buffer_bytes': 64 * 1024, - 'num_consumer_fetchers': 1, - 'default_fetcher_backoff_ms': 1000, - 'queued_max_message_chunks': 10, - 'rebalance_max_retries': 4, - 'rebalance_backoff_ms': 2000, - } - def __init__(self, *topics, **configs): self.topics = topics - self.config = configs - self.client = KafkaClient(self._get_config('metadata_broker_list'), - client_id=self._get_config('client_id'), - timeout=(self._get_config('socket_timeout_ms') / 1000.0)) + self.configure(**configs) # Get initial topic metadata self.client.load_metadata_for_topics() for topic in self.topics: if topic not in self.client.topic_partitions: - raise ValueError("Topic %s not found in broker metadata" % topic) + raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic) logger.info("Configuring consumer to fetch topic '%s'", topic) - # Check auto-commit configuration - if self._get_config('auto_commit_enable'): - if not self._get_config('group_id'): - raise RuntimeError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') - - logger.info("Configuring consumer to auto-commit offsets") - self._set_next_auto_commit_time() - # Setup offsets self._offsets = OffsetsStruct(fetch=defaultdict(dict), commit=defaultdict(dict), @@ -146,7 +135,7 @@ class KafkaConsumer(object): task_done=defaultdict(dict)) # If we have a consumer group, try to fetch stored offsets - if self._get_config('group_id'): + if self._config['group_id']: self._fetch_stored_offsets() else: self._auto_reset_offsets() @@ -166,7 +155,7 @@ class KafkaConsumer(object): for partition in self.client.topic_partitions[topic]: (resp,) = self.client.send_offset_fetch_request( - self._get_config('group_id'), + self._config['group_id'], [OffsetFetchRequest(topic, partition)], fail_on_error=False) try: @@ -261,7 +250,7 @@ class KafkaConsumer(object): self._offsets.task_done[topic][partition] = offset def should_auto_commit(self): - if not self._get_config('auto_commit_enable'): + if not self._config['auto_commit_enable']: return False if not self._next_commit: @@ -270,10 +259,10 @@ class KafkaConsumer(object): return (time.time() >= self._next_commit) def _set_next_auto_commit_time(self): - self._next_commit = time.time() + (self._get_config('auto_commit_interval_ms') / 1000.0) + self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) def commit(self): - if not self._get_config('group_id'): + if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') raise RuntimeError('Attempted to commit offsets without a configured consumer group (group_id)') @@ -303,8 +292,8 @@ class KafkaConsumer(object): commits.append(OffsetCommitRequest(topic, partition, commit_offset, metadata)) if commits: - logger.info('committing consumer offsets to group %s', self._get_config('group_id')) - resps = self.client.send_offset_commit_request(self._get_config('group_id'), + logger.info('committing consumer offsets to group %s', self._config['group_id']) + resps = self.client.send_offset_commit_request(self._config['group_id'], commits, fail_on_error=False) @@ -313,23 +302,42 @@ class KafkaConsumer(object): task_done = self._offsets.task_done[r.topic][r.partition] self._offsets.commit[r.topic][r.partition] = (task_done + 1) - if self._get_config('auto_commit_enable'): + if self._config['auto_commit_enable']: self._set_next_auto_commit_time() return True else: - logger.info('No new offsets found to commit in group %s', self._get_config('group_id')) + logger.info('No new offsets found to commit in group %s', self._config['group_id']) return False - def _get_config(self, key): - return self.config.get(key, self.DEFAULT_CONSUMER_CONFIG[key]) + def configure(self, **configs): + self._config = {} + for key in DEFAULT_CONSUMER_CONFIG: + self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key]) + + if configs: + raise KafkaConfigurationError('Unknown configuration key(s): ' + + str(list(configs.keys()))) + + if self._config['auto_commit_enable']: + if not self._config['group_id']: + raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') + + # Check auto-commit configuration + if self._config['auto_commit_enable']: + logger.info("Configuring consumer to auto-commit offsets") + self._set_next_auto_commit_time() + + self.client = KafkaClient(self._config['metadata_broker_list'], + client_id=self._config['client_id'], + timeout=(self._config['socket_timeout_ms'] / 1000.0)) def fetch_messages(self): - max_bytes = self._get_config('fetch_message_max_bytes') - max_wait_time = self._get_config('fetch_wait_max_ms') - min_bytes = self._get_config('fetch_min_bytes') + max_bytes = self._config['fetch_message_max_bytes'] + max_wait_time = self._config['fetch_wait_max_ms'] + min_bytes = self._config['fetch_min_bytes'] fetches = [] offsets = self._offsets.fetch @@ -383,7 +391,7 @@ class KafkaConsumer(object): for (offset, message) in resp.messages: # deserializer_class could raise an exception here msg = KafkaMessage(topic, partition, offset, message.key, - self._get_config('deserializer_class')(message.value)) + self._config['deserializer_class'](message.value)) # Only increment fetch offset if we safely got the message and deserialized self._offsets.fetch[topic][partition] = offset + 1 @@ -396,9 +404,9 @@ class KafkaConsumer(object): EARLIEST = -2 RequestTime = None - if self._get_config('auto_offset_reset') == 'largest': + if self._config['auto_offset_reset'] == 'largest': RequestTime = LATEST - elif self._get_config('auto_offset_reset') == 'smallest': + elif self._config['auto_offset_reset'] == 'smallest': RequestTime = EARLIEST else: @@ -433,7 +441,7 @@ class KafkaConsumer(object): return resp.offsets def _refresh_metadata_on_error(self): - sleep_ms = self._get_config('refresh_leader_backoff_ms') + sleep_ms = self._config['refresh_leader_backoff_ms'] while True: logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms) time.sleep(sleep_ms / 1000.0) @@ -448,9 +456,9 @@ class KafkaConsumer(object): def _set_consumer_timeout_start(self): self._consumer_timeout = False - if self._get_config('consumer_timeout_ms') >= 0: - self._consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0) + if self._config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0) def _check_consumer_timeout(self): if self._consumer_timeout and time.time() > self._consumer_timeout: - raise ConsumerTimeout('Consumer timed out after %d ms' % + self._get_config('consumer_timeout_ms')) + raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms']) |