author     Dana Powers <dana.powers@gmail.com>   2016-08-04 14:22:40 -0700
committer  GitHub <noreply@github.com>           2016-08-04 14:22:40 -0700
commit     68c8fa4ad01f8fef38708f257cb1c261cfac01ab (patch)
tree       38d12fc11f82c492c68a4e04dbac26664862f541
parent     3c9b1b6fc498f95806ee12f67f84ea548ac1378f (diff)
parent     025b69ef4ae22d1677904e99f924b9ef5a096e75 (diff)
download   kafka-python-68c8fa4ad01f8fef38708f257cb1c261cfac01ab.tar.gz
Merge pull request #794 from dpkp/conn_metrics
Complete metrics instrumentation
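
This merge threads a kafka.metrics.Metrics registry through the client, connection, producer, and consumer layers. Every hunk below builds on the same Sensor/Stat pattern, sketched here with illustrative sensor and group names (nothing in this snippet is taken verbatim from the diff):

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Max

metrics = Metrics()  # default config: 2 samples, 30-second windows

# A sensor collects raw observations; each stat attached to it derives
# one named, reportable metric from those observations.
latency = metrics.sensor('demo-latency')
latency.add(metrics.metric_name(
    'latency-avg', 'demo-metrics', 'Average latency in ms.'), Avg())
latency.add(metrics.metric_name(
    'latency-max', 'demo-metrics', 'Maximum latency in ms.'), Max())

latency.record(42.0)  # one record() fans out to every attached stat

for name, metric in metrics.metrics.items():
    print(name.name, metric.value())

metrics.close()

A single record() call updating several derived metrics is what lets one instrumentation point in the I/O path feed all of the rates, averages, and maxima registered below.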
-rw-r--r--  kafka/client_async.py                 30
-rw-r--r--  kafka/conn.py                        190
-rw-r--r--  kafka/consumer/fetcher.py              6
-rw-r--r--  kafka/consumer/group.py                8
-rw-r--r--  kafka/coordinator/base.py              5
-rw-r--r--  kafka/coordinator/consumer.py         10
-rw-r--r--  kafka/producer/buffer.py              17
-rw-r--r--  kafka/producer/kafka.py                2
-rw-r--r--  kafka/producer/record_accumulator.py   6
-rw-r--r--  kafka/producer/sender.py              17
-rw-r--r--  test/test_client_async.py              2
-rw-r--r--  test/test_coordinator.py               7
-rw-r--r--  test/test_fetcher.py                   2
13 files changed, 253 insertions(+), 49 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8af4acc..ce1d13b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,7 @@ from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
 from . import errors as Errors
 from .future import Future
+from .metrics import AnonMeasurable
 from .metrics.stats import Avg, Count, Rate
 from .metrics.stats.rate import TimeUnit
 from .protocol.metadata import MetadataRequest
@@ -187,10 +188,13 @@ class KafkaClient(object):
         self._wake_lock = threading.Lock()
         self._selector.register(self._wake_r, selectors.EVENT_READ)
         self._closed = False
-        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
         self._sensors = None
         if self.config['metrics']:
-            self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
+            self._sensors = KafkaClientMetrics(self.config['metrics'],
+                                               self.config['metric_group_prefix'],
+                                               self._conns)
+
+        self._bootstrap(collect_hosts(self.config['bootstrap_servers']))

         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
@@ -218,6 +222,7 @@ class KafkaClient(object):
             cb = functools.partial(self._conn_state_change, 'bootstrap')
             bootstrap = BrokerConnection(host, port, afi,
                                          state_change_callback=cb,
+                                         node_id='bootstrap',
                                          **self.config)
             bootstrap.connect()
             while bootstrap.connecting():
@@ -273,6 +278,8 @@ class KafkaClient(object):
                 except KeyError:
                     pass
                 self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+            if self._sensors:
+                self._sensors.connection_created.record()
             if 'bootstrap' in self._conns and node_id != 'bootstrap':
                 bootstrap = self._conns.pop('bootstrap')
@@ -289,6 +296,8 @@ class KafkaClient(object):
                 self._selector.unregister(conn._sock)
             except KeyError:
                 pass
+            if self._sensors:
+                self._sensors.connection_closed.record()
             if self._refresh_on_disconnects and not self._closed:
                 log.warning("Node %s connection failed -- refreshing metadata", node_id)
                 self.cluster.request_update()
@@ -305,6 +314,7 @@ class KafkaClient(object):
             cb = functools.partial(self._conn_state_change, node_id)
             self._conns[node_id] = BrokerConnection(host, broker.port, afi,
                                                     state_change_callback=cb,
+                                                    node_id=node_id,
                                                     **self.config)
         conn = self._conns[node_id]
         if conn.connected():
@@ -888,10 +898,19 @@ class DelayedTaskQueue(object):

 class KafkaClientMetrics(object):

-    def __init__(self, metrics, metric_group_prefix):
+    def __init__(self, metrics, metric_group_prefix, conns):
         self.metrics = metrics
         self.metric_group_name = metric_group_prefix + '-metrics'

+        self.connection_closed = metrics.sensor('connections-closed')
+        self.connection_closed.add(metrics.metric_name(
+            'connection-close-rate', self.metric_group_name,
+            'Connections closed per second in the window.'), Rate())
+        self.connection_created = metrics.sensor('connections-created')
+        self.connection_created.add(metrics.metric_name(
+            'connection-creation-rate', self.metric_group_name,
+            'New connections established per second in the window.'), Rate())
+
         self.select_time = metrics.sensor('select-time')
         self.select_time.add(metrics.metric_name(
             'select-rate', self.metric_group_name,
@@ -915,3 +934,8 @@ class KafkaClientMetrics(object):
             'io-ratio', self.metric_group_name,
             'The fraction of time the I/O thread spent doing I/O'),
             Rate(time_unit=TimeUnit.NANOSECONDS))
+
+        metrics.add_metric(metrics.metric_name(
+            'connection-count', self.metric_group_name,
+            'The current number of active connections.'), AnonMeasurable(
+                lambda config, now: len(conns)))
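
The client_async.py hunk above registers connection-count with AnonMeasurable, which turns a plain callable into a gauge: nothing is recorded, the function is evaluated at read time, so the metric always reflects the live _conns dict. A sketch of that pattern under assumed names (the dict and the group name here are stand-ins, not from the diff):

from kafka.metrics import AnonMeasurable, Metrics

metrics = Metrics()
conns = {}  # stands in for KafkaClient._conns

# add_metric registers a directly-measurable metric; no sensor involved
metrics.add_metric(metrics.metric_name(
    'connection-count', 'demo-metrics',
    'The current number of active connections.'),
    AnonMeasurable(lambda config, now: len(conns)))

conns['node-1'] = object()
for name, metric in metrics.metrics.items():
    if name.name == 'connection-count':
        print(metric.value())  # -> 1, evaluated against the live dict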
diff --git a/kafka/conn.py b/kafka/conn.py
index 05b0acb..6c4e476 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -14,8 +14,9 @@ from kafka.vendor import six

 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.api import RequestHeader
-from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
+from kafka.protocol.admin import SaslHandShakeRequest
 from kafka.protocol.commit import GroupCoordinatorResponse
 from kafka.protocol.types import Int32
 from kafka.version import __version__
@@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest',

 class BrokerConnection(object):
     DEFAULT_CONFIG = {
         'client_id': 'kafka-python-' + __version__,
+        'node_id': 0,
         'request_timeout_ms': 40000,
         'reconnect_backoff_ms': 50,
         'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@
         'ssl_password': None,
         'api_version': (0, 8, 2),  # default to most restrictive
         'state_change_callback': lambda conn: True,
+        'metrics': None,
+        'metric_group_prefix': '',
         'sasl_mechanism': 'PLAIN',
         'sasl_plain_username': None,
         'sasl_plain_password': None
@@ -81,6 +85,74 @@ class BrokerConnection(object):
     SASL_MECHANISMS = ('PLAIN',)

     def __init__(self, host, port, afi, **configs):
+        """Initialize a kafka broker connection
+
+        Keyword Arguments:
+            client_id (str): a name for this client. This string is passed in
+                each request to servers and can be used to identify specific
+                server-side log entries that correspond to this client. Also
+                submitted to GroupCoordinator for logging with respect to
+                consumer group administration. Default: 'kafka-python-{version}'
+            reconnect_backoff_ms (int): The amount of time in milliseconds to
+                wait before attempting to reconnect to a given host.
+                Default: 50.
+            request_timeout_ms (int): Client request timeout in milliseconds.
+                Default: 40000.
+            max_in_flight_requests_per_connection (int): Requests are pipelined
+                to kafka brokers up to this number of maximum requests per
+                broker connection. Default: 5.
+            receive_buffer_bytes (int): The size of the TCP receive buffer
+                (SO_RCVBUF) to use when reading data. Default: None (relies on
+                system defaults). Java client defaults to 32768.
+            send_buffer_bytes (int): The size of the TCP send buffer
+                (SO_SNDBUF) to use when sending data. Default: None (relies on
+                system defaults). Java client defaults to 131072.
+            socket_options (list): List of tuple-arguments to socket.setsockopt
+                to apply to broker connection sockets. Default:
+                [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
+            security_protocol (str): Protocol used to communicate with brokers.
+                Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+            ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+                socket connections. If provided, all other ssl_* configurations
+                will be ignored. Default: None.
+            ssl_check_hostname (bool): flag to configure whether ssl handshake
+                should verify that the certificate matches the broker's hostname.
+                Default: True.
+            ssl_cafile (str): optional filename of ca file to use in certificate
+                verification. Default: None.
+            ssl_certfile (str): optional filename of file in pem format containing
+                the client certificate, as well as any ca certificates needed to
+                establish the certificate's authenticity. Default: None.
+            ssl_keyfile (str): optional filename containing the client private key.
+                Default: None.
+            ssl_password (callable, str, bytes, bytearray): optional password or
+                callable function that returns a password, for decrypting the
+                client private key. Default: None.
+            ssl_crlfile (str): optional filename containing the CRL to check for
+                certificate expiration. By default, no CRL check is done. When
+                providing a file, only the leaf certificate will be checked against
+                this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
+                Default: None.
+            api_version (tuple): specify which kafka API version to use. Accepted
+                values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10).
+                If None, KafkaClient will attempt to infer the broker
+                version by probing various APIs. Default: None
+            api_version_auto_timeout_ms (int): number of milliseconds to throw a
+                timeout exception from the constructor when checking the broker
+                api version. Only applies if api_version is None
+            state_change_callback (callable): function to be called when the
+                connection state changes from CONNECTING to CONNECTED etc.
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
+            sasl_mechanism (str): string picking sasl mechanism when security_protocol
+                is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+                Default: None
+            sasl_plain_username (str): username for sasl PLAIN authentication.
+                Default: None
+            sasl_plain_password (str): password for sasl PLAIN authentication.
+                Default: None
+        """
         self.host = host
         self.hostname = host
         self.port = port
@@ -123,6 +195,11 @@ class BrokerConnection(object):
         self._correlation_id = 0
         self._gai = None
         self._gai_index = 0
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = BrokerConnectionMetrics(self.config['metrics'],
+                                                    self.config['metric_group_prefix'],
+                                                    self.config['node_id'])

     def connect(self):
         """Attempt to connect and return ConnectionState"""
@@ -453,6 +530,8 @@ class BrokerConnection(object):
                 sent_bytes = self._sock.send(data[total_sent:])
                 total_sent += sent_bytes
             assert total_sent == len(data)
+            if self._sensors:
+                self._sensors.bytes_sent.record(total_sent)
             self._sock.setblocking(False)
         except (AssertionError, ConnectionError) as e:
             log.exception("Error sending %s to %s", request, self)
@@ -583,6 +662,8 @@ class BrokerConnection(object):
             self._receiving = False
             self._next_payload_bytes = 0
+            if self._sensors:
+                self._sensors.bytes_received.record(4 + self._rbuffer.tell())
             self._rbuffer.seek(0)
             response = self._process_response(self._rbuffer)
             self._rbuffer.seek(0)
@@ -593,6 +674,8 @@ class BrokerConnection(object):
         assert not self._processing, 'Recursion not supported'
         self._processing = True
         ifr = self.in_flight_requests.popleft()
+        if self._sensors:
+            self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)

         # verify send/recv correlation ids match
         recv_correlation_id = Int32.decode(read_buffer)
@@ -762,6 +845,111 @@ class BrokerConnection(object):
                 self.port)


+class BrokerConnectionMetrics(object):
+    def __init__(self, metrics, metric_group_prefix, node_id):
+        self.metrics = metrics
+
+        # Any broker may have registered summary metrics already
+        # but if not, we need to create them so we can set as parents below
+        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
+        if not all_conns_transferred:
+            metric_group_name = metric_group_prefix + '-metrics'
+
+            bytes_transferred = metrics.sensor('bytes-sent-received')
+            bytes_transferred.add(metrics.metric_name(
+                'network-io-rate', metric_group_name,
+                'The average number of network operations (reads or writes) on all'
+                ' connections per second.'), Rate(sampled_stat=Count()))
+
+            bytes_sent = metrics.sensor('bytes-sent',
+                                        parents=[bytes_transferred])
+            bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second to all'
+                ' servers.'), Rate())
+            bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'), Avg())
+            bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'), Max())
+
+            bytes_received = metrics.sensor('bytes-received',
+                                            parents=[bytes_transferred])
+            bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off all sockets'), Rate())
+            bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'The average number of responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            request_latency = metrics.sensor('request-latency')
+            request_latency.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            request_latency.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())
+
+        # if one sensor of the metrics has been registered for the connection,
+        # then all other sensors should have been registered; and vice versa
+        node_str = 'node-{0}'.format(node_id)
+        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
+        if not node_sensor:
+            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str
+
+            self.bytes_sent = metrics.sensor(
+                node_str + '.bytes-sent',
+                parents=[metrics.get_sensor('bytes-sent')])
+            self.bytes_sent.add(metrics.metric_name(
+                'outgoing-byte-rate', metric_group_name,
+                'The average number of outgoing bytes sent per second.'),
+                Rate())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-rate', metric_group_name,
+                'The average number of requests sent per second.'),
+                Rate(sampled_stat=Count()))
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-avg', metric_group_name,
+                'The average size of all requests in the window.'),
+                Avg())
+            self.bytes_sent.add(metrics.metric_name(
+                'request-size-max', metric_group_name,
+                'The maximum size of any request sent in the window.'),
+                Max())
+
+            self.bytes_received = metrics.sensor(
+                node_str + '.bytes-received',
+                parents=[metrics.get_sensor('bytes-received')])
+            self.bytes_received.add(metrics.metric_name(
+                'incoming-byte-rate', metric_group_name,
+                'Bytes/second read off node-connection socket'),
+                Rate())
+            self.bytes_received.add(metrics.metric_name(
+                'response-rate', metric_group_name,
+                'The average number of responses received per second.'),
+                Rate(sampled_stat=Count()))
+
+            self.request_time = self.metrics.sensor(
+                node_str + '.latency',
+                parents=[metrics.get_sensor('request-latency')])
+            self.request_time.add(metrics.metric_name(
+                'request-latency-avg', metric_group_name,
+                'The average request latency in ms.'),
+                Avg())
+            self.request_time.add(metrics.metric_name(
+                'request-latency-max', metric_group_name,
+                'The maximum request latency in ms.'),
+                Max())
+
+
 def _address_family(address):
     """
         Attempt to determine the family of an address (or hostname)
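
The heart of BrokerConnectionMetrics above is sensor parenting: each per-node sensor names the shared aggregate sensor as its parent, so a single record() from the connection updates both the node-N stats and the all-connection totals. A condensed sketch of that rollup, using the same sensor names as the diff but demo group names:

from kafka.metrics import Metrics
from kafka.metrics.stats import Rate

metrics = Metrics()

# Aggregate sensor, created once and shared by all connections
all_sent = metrics.sensor('bytes-sent')
all_sent.add(metrics.metric_name(
    'outgoing-byte-rate', 'demo-metrics',
    'The average number of outgoing bytes sent per second to all servers.'),
    Rate())

# Per-node sensor; listing the aggregate as a parent chains the record() call
node_sent = metrics.sensor('node-1.bytes-sent', parents=[all_sent])
node_sent.add(metrics.metric_name(
    'outgoing-byte-rate', 'demo-node-metrics.node-1',
    'The average number of outgoing bytes sent per second.'), Rate())

node_sent.record(512)  # counted for node-1 and, via the parent, for all nodes

This is also why the constructor guards with metrics.get_sensor(): the aggregate sensors are shared across connections and must be created exactly once, then looked up and reused as parents by every later connection.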
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c00681d..f5d44b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
         'check_crcs': True,
         'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1,  # undocumented -- interface may change
+        'metric_group_prefix': 'consumer',
         'api_version': (0, 8, 0),
     }

-    def __init__(self, client, subscriptions, metrics, metric_group_prefix,
-                 **configs):
+    def __init__(self, client, subscriptions, metrics, **configs):
         """Initialize a Kafka Message Fetcher.

         Keyword Arguments:
@@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
         self._record_too_large_partitions = dict()  # {topic_partition: offset}
         self._iterator = None
         self._fetch_futures = collections.deque()
-        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])

     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 5edfaea..d4e0ff3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
         'metric_reporters': [],
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
+        'metric_group_prefix': 'consumer',
         'selector': selectors.DefaultSelector,
         'exclude_internal_topics': True,
         'sasl_mechanism': None,
@@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
                                      tags=metrics_tags)
         reporters = [reporter() for reporter in self.config['metric_reporters']]
         self._metrics = Metrics(metric_config, reporters)
-        metric_group_prefix = 'consumer'
         # TODO _metrics likely needs to be passed to KafkaClient, etc.

         # api_version was previously a str. accept old format for now
@@ -281,7 +281,7 @@ class KafkaConsumer(six.Iterator):
             log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
                         str(self.config['api_version']), str_version)

-        self._client = KafkaClient(**self.config)
+        self._client = KafkaClient(metrics=self._metrics, **self.config)

         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
@@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):

         self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
+            self._client, self._subscription, self._metrics, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._subscription, self._metrics, metric_group_prefix,
+            self._client, self._subscription, self._metrics,
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c57d45a..d6ea6c0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -55,9 +55,10 @@ class BaseCoordinator(object):
         'heartbeat_interval_ms': 3000,
         'retry_backoff_ms': 100,
         'api_version': (0, 9),
+        'metric_group_prefix': '',
     }

-    def __init__(self, client, metrics, metric_group_prefix, **configs):
+    def __init__(self, client, metrics, **configs):
         """
         Keyword Arguments:
             group_id (str): name of the consumer group to join for dynamic
@@ -92,7 +93,7 @@ class BaseCoordinator(object):
         self.heartbeat = Heartbeat(**self.config)
         self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
         self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
-                                               metric_group_prefix)
+                                               self.config['metric_group_prefix'])

     def __del__(self):
         if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 0429e09..a600cb4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -37,10 +37,10 @@ class ConsumerCoordinator(BaseCoordinator):
         'retry_backoff_ms': 100,
         'api_version': (0, 9),
         'exclude_internal_topics': True,
+        'metric_group_prefix': 'consumer'
     }

-    def __init__(self, client, subscription, metrics, metric_group_prefix,
-                 **configs):
+    def __init__(self, client, subscription, metrics, **configs):
         """Initialize the coordination manager.

         Keyword Arguments:
@@ -76,9 +76,7 @@ class ConsumerCoordinator(BaseCoordinator):
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
         """
-        super(ConsumerCoordinator, self).__init__(client,
-                                                  metrics, metric_group_prefix,
-                                                  **configs)
+        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -111,7 +109,7 @@ class ConsumerCoordinator(BaseCoordinator):
             self._auto_commit_task.reschedule()

         self.consumer_sensors = ConsumerCoordinatorMetrics(
-            metrics, metric_group_prefix, self._subscription)
+            metrics, self.config['metric_group_prefix'], self._subscription)

     def __del__(self):
         if hasattr(self, '_cluster') and self._cluster:
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 5f41bac..422d47c 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -9,6 +9,7 @@ from ..codec import (has_gzip, has_snappy, has_lz4,
                      gzip_encode, snappy_encode,
                      lz4_encode, lz4_encode_old_kafka)
 from .. import errors as Errors
+from ..metrics.stats import Rate
 from ..protocol.types import Int32, Int64
 from ..protocol.message import MessageSet, Message
@@ -135,7 +136,7 @@ class MessageSetBuffer(object):

 class SimpleBufferPool(object):
     """A simple pool of BytesIO objects with a weak memory ceiling."""

-    def __init__(self, memory, poolable_size):
+    def __init__(self, memory, poolable_size, metrics=None, metric_group_prefix='producer-metrics'):
         """Create a new buffer pool.

         Arguments:
@@ -150,10 +151,13 @@ class SimpleBufferPool(object):
         self._free = collections.deque([io.BytesIO() for _ in range(buffers)])

         self._waiters = collections.deque()
-        #self.metrics = metrics;
-        #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
-        #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
-        #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
+        self.wait_time = None
+        if metrics:
+            self.wait_time = metrics.sensor('bufferpool-wait-time')
+            self.wait_time.add(metrics.metric_name(
+                'bufferpool-wait-ratio', metric_group_prefix,
+                'The fraction of time an appender waits for space allocation.'),
+                Rate())

     def allocate(self, size, max_time_to_block_ms):
         """
@@ -187,7 +191,8 @@ class SimpleBufferPool(object):
                     start_wait = time.time()
                     more_memory.wait(max_time_to_block_ms / 1000.0)
                     end_wait = time.time()
-                    #this.waitTime.record(endWait - startWait, time.milliseconds());
+                    if self.wait_time:
+                        self.wait_time.record(end_wait - start_wait)

                 if self._free:
                     buf = self._free.popleft()
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e3b0d69..84039f6 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -335,7 +335,7 @@ class KafkaProducer(object):
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

         message_version = 1 if self.config['api_version'] >= (0, 10) else 0
-        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
+        self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata,
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 3e2d903..8fe6abb 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -162,6 +162,8 @@ class RecordAccumulator(object):
         'linger_ms': 0,
         'retry_backoff_ms': 100,
         'message_version': 0,
+        'metrics': None,
+        'metric_group_prefix': 'producer-metrics',
     }

     def __init__(self, **configs):
@@ -176,7 +178,9 @@ class RecordAccumulator(object):
         self._batches = collections.defaultdict(collections.deque)  # TopicPartition: [RecordBatch]
         self._tp_locks = {None: threading.Lock()}  # TopicPartition: Lock, plus a lock to add entries
         self._free = SimpleBufferPool(self.config['buffer_memory'],
-                                      self.config['batch_size'])
+                                      self.config['batch_size'],
+                                      metrics=self.config['metrics'],
+                                      metric_group_prefix=self.config['metric_group_prefix'])
         self._incomplete = IncompleteRecordBatches()
         # The following variables should only be accessed by the sender thread,
         # so we don't need to protect them w/ locking.
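
The buffer.py hunk above replaces the commented-out Java translation with a working bufferpool-wait-ratio sensor. Recording the seconds an appender spent blocked into a plain Rate() (whose default time unit is seconds) yields seconds-waited per second of elapsed window, i.e. a dimensionless wait fraction; the removed comments show the Java client getting the same effect by recording nanoseconds into Rate(TimeUnit.NANOSECONDS). A small sketch of the recording side, with the blocking wait simulated by sleep():

import time
from kafka.metrics import Metrics
from kafka.metrics.stats import Rate

metrics = Metrics()
wait_time = metrics.sensor('bufferpool-wait-time')
wait_time.add(metrics.metric_name(
    'bufferpool-wait-ratio', 'producer-metrics',
    'The fraction of time an appender waits for space allocation.'), Rate())

start_wait = time.time()
time.sleep(0.05)  # stands in for more_memory.wait(...) in allocate()
wait_time.record(time.time() - start_wait)  # seconds waited / seconds elapsed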
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index aafa06a..2974faf 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -204,7 +204,6 @@ class Sender(threading.Thread):
                 batch = batches_by_partition[tp]
                 self._complete_batch(batch, error, offset, ts)

-        self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
         if response.API_VERSION > 0:
             self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)

@@ -343,15 +342,6 @@ class SenderMetrics(object):
                         sensor_name=sensor_name,
                         description='The maximum time in ms record batches spent in the record accumulator.')

-        sensor_name = 'request-time'
-        self.request_time_sensor = self.metrics.sensor(sensor_name)
-        self.add_metric('request-latency-avg', Avg(),
-                        sensor_name=sensor_name,
-                        description='The average request latency in ms')
-        self.add_metric('request-latency-max', Max(),
-                        sensor_name=sensor_name,
-                        description='The maximum request latency in ms')
-
         sensor_name = 'produce-throttle-time'
         self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
         self.add_metric('produce-throttle-time-avg', Avg(),
@@ -498,12 +488,5 @@ class SenderMetrics(object):
             if sensor:
                 sensor.record(count)

-    def record_latency(self, latency, node=None):
-        self.request_time_sensor.record(latency)
-        if node is not None:
-            sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
-            if sensor:
-                sensor.record(latency)
-
     def record_throttle_time(self, throttle_time_ms, node=None):
         self.produce_throttle_time_sensor.record(throttle_time_ms)
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8b3634a..b165f93 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -49,6 +49,7 @@ def test_bootstrap_success(conn):
     args, kwargs = conn.call_args
     assert args == ('localhost', 9092, socket.AF_UNSPEC)
     kwargs.pop('state_change_callback')
+    kwargs.pop('node_id')
     assert kwargs == cli.config
     conn.connect.assert_called_with()
     conn.send.assert_called_once_with(MetadataRequest[0]([]))
@@ -62,6 +63,7 @@ def test_bootstrap_failure(conn):
     args, kwargs = conn.call_args
     assert args == ('localhost', 9092, socket.AF_UNSPEC)
     kwargs.pop('state_change_callback')
+    kwargs.pop('node_id')
     assert kwargs == cli.config
     conn.connect.assert_called_with()
     conn.close.assert_called_with()
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 35598e8..4115c03 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -29,8 +29,7 @@ def client(conn):

 @pytest.fixture
 def coordinator(client):
-    return ConsumerCoordinator(client, SubscriptionState(), Metrics(),
-                               'consumer')
+    return ConsumerCoordinator(client, SubscriptionState(), Metrics())


 def test_init(client, coordinator):
@@ -42,7 +41,7 @@ def test_init(client, coordinator):
 @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
 def test_autocommit_enable_api_version(client, api_version):
     coordinator = ConsumerCoordinator(client, SubscriptionState(),
-                                      Metrics(), 'consumer',
+                                      Metrics(),
                                       enable_auto_commit=True,
                                       group_id='foobar',
                                       api_version=api_version)
@@ -362,7 +361,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
     mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
     client = KafkaClient(api_version=api_version)
     coordinator = ConsumerCoordinator(client, SubscriptionState(),
-                                      Metrics(), 'consumer',
+                                      Metrics(),
                                       api_version=api_version,
                                       enable_auto_commit=enable,
                                       group_id=group_id)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 1f1f7d3..6afd547 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -30,7 +30,7 @@ def fetcher(client, subscription_state):
     subscription_state.assign_from_subscribed(assignment)
     for tp in assignment:
         subscription_state.seek(tp, 0)
-    return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
+    return Fetcher(client, subscription_state, Metrics())


 def test_init_fetches(fetcher, mocker):
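
With the sender.py cleanup above, request latency is recorded once per response inside BrokerConnection rather than a second time in SenderMetrics. End to end, passing a Metrics instance into KafkaClient -- as KafkaConsumer now does internally -- is all it takes to surface the sensors this merge adds. A sketch, assuming a reachable broker at localhost:9092:

from kafka.client_async import KafkaClient
from kafka.metrics import Metrics

metrics = Metrics()
client = KafkaClient(bootstrap_servers='localhost:9092', metrics=metrics)
client.poll(timeout_ms=500)

# Dump what the merge registers: connection-count and the connection,
# select, and io rates, plus per-node byte and latency stats.
for name, metric in sorted(metrics.metrics.items(),
                           key=lambda kv: (kv[0].group, kv[0].name)):
    print('{0}/{1}: {2}'.format(name.group, name.name, metric.value()))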