summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 15:50:05 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 15:50:05 -0800
commit3e28b42e6691b864b6f940034a4ccdce0b69d406 (patch)
treeb2badd68fc5ac37ab50016f25a0efdb2abb76c55 /kafka/client.py
parentee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43 (diff)
parentda03827d12520bd9c8c5b35bb43e35168f09771a (diff)
downloadkafka-python-3e28b42e6691b864b6f940034a4ccdce0b69d406.tar.gz
Merge pull request #420 from toddpalino/master
Initial support for consumer coordinator
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 64b814b..810fa46 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -98,6 +98,26 @@ class KafkaClient(object):
# Otherwise return the BrokerMetadata
return self.brokers[meta.leader]
+ def _get_coordinator_for_group(self, group):
+ """
+ Returns the coordinator broker for a consumer group.
+
+ ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+ does not currently exist for the group.
+
+ OffsetsLoadInProgressCode is raised if the coordinator is available
+ but is still loading offsets from the internal topic
+ """
+
+ resp = self.send_consumer_metadata_request(group)
+
+ # If there's a problem with finding the coordinator, raise the
+ # provided error
+ kafka.common.check_error(resp)
+
+ # Otherwise return the BrokerMetadata
+ return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+
def _next_id(self):
"""Generate a new correlation id"""
# modulo to keep w/i int32
@@ -254,6 +274,96 @@ class KafkaClient(object):
# Return responses in the same order as provided
return [responses[tp] for tp in original_ordering]
+ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
+ """
+ Send a list of requests to the consumer coordinator for the group
+ specified using the supplied encode/decode functions. As the payloads
+ that use consumer-aware requests do not contain the group (e.g.
+ OffsetFetchRequest), all payloads must be for a single group.
+
+ Arguments:
+
+ group: the name of the consumer group (str) the payloads are for
+ payloads: list of object-like entities with topic (str) and
+ partition (int) attributes; payloads with duplicate
+ topic+partition are not supported.
+
+ encode_fn: a method to encode the list of payloads to a request body,
+ must accept client_id, correlation_id, and payloads as
+ keyword arguments
+
+ decode_fn: a method to decode a response body into response objects.
+ The response objects must be object-like and have topic
+ and partition attributes
+
+ Returns:
+
+ List of response objects in the same order as the supplied payloads
+ """
+ # encoders / decoders do not maintain ordering currently
+ # so we need to keep this so we can rebuild order before returning
+ original_ordering = [(p.topic, p.partition) for p in payloads]
+
+ broker = self._get_coordinator_for_group(group)
+
+ # Send the list of request payloads and collect the responses and
+ # errors
+ responses = {}
+ requestId = self._next_id()
+ log.debug('Request %s to %s: %s', requestId, broker, payloads)
+ request = encoder_fn(client_id=self.client_id,
+ correlation_id=requestId, payloads=payloads)
+
+ # Send the request, recv the response
+ try:
+ conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+ conn.send(requestId, request)
+
+ except ConnectionError as e:
+ log.warning('ConnectionError attempting to send request %s '
+ 'to server %s: %s', requestId, broker, e)
+
+ for payload in payloads:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
+
+ # No exception, try to get response
+ else:
+
+ # decoder_fn=None signal that the server is expected to not
+ # send a response. This probably only applies to
+ # ProduceRequest w/ acks = 0
+ if decoder_fn is None:
+ log.debug('Request %s does not expect a response '
+ '(skipping conn.recv)', requestId)
+ for payload in payloads:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = None
+ return []
+
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError as e:
+ log.warning('ConnectionError attempting to receive a '
+ 'response to request %s from server %s: %s',
+ requestId, broker, e)
+
+ for payload in payloads:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
+
+ else:
+ _resps = []
+ for payload_response in decoder_fn(response):
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
+
+ # Return responses in the same order as provided
+ return [responses[tp] for tp in original_ordering]
+
def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
@@ -446,6 +556,13 @@ class KafkaClient(object):
return self._send_broker_unaware_request(payloads, encoder, decoder)
+ def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
+ callback=None):
+ encoder = KafkaProtocol.encode_consumer_metadata_request
+ decoder = KafkaProtocol.decode_consumer_metadata_response
+
+ return self._send_broker_unaware_request(payloads, encoder, decoder)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -550,3 +667,14 @@ class KafkaClient(object):
return [resp if not callback else callback(resp) for resp in resps
if not fail_on_error or not self._raise_on_response_error(resp)]
+
+ def send_offset_fetch_request_kafka(self, group, payloads=[],
+ fail_on_error=True, callback=None):
+
+ encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
+ group=group, from_kafka=True)
+ decoder = KafkaProtocol.decode_offset_fetch_response
+ resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
+
+ return [resp if not callback else callback(resp) for resp in resps
+ if not fail_on_error or not self._raise_on_response_error(resp)]