summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-29 19:08:35 -0800
committerDana Powers <dana.powers@rd.io>2015-12-29 19:08:35 -0800
commit3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9 (patch)
tree2c38fd2c577442cb90c99ee2d49b5b0f68300303 /kafka/cluster.py
parente5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (diff)
downloadkafka-python-3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9.tar.gz
Switch configs from attributes to dict to make passing / inspecting easier
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py20
1 files changed, 12 insertions, 8 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 5b5fd8e..84ad1d3 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import copy
import logging
import random
import time
@@ -12,10 +13,12 @@ log = logging.getLogger(__name__)
class ClusterMetadata(object):
- _retry_backoff_ms = 100
- _metadata_max_age_ms = 300000
+ DEFAULT_CONFIG = {
+ 'retry_backoff_ms': 100,
+ 'metadata_max_age_ms': 300000,
+ }
- def __init__(self, **kwargs):
+ def __init__(self, **configs):
self._brokers = {}
self._partitions = {}
self._groups = {}
@@ -26,9 +29,10 @@ class ClusterMetadata(object):
self._future = None
self._listeners = set()
- for config in ('retry_backoff_ms', 'metadata_max_age_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
def brokers(self):
return set(self._brokers.values())
@@ -55,8 +59,8 @@ class ClusterMetadata(object):
if self._need_update:
ttl = 0
else:
- ttl = self._last_successful_refresh_ms + self._metadata_max_age_ms - now
- retry = self._last_refresh_ms + self._retry_backoff_ms - now
+ ttl = self._last_successful_refresh_ms + self.config['metadata_max_age_ms'] - now
+ retry = self._last_refresh_ms + self.config['retry_backoff_ms'] - now
return max(ttl, retry, 0)
def request_update(self):