diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-21 21:20:38 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-21 21:20:38 +0900 |
commit | 146b893e0fbac21150f74a8ba2f17cc64e1714ad (patch) | |
tree | 071b27882c76ae44103d6047c83c818a3b9582b8 | |
parent | 0bd5d2ab5738065df410ec2f9381844b28fe7425 (diff) | |
parent | e3b1ad24b80dd60e3159566740f40fc6f5811070 (diff) | |
download | kafka-python-146b893e0fbac21150f74a8ba2f17cc64e1714ad.tar.gz |
Merge pull request #1258 from dpkp/pending_completions
Move callback processing from BrokerConnection to KafkaClient
-rw-r--r-- | kafka/client.py | 22 | ||||
-rw-r--r-- | kafka/client_async.py | 32 | ||||
-rw-r--r-- | kafka/conn.py | 39 | ||||
-rw-r--r-- | test/test_client.py | 3 |
4 files changed, 65 insertions, 31 deletions
diff --git a/kafka/client.py b/kafka/client.py index 75b05bf..22918ac 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -175,7 +175,8 @@ class SimpleClient(object): # Block while not future.is_done: - conn.recv() + for r, f in conn.recv(): + f.success(r) if future.failed(): log.error("Request failed: %s", future.exception) @@ -288,7 +289,8 @@ class SimpleClient(object): if not future.is_done: conn, _ = connections_by_future[future] - conn.recv() + for r, f in conn.recv(): + f.success(r) continue _, broker = connections_by_future.pop(future) @@ -352,8 +354,6 @@ class SimpleClient(object): try: host, port, afi = get_ip_port_afi(broker.host) conn = self._get_conn(host, broker.port, afi) - conn.send(request_id, request) - except ConnectionError as e: log.warning('ConnectionError attempting to send request %s ' 'to server %s: %s', request_id, broker, e) @@ -365,6 +365,11 @@ class SimpleClient(object): # No exception, try to get response else: + future = conn.send(request_id, request) + while not future.is_done: + for r, f in conn.recv(): + f.success(r) + # decoder_fn=None signal that the server is expected to not # send a response. This probably only applies to # ProduceRequest w/ acks = 0 @@ -376,18 +381,17 @@ class SimpleClient(object): responses[topic_partition] = None return [] - try: - response = conn.recv(request_id) - except ConnectionError as e: - log.warning('ConnectionError attempting to receive a ' + if future.failed(): + log.warning('Error attempting to receive a ' 'response to request %s from server %s: %s', - request_id, broker, e) + request_id, broker, future.exception) for payload in payloads: topic_partition = (payload.topic, payload.partition) responses[topic_partition] = FailedPayloadsError(payload) else: + response = future.value _resps = [] for payload_response in decoder_fn(response): topic_partition = (payload_response.topic, diff --git a/kafka/client_async.py b/kafka/client_async.py index aec609d..602c0c1 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,5 +1,6 @@ from __future__ import absolute_import, division +import collections import copy import functools import heapq @@ -204,6 +205,11 @@ class KafkaClient(object): self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._wake_lock = threading.Lock() + + # when requests complete, they are transferred to this queue prior to + # invocation. + self._pending_completion = collections.deque() + self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._closed = False @@ -254,7 +260,8 @@ class KafkaClient(object): future = bootstrap.send(metadata_request) while not future.is_done: self._selector.select(1) - bootstrap.recv() + for r, f in bootstrap.recv(): + f.success(r) if future.failed(): bootstrap.close() continue @@ -512,7 +519,9 @@ class KafkaClient(object): Returns: list: responses received (can be empty) """ - if timeout_ms is None: + if future is not None: + timeout_ms = 100 + elif timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] responses = [] @@ -551,7 +560,9 @@ class KafkaClient(object): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - responses.extend(self._poll(timeout)) + self._poll(timeout) + + responses.extend(self._fire_pending_completed_requests()) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done @@ -561,7 +572,7 @@ class KafkaClient(object): return responses def _poll(self, timeout): - responses = [] + """Returns list of (response, future) tuples""" processed = set() start_select = time.time() @@ -600,14 +611,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks + self._pending_completion.extend(conn.recv()) # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - responses.extend(conn.recv()) + self._pending_completion.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -621,7 +632,6 @@ class KafkaClient(object): self._sensors.io_time.record((time.time() - end_select) * 1000000000) self._maybe_close_oldest_connection() - return responses def in_flight_request_count(self, node_id=None): """Get the number of in-flight requests for a node or all nodes. @@ -640,6 +650,14 @@ class KafkaClient(object): else: return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) + def _fire_pending_completed_requests(self): + responses = [] + while self._pending_completion: + response, future = self._pending_completion.popleft() + future.success(response) + responses.append(response) + return responses + def least_loaded_node(self): """Choose the node with fewest outstanding requests, with fallbacks. diff --git a/kafka/conn.py b/kafka/conn.py index 6117262..4c21b8c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,14 @@ import copy import errno import logging from random import shuffle, uniform + +# selectors in stdlib as of py3.4 +try: + import selectors # pylint: disable=import-error +except ImportError: + # vendored backport module + from .vendor import selectors34 as selectors + import socket import struct import sys @@ -138,6 +146,9 @@ class BrokerConnection(object): api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None + selector (selectors.BaseSelector): Provide a specific selector + implementation to use for I/O multiplexing. + Default: selectors.DefaultSelector state_change_callback (callable): function to be called when the connection state changes from CONNECTING to CONNECTED etc. metrics (kafka.metrics.Metrics): Optionally provide a metrics @@ -173,6 +184,7 @@ class BrokerConnection(object): 'ssl_crlfile': None, 'ssl_password': None, 'api_version': (0, 8, 2), # default to most restrictive + 'selector': selectors.DefaultSelector, 'state_change_callback': lambda conn: True, 'metrics': None, 'metric_group_prefix': '', @@ -704,7 +716,7 @@ class BrokerConnection(object): def recv(self): """Non-blocking network receive. - Return response if available + Return list of (response, future) """ if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) @@ -727,17 +739,16 @@ class BrokerConnection(object): self.config['request_timeout_ms'])) return () - for response in responses: + # augment respones w/ correlation_id, future, and timestamp + for i in range(len(responses)): (correlation_id, future, timestamp) = self.in_flight_requests.popleft() - if isinstance(response, Errors.KafkaError): - self.close(response) - break - + latency_ms = (time.time() - timestamp) * 1000 if self._sensors: - self._sensors.request_time.record((time.time() - timestamp) * 1000) + self._sensors.request_time.record(latency_ms) - log.debug('%s Response %d: %s', self, correlation_id, response) - future.success(response) + response = responses[i] + log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response) + responses[i] = (response, future) return responses @@ -899,12 +910,12 @@ class BrokerConnection(object): # request was unrecognized mr = self.send(MetadataRequest[0]([])) - if self._sock: - self._sock.setblocking(True) + selector = self.config['selector']() + selector.register(self._sock, selectors.EVENT_READ) while not (f.is_done and mr.is_done): - self.recv() - if self._sock: - self._sock.setblocking(False) + for response, future in self.recv(): + future.success(response) + selector.select(1) if f.succeeded(): if isinstance(request, ApiVersionRequest[0]): diff --git a/test/test_client.py b/test/test_client.py index 42a1623..d02c621 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -28,6 +28,7 @@ def mock_conn(conn, success=True): else: mocked.send.return_value = Future().failure(Exception()) conn.return_value = mocked + conn.recv.return_value = [] class TestSimpleClient(unittest.TestCase): @@ -94,7 +95,7 @@ class TestSimpleClient(unittest.TestCase): mock_conn(mocked_conns[('kafka03', 9092)], success=False) future = Future() mocked_conns[('kafka02', 9092)].send.return_value = future - mocked_conns[('kafka02', 9092)].recv.side_effect = lambda: future.success('valid response') + mocked_conns[('kafka02', 9092)].recv.return_value = [('valid response', future)] def mock_get_conn(host, port, afi): return mocked_conns[(host, port)] |