diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-02 15:50:05 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-02 15:50:05 -0800 |
commit | 3e28b42e6691b864b6f940034a4ccdce0b69d406 (patch) | |
tree | b2badd68fc5ac37ab50016f25a0efdb2abb76c55 /kafka/client.py | |
parent | ee6b9cb5b1310c48a3ed5b66be0dd0c4dd16dc43 (diff) | |
parent | da03827d12520bd9c8c5b35bb43e35168f09771a (diff) | |
download | kafka-python-3e28b42e6691b864b6f940034a4ccdce0b69d406.tar.gz |
Merge pull request #420 from toddpalino/master
Initial support for consumer coordinator
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/kafka/client.py b/kafka/client.py index 64b814b..810fa46 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -98,6 +98,26 @@ class KafkaClient(object): # Otherwise return the BrokerMetadata return self.brokers[meta.leader] + def _get_coordinator_for_group(self, group): + """ + Returns the coordinator broker for a consumer group. + + ConsumerCoordinatorNotAvailableCode will be raised if the coordinator + does not currently exist for the group. + + OffsetsLoadInProgressCode is raised if the coordinator is available + but is still loading offsets from the internal topic + """ + + resp = self.send_consumer_metadata_request(group) + + # If there's a problem with finding the coordinator, raise the + # provided error + kafka.common.check_error(resp) + + # Otherwise return the BrokerMetadata + return BrokerMetadata(resp.nodeId, resp.host, resp.port) + def _next_id(self): """Generate a new correlation id""" # modulo to keep w/i int32 @@ -254,6 +274,96 @@ class KafkaClient(object): # Return responses in the same order as provided return [responses[tp] for tp in original_ordering] + def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): + """ + Send a list of requests to the consumer coordinator for the group + specified using the supplied encode/decode functions. As the payloads + that use consumer-aware requests do not contain the group (e.g. + OffsetFetchRequest), all payloads must be for a single group. + + Arguments: + + group: the name of the consumer group (str) the payloads are for + payloads: list of object-like entities with topic (str) and + partition (int) attributes; payloads with duplicate + topic+partition are not supported. + + encode_fn: a method to encode the list of payloads to a request body, + must accept client_id, correlation_id, and payloads as + keyword arguments + + decode_fn: a method to decode a response body into response objects. + The response objects must be object-like and have topic + and partition attributes + + Returns: + + List of response objects in the same order as the supplied payloads + """ + # encoders / decoders do not maintain ordering currently + # so we need to keep this so we can rebuild order before returning + original_ordering = [(p.topic, p.partition) for p in payloads] + + broker = self._get_coordinator_for_group(group) + + # Send the list of request payloads and collect the responses and + # errors + responses = {} + requestId = self._next_id() + log.debug('Request %s to %s: %s', requestId, broker, payloads) + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, payloads=payloads) + + # Send the request, recv the response + try: + conn = self._get_conn(broker.host.decode('utf-8'), broker.port) + conn.send(requestId, request) + + except ConnectionError as e: + log.warning('ConnectionError attempting to send request %s ' + 'to server %s: %s', requestId, broker, e) + + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + # No exception, try to get response + else: + + # decoder_fn=None signal that the server is expected to not + # send a response. This probably only applies to + # ProduceRequest w/ acks = 0 + if decoder_fn is None: + log.debug('Request %s does not expect a response ' + '(skipping conn.recv)', requestId) + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = None + return [] + + try: + response = conn.recv(requestId) + except ConnectionError as e: + log.warning('ConnectionError attempting to receive a ' + 'response to request %s from server %s: %s', + requestId, broker, e) + + for payload in payloads: + topic_partition = (payload.topic, payload.partition) + responses[topic_partition] = FailedPayloadsError(payload) + + else: + _resps = [] + for payload_response in decoder_fn(response): + topic_partition = (payload_response.topic, + payload_response.partition) + responses[topic_partition] = payload_response + _resps.append(payload_response) + log.debug('Response %s: %s', requestId, _resps) + + # Return responses in the same order as provided + return [responses[tp] for tp in original_ordering] + def __repr__(self): return '<KafkaClient client_id=%s>' % (self.client_id) @@ -446,6 +556,13 @@ class KafkaClient(object): return self._send_broker_unaware_request(payloads, encoder, decoder) + def send_consumer_metadata_request(self, payloads=[], fail_on_error=True, + callback=None): + encoder = KafkaProtocol.encode_consumer_metadata_request + decoder = KafkaProtocol.decode_consumer_metadata_response + + return self._send_broker_unaware_request(payloads, encoder, decoder) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -550,3 +667,14 @@ class KafkaClient(object): return [resp if not callback else callback(resp) for resp in resps if not fail_on_error or not self._raise_on_response_error(resp)] + + def send_offset_fetch_request_kafka(self, group, payloads=[], + fail_on_error=True, callback=None): + + encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, + group=group, from_kafka=True) + decoder = KafkaProtocol.decode_offset_fetch_response + resps = self._send_consumer_aware_request(group, payloads, encoder, decoder) + + return [resp if not callback else callback(resp) for resp in resps + if not fail_on_error or not self._raise_on_response_error(resp)] |