summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-17 02:24:26 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-11-18 15:15:11 -0800
commitac1a2a0a1012909faba4e711b968e5b0c3746ca5 (patch)
tree894a91eab760b2d91bf174d5d15133a91faa3e1e
parentd67157cb9a032a6f0493cea128bbcd0528f3e640 (diff)
downloadkafka-python-ac1a2a0a1012909faba4e711b968e5b0c3746ca5.tar.gz
Add group coordinator lookup
We need a way to send a request to the group coordinator. I spent a day and a half trying to implement a `_send_request_to_group_coordinator()` that included: 1. caching the value of the group coordinator so that it wouldn't have to be repeatedly looked up on every call. This is particularly important because `list_consumer_groups()`, `list_consumer_group_offsets()`, and `describe_consumer_groups()` will frequently be used by monitoring scripts. I know that across the production clusters I support, using a cached value will save ~1M calls per day. 2. providing clean and consistent error handling. This is difficult because the responses are inconsistent about error codes: some have a top-level error code, while some bury it within the description of the actual item. 3. avoiding tight coupling between this method and the request/response classes... the custom parsing logic for errors etc, given that it's non-standard, should live in the callers, not here. So finally I gave up and just went with this simpler solution, and made it so the callers can optionally bypass this if they somehow already know the group coordinator.
-rw-r--r--kafka/admin/kafka.py40
1 file changed, 40 insertions, 0 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 5ce8630..3dc2e44 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -4,6 +4,7 @@ import copy
import logging
import socket
from kafka.client_async import KafkaClient, selectors
+import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
NodeNotReadyError, NotControllerError)
@@ -11,6 +12,7 @@ from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
+from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.version import __version__
@@ -243,6 +245,44 @@ class KafkaAdmin(object):
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
.format(version))
+ def _find_group_coordinator_id(self, group_id):
+ """Find the broker node_id of the coordinator of the given group.
+
+ Sends a FindCoordinatorRequest message to the cluster. Will block until
+ the FindCoordinatorResponse is received. Any errors are immediately
+ raised.
+
+ :param group_id: The consumer group ID. This is typically the group
+ name as a string.
+ :return: The node_id of the broker that is the coordinator.
+ """
+ # Note: Java may change how this is implemented in KAFKA-6791.
+ #
+ # TODO add support for dynamically picking version of
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
+ # When I experimented with this, GroupCoordinatorResponse_v1 didn't
+ # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
+ gc_request = GroupCoordinatorRequest[0](group_id)
+ gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+ # use the extra error checking in add_group_coordinator() rather than
+ # immediately returning the group coordinator.
+ success = self._client.cluster.add_group_coordinator(group_id, gc_response)
+ if not success:
+ error_type = Errors.for_code(gc_response.error_code)
+ assert error_type is not Errors.NoError
+ # Note: When error_type.retriable, Java will retry... see
+ # KafkaAdminClient's handleFindCoordinatorError method
+ raise error_type(
+ "Could not identify group coordinator for group_id '{}' from response '{}'."
+ .format(group_id, gc_response))
+ group_coordinator = self._client.cluster.coordinator_for_group(group_id)
+ # will be None if the coordinator was never populated, which should never happen here
+ assert group_coordinator is not None
+ # will be -1 if add_group_coordinator() failed... but by this point the
+ # error should have been raised.
+ assert group_coordinator != -1
+ return group_coordinator
+
def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.