diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-10 12:05:14 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-10 12:05:14 -0800 |
commit | 814b599c49a0e5fafc1e2598731e582aac5b380f (patch) | |
tree | 5643e99aa73328f8623dd7d7153e99a6abf23f78 /kafka | |
parent | 0a62cc1b862efb884b4e7f72ff3254763941fb04 (diff) | |
parent | 240f7029def4027bfccde7b8627c978ab1fdd5a6 (diff) | |
download | kafka-python-814b599c49a0e5fafc1e2598731e582aac5b380f.tar.gz |
Merge pull request #501 from dpkp/coordinator_tests
ConsumerCoordinator cleanups and test coverage
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/subscription_state.py | 3 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 5 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 44 |
3 files changed, 39 insertions, 13 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index c60f192..bb6034c 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -157,6 +157,9 @@ class SubscriptionState(object): self._group_subscription.update(topics) def mark_for_reassignment(self): + if self._user_assignment: + raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + assert self.subscription is not None, 'Subscription required' self._group_subscription.intersection_update(self.subscription) self.needs_partition_assignment = True diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index bcd5889..6dd65dc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -621,7 +621,7 @@ class HeartbeatTask(object): etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0 self._client.schedule(self, etd) - +''' class GroupCoordinatorMetrics(object): def __init__(self, metrics, prefix, tags=None): self.metrics = metrics @@ -674,5 +674,4 @@ class GroupCoordinatorMetrics(object): "The number of seconds since the last controller heartbeat", tags), lastHeartbeat) """ - - +''' diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index af3e019..9828252 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -8,6 +8,7 @@ import time import six from .base import BaseCoordinator +from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ( ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol) @@ -29,7 +30,7 @@ class ConsumerCoordinator(BaseCoordinator): 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': lambda offsets, response: True, - 'assignors': (), + 'assignors': (RoundRobinPartitionAssignor,), 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, @@ -100,6 +101,7 @@ class ConsumerCoordinator(BaseCoordinator): def group_protocols(self): """Returns list of preferred (protocols, metadata)""" topics = self._subscription.subscription + assert topics is not None, 'Consumer has not subscribed to topics' metadata_list = [] for assignor in self.config['assignors']: metadata = assignor.metadata(topics) @@ -111,7 +113,7 @@ class ConsumerCoordinator(BaseCoordinator): # if we encounter any unauthorized topics, raise an exception # TODO #if self._cluster.unauthorized_topics: - # raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics) + # raise TopicAuthorizationError(self._cluster.unauthorized_topics) if self._subscription.subscribed_pattern: topics = [] @@ -122,7 +124,8 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription.change_subscription(topics) self._client.set_topics(self._subscription.group_subscription()) - # check if there are any changes to the metadata which should trigger a rebalance + # check if there are any changes to the metadata which should trigger + # a rebalance if self._subscription_metadata_changed(): if self.config['api_version'] >= (0, 9): self._subscription.mark_for_reassignment() @@ -182,7 +185,7 @@ class ConsumerCoordinator(BaseCoordinator): # execute the user's callback after rebalance if self._subscription.listener: try: - self._subscriptions.listener.on_partitions_assigned(assigned) + self._subscription.listener.on_partitions_assigned(assigned) except Exception: log.exception("User provided listener failed on partition" " assignment: %s", assigned) @@ -263,6 +266,9 @@ class ConsumerCoordinator(BaseCoordinator): Returns: dict: {TopicPartition: OffsetAndMetadata} """ + if not partitions: + return {} + while True: if self.config['api_version'] >= (0, 8, 2): self.ensure_coordinator_known() @@ -297,11 +303,16 @@ class ConsumerCoordinator(BaseCoordinator): Returns: Future: indicating whether the commit was successful or not """ + assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), + offsets.values())) if callback is None: callback = self.config['default_offset_commit_callback'] self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) future.add_both(callback, offsets) + return future def commit_offsets_sync(self, offsets): """Commit specific offsets synchronously. @@ -314,6 +325,10 @@ class ConsumerCoordinator(BaseCoordinator): Raises error on failure """ + assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), + offsets.values())) if not offsets: return @@ -325,7 +340,7 @@ class ConsumerCoordinator(BaseCoordinator): self._client.poll(future=future) if future.succeeded(): - return + return future.value if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type @@ -369,6 +384,13 @@ class ConsumerCoordinator(BaseCoordinator): Returns: Future: indicating whether the commit was successful or not """ + assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) + assert all(map(lambda v: isinstance(v, OffsetAndMetadata), + offsets.values())) + if not offsets: + return Future().success(None) + if self.config['api_version'] >= (0, 8, 2): if self.coordinator_unknown(): return Future().failure(Errors.GroupCoordinatorNotAvailableError) @@ -376,9 +398,6 @@ class ConsumerCoordinator(BaseCoordinator): else: node_id = self._client.least_loaded_node() - if not offsets: - return Future().failure(None) - # create the offset commit request offset_data = collections.defaultdict(dict) for tp, offset in six.iteritems(offsets): @@ -428,7 +447,7 @@ class ConsumerCoordinator(BaseCoordinator): future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_commit_response, offsets, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, node_id, request, future) return future def _handle_offset_commit_response(self, offsets, future, response): @@ -513,6 +532,11 @@ class ConsumerCoordinator(BaseCoordinator): Returns: Future: resolves to dict of offsets: {TopicPartition: int} """ + assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) + if not partitions: + return Future().success({}) + if self.config['api_version'] >= (0, 8, 2): if self.coordinator_unknown(): return Future().failure(Errors.GroupCoordinatorNotAvailableError) @@ -541,7 +565,7 @@ class ConsumerCoordinator(BaseCoordinator): future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_fetch_response, future) - _f.add_errback(self._failed_request, future) + _f.add_errback(self._failed_request, node_id, request, future) return future def _handle_offset_fetch_response(self, future, response): |