author     Dana Powers <dana.powers@rd.io>  2014-09-13 19:34:08 -0700
committer  Dana Powers <dana.powers@rd.io>  2014-12-15 12:42:54 -0800
commit     53763510c77b252b103ed2a1b7cdba8e527ba7f6 (patch)
tree       d54d8fca001d13883eec497f65d5f1e84769dc41
parent     b264d8f51751f9fc81cfe8e0fef0606dd877a8db (diff)
download   kafka-python-53763510c77b252b103ed2a1b7cdba8e527ba7f6.tar.gz
Use configure() to check and set configuration keys
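This replaces the private _get_config() lookup with a public configure() method: defaults now live in a module-level DEFAULT_CONSUMER_CONFIG dict, unknown keys raise the new KafkaConfigurationError instead of being silently ignored, and auto-commit without a group_id fails fast at construction time. A rough usage sketch (the broker address and topic name below are placeholders, not part of this commit):

    from kafka.consumer.new import KafkaConsumer
    from kafka.common import KafkaConfigurationError

    # __init__ now delegates all option handling to configure()
    consumer = KafkaConsumer('my-topic',
                             metadata_broker_list=['localhost:9092'],
                             group_id='my-group',
                             auto_commit_enable=True)

    # Unknown keys are rejected up front rather than silently dropped
    try:
        consumer.configure(metadata_broker_list=['localhost:9092'],
                           bogus_option=42)
    except KafkaConfigurationError as err:
        print(err)  # Unknown configuration key(s): ['bogus_option']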
-rw-r--r--  kafka/common.py          4
-rw-r--r--  kafka/consumer/new.py  126
2 files changed, 71 insertions(+), 59 deletions(-)
diff --git a/kafka/common.py b/kafka/common.py
index 2e817cb..e4b3b1b 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -197,6 +197,10 @@ class UnsupportedCodecError(KafkaError):
pass
+class KafkaConfigurationError(KafkaError):
+ pass
+
+
kafka_errors = {
-1 : UnknownError,
0 : NoError,
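Since KafkaConfigurationError subclasses KafkaError, callers that already catch the base class keep working. A minimal check (a sketch; nothing here beyond what the hunk above adds):

    from kafka.common import KafkaError, KafkaConfigurationError

    # The new exception slots into the existing hierarchy
    assert issubclass(KafkaConfigurationError, KafkaError)
    try:
        raise KafkaConfigurationError('bad key')
    except KafkaError:
        pass  # existing broad handlers still catch it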
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index b579bfc..a192b58 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -9,13 +9,37 @@ from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
- FailedPayloadsError, KafkaUnavailableError
+ FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
logger = logging.getLogger(__name__)
OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
+DEFAULT_CONSUMER_CONFIG = {
+ 'client_id': __name__,
+ 'group_id': None,
+ 'metadata_broker_list': None,
+ 'socket_timeout_ms': 30 * 1000,
+ 'fetch_message_max_bytes': 1024 * 1024,
+ 'auto_offset_reset': 'largest',
+ 'fetch_min_bytes': 1,
+ 'fetch_wait_max_ms': 100,
+ 'refresh_leader_backoff_ms': 200,
+ 'deserializer_class': lambda msg: msg,
+ 'auto_commit_enable': False,
+ 'auto_commit_interval_ms': 60 * 1000,
+ 'consumer_timeout_ms': -1,
+
+ # Currently unused
+ 'socket_receive_buffer_bytes': 64 * 1024,
+ 'num_consumer_fetchers': 1,
+ 'default_fetcher_backoff_ms': 1000,
+ 'queued_max_message_chunks': 10,
+ 'rebalance_max_retries': 4,
+ 'rebalance_backoff_ms': 2000,
+}
+
class KafkaConsumer(object):
"""
@@ -93,52 +117,17 @@ class KafkaConsumer(object):
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
- DEFAULT_CONSUMER_CONFIG = {
- 'client_id': __name__,
- 'group_id': None,
- 'metadata_broker_list': None,
- 'socket_timeout_ms': 30 * 1000,
- 'fetch_message_max_bytes': 1024 * 1024,
- 'auto_offset_reset': 'largest',
- 'fetch_min_bytes': 1,
- 'fetch_wait_max_ms': 100,
- 'refresh_leader_backoff_ms': 200,
- 'deserializer_class': lambda msg: msg,
- 'auto_commit_enable': False,
- 'auto_commit_interval_ms': 60 * 1000,
- 'consumer_timeout_ms': -1,
-
- # Currently unused
- 'socket_receive_buffer_bytes': 64 * 1024,
- 'num_consumer_fetchers': 1,
- 'default_fetcher_backoff_ms': 1000,
- 'queued_max_message_chunks': 10,
- 'rebalance_max_retries': 4,
- 'rebalance_backoff_ms': 2000,
- }
-
def __init__(self, *topics, **configs):
self.topics = topics
- self.config = configs
- self.client = KafkaClient(self._get_config('metadata_broker_list'),
- client_id=self._get_config('client_id'),
- timeout=(self._get_config('socket_timeout_ms') / 1000.0))
+ self.configure(**configs)
# Get initial topic metadata
self.client.load_metadata_for_topics()
for topic in self.topics:
if topic not in self.client.topic_partitions:
- raise ValueError("Topic %s not found in broker metadata" % topic)
+ raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
logger.info("Configuring consumer to fetch topic '%s'", topic)
- # Check auto-commit configuration
- if self._get_config('auto_commit_enable'):
- if not self._get_config('group_id'):
- raise RuntimeError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
-
- logger.info("Configuring consumer to auto-commit offsets")
- self._set_next_auto_commit_time()
-
# Setup offsets
self._offsets = OffsetsStruct(fetch=defaultdict(dict),
commit=defaultdict(dict),
@@ -146,7 +135,7 @@ class KafkaConsumer(object):
task_done=defaultdict(dict))
# If we have a consumer group, try to fetch stored offsets
- if self._get_config('group_id'):
+ if self._config['group_id']:
self._fetch_stored_offsets()
else:
self._auto_reset_offsets()
@@ -166,7 +155,7 @@ class KafkaConsumer(object):
for partition in self.client.topic_partitions[topic]:
(resp,) = self.client.send_offset_fetch_request(
- self._get_config('group_id'),
+ self._config['group_id'],
[OffsetFetchRequest(topic, partition)],
fail_on_error=False)
try:
@@ -261,7 +250,7 @@ class KafkaConsumer(object):
self._offsets.task_done[topic][partition] = offset
def should_auto_commit(self):
- if not self._get_config('auto_commit_enable'):
+ if not self._config['auto_commit_enable']:
return False
if not self._next_commit:
@@ -270,10 +259,10 @@ class KafkaConsumer(object):
return (time.time() >= self._next_commit)
def _set_next_auto_commit_time(self):
- self._next_commit = time.time() + (self._get_config('auto_commit_interval_ms') / 1000.0)
+ self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def commit(self):
- if not self._get_config('group_id'):
+ if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
raise RuntimeError('Attempted to commit offsets without a configured consumer group (group_id)')
@@ -303,8 +292,8 @@ class KafkaConsumer(object):
commits.append(OffsetCommitRequest(topic, partition, commit_offset, metadata))
if commits:
- logger.info('committing consumer offsets to group %s', self._get_config('group_id'))
- resps = self.client.send_offset_commit_request(self._get_config('group_id'),
+ logger.info('committing consumer offsets to group %s', self._config['group_id'])
+ resps = self.client.send_offset_commit_request(self._config['group_id'],
commits,
fail_on_error=False)
@@ -313,23 +302,42 @@ class KafkaConsumer(object):
task_done = self._offsets.task_done[r.topic][r.partition]
self._offsets.commit[r.topic][r.partition] = (task_done + 1)
- if self._get_config('auto_commit_enable'):
+ if self._config['auto_commit_enable']:
self._set_next_auto_commit_time()
return True
else:
- logger.info('No new offsets found to commit in group %s', self._get_config('group_id'))
+ logger.info('No new offsets found to commit in group %s', self._config['group_id'])
return False
- def _get_config(self, key):
- return self.config.get(key, self.DEFAULT_CONSUMER_CONFIG[key])
+ def configure(self, **configs):
+ self._config = {}
+ for key in DEFAULT_CONSUMER_CONFIG:
+ self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
+
+ if configs:
+ raise KafkaConfigurationError('Unknown configuration key(s): ' +
+ str(list(configs.keys())))
+
+ if self._config['auto_commit_enable']:
+ if not self._config['group_id']:
+ raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
+
+ # Check auto-commit configuration
+ if self._config['auto_commit_enable']:
+ logger.info("Configuring consumer to auto-commit offsets")
+ self._set_next_auto_commit_time()
+
+ self.client = KafkaClient(self._config['metadata_broker_list'],
+ client_id=self._config['client_id'],
+ timeout=(self._config['socket_timeout_ms'] / 1000.0))
def fetch_messages(self):
- max_bytes = self._get_config('fetch_message_max_bytes')
- max_wait_time = self._get_config('fetch_wait_max_ms')
- min_bytes = self._get_config('fetch_min_bytes')
+ max_bytes = self._config['fetch_message_max_bytes']
+ max_wait_time = self._config['fetch_wait_max_ms']
+ min_bytes = self._config['fetch_min_bytes']
fetches = []
offsets = self._offsets.fetch
@@ -383,7 +391,7 @@ class KafkaConsumer(object):
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
msg = KafkaMessage(topic, partition, offset, message.key,
- self._get_config('deserializer_class')(message.value))
+ self._config['deserializer_class'](message.value))
# Only increment fetch offset if we safely got the message and deserialized
self._offsets.fetch[topic][partition] = offset + 1
@@ -396,9 +404,9 @@ class KafkaConsumer(object):
EARLIEST = -2
RequestTime = None
- if self._get_config('auto_offset_reset') == 'largest':
+ if self._config['auto_offset_reset'] == 'largest':
RequestTime = LATEST
- elif self._get_config('auto_offset_reset') == 'smallest':
+ elif self._config['auto_offset_reset'] == 'smallest':
RequestTime = EARLIEST
else:
@@ -433,7 +441,7 @@ class KafkaConsumer(object):
return resp.offsets
def _refresh_metadata_on_error(self):
- sleep_ms = self._get_config('refresh_leader_backoff_ms')
+ sleep_ms = self._config['refresh_leader_backoff_ms']
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)
@@ -448,9 +456,9 @@ class KafkaConsumer(object):
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
- if self._get_config('consumer_timeout_ms') >= 0:
- self._consumer_timeout = time.time() + (self._get_config('consumer_timeout_ms') / 1000.0)
+ if self._config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
- raise ConsumerTimeout('Consumer timed out after %d ms' % + self._get_config('consumer_timeout_ms'))
+ raise ConsumerTimeout('Consumer timed out after %d ms' % self._config['consumer_timeout_ms'])
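For context, consumer_timeout_ms bounds how long message iteration blocks before _check_consumer_timeout() raises. A sketch of the caller side (topic and broker are placeholders, and the iteration interface is assumed from the rest of this class, not shown in this diff):

    from kafka.common import ConsumerTimeout

    consumer = KafkaConsumer('my-topic',
                             metadata_broker_list=['localhost:9092'],
                             consumer_timeout_ms=5000)
    try:
        for message in consumer:
            process(message)  # process() is a placeholder handler
    except ConsumerTimeout:
        pass  # no message arrived within 5 seconds; resume other work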