summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-04 12:21:40 -0700
committerDana Powers <dana.powers@gmail.com>2016-08-04 13:05:36 -0700
commit460f0784a30f303b4543763ca330cce52d6054eb (patch)
tree471f0053fe1b7a9fd5e8cece0c43b8f012dd5ad2
parentaf08b54875a5ae5c14fbdeccee4ffe266bda1e00 (diff)
downloadkafka-python-460f0784a30f303b4543763ca330cce52d6054eb.tar.gz
Instrument metrics in BrokerConnection
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py123
-rw-r--r--kafka/producer/sender.py17
-rw-r--r--test/test_client_async.py2
4 files changed, 127 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ff566ca..ce1d13b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -222,6 +222,7 @@ class KafkaClient(object):
cb = functools.partial(self._conn_state_change, 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
+ node_id='bootstrap',
**self.config)
bootstrap.connect()
while bootstrap.connecting():
@@ -313,6 +314,7 @@ class KafkaClient(object):
cb = functools.partial(self._conn_state_change, node_id)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
+ node_id=node_id,
**self.config)
conn = self._conns[node_id]
if conn.connected():
diff --git a/kafka/conn.py b/kafka/conn.py
index 0a5237d..6c4e476 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -14,6 +14,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -58,6 +59,7 @@ InFlightRequest = collections.namedtuple('InFlightRequest',
class BrokerConnection(object):
DEFAULT_CONFIG = {
'client_id': 'kafka-python-' + __version__,
+ 'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
@@ -74,6 +76,8 @@ class BrokerConnection(object):
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
+ 'metrics': None,
+ 'metric_group_prefix': '',
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None
@@ -138,6 +142,9 @@ class BrokerConnection(object):
api version. Only applies if api_version is None
state_chance_callback (callable): function to be called when the
connection state changes from CONNECTING to CONNECTED etc.
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
@@ -188,6 +195,11 @@ class BrokerConnection(object):
self._correlation_id = 0
self._gai = None
self._gai_index = 0
+ self._sensors = None
+ if self.config['metrics']:
+ self._sensors = BrokerConnectionMetrics(self.config['metrics'],
+ self.config['metric_group_prefix'],
+ self.config['node_id'])
def connect(self):
"""Attempt to connect and return ConnectionState"""
@@ -518,6 +530,8 @@ class BrokerConnection(object):
sent_bytes = self._sock.send(data[total_sent:])
total_sent += sent_bytes
assert total_sent == len(data)
+ if self._sensors:
+ self._sensors.bytes_sent.record(total_sent)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
@@ -648,6 +662,8 @@ class BrokerConnection(object):
self._receiving = False
self._next_payload_bytes = 0
+ if self._sensors:
+ self._sensors.bytes_received.record(4 + self._rbuffer.tell())
self._rbuffer.seek(0)
response = self._process_response(self._rbuffer)
self._rbuffer.seek(0)
@@ -658,6 +674,8 @@ class BrokerConnection(object):
assert not self._processing, 'Recursion not supported'
self._processing = True
ifr = self.in_flight_requests.popleft()
+ if self._sensors:
+ self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
# verify send/recv correlation ids match
recv_correlation_id = Int32.decode(read_buffer)
@@ -827,6 +845,111 @@ class BrokerConnection(object):
self.port)
+class BrokerConnectionMetrics(object):
+ def __init__(self, metrics, metric_group_prefix, node_id):
+ self.metrics = metrics
+
+ # Any broker may have registered summary metrics already
+ # but if not, we need to create them so we can set as parents below
+ all_conns_transferred = metrics.get_sensor('bytes-sent-received')
+ if not all_conns_transferred:
+ metric_group_name = metric_group_prefix + '-metrics'
+
+ bytes_transferred = metrics.sensor('bytes-sent-received')
+ bytes_transferred.add(metrics.metric_name(
+ 'network-io-rate', metric_group_name,
+ 'The average number of network operations (reads or writes) on all'
+ ' connections per second.'), Rate(sampled_stat=Count()))
+
+ bytes_sent = metrics.sensor('bytes-sent',
+ parents=[bytes_transferred])
+ bytes_sent.add(metrics.metric_name(
+ 'outgoing-byte-rate', metric_group_name,
+ 'The average number of outgoing bytes sent per second to all'
+ ' servers.'), Rate())
+ bytes_sent.add(metrics.metric_name(
+ 'request-rate', metric_group_name,
+ 'The average number of requests sent per second.'),
+ Rate(sampled_stat=Count()))
+ bytes_sent.add(metrics.metric_name(
+ 'request-size-avg', metric_group_name,
+ 'The average size of all requests in the window.'), Avg())
+ bytes_sent.add(metrics.metric_name(
+ 'request-size-max', metric_group_name,
+ 'The maximum size of any request sent in the window.'), Max())
+
+ bytes_received = metrics.sensor('bytes-received',
+ parents=[bytes_transferred])
+ bytes_received.add(metrics.metric_name(
+ 'incoming-byte-rate', metric_group_name,
+ 'Bytes/second read off all sockets'), Rate())
+ bytes_received.add(metrics.metric_name(
+ 'response-rate', metric_group_name,
+                'Responses received per second.'),
+ Rate(sampled_stat=Count()))
+
+ request_latency = metrics.sensor('request-latency')
+ request_latency.add(metrics.metric_name(
+ 'request-latency-avg', metric_group_name,
+ 'The average request latency in ms.'),
+ Avg())
+ request_latency.add(metrics.metric_name(
+ 'request-latency-max', metric_group_name,
+ 'The maximum request latency in ms.'),
+ Max())
+
+ # if one sensor of the metrics has been registered for the connection,
+ # then all other sensors should have been registered; and vice versa
+ node_str = 'node-{0}'.format(node_id)
+ node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
+ if not node_sensor:
+ metric_group_name = metric_group_prefix + '-node-metrics.' + node_str
+
+ self.bytes_sent = metrics.sensor(
+ node_str + '.bytes-sent',
+ parents=[metrics.get_sensor('bytes-sent')])
+ self.bytes_sent.add(metrics.metric_name(
+ 'outgoing-byte-rate', metric_group_name,
+ 'The average number of outgoing bytes sent per second.'),
+ Rate())
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-rate', metric_group_name,
+ 'The average number of requests sent per second.'),
+ Rate(sampled_stat=Count()))
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-size-avg', metric_group_name,
+ 'The average size of all requests in the window.'),
+ Avg())
+ self.bytes_sent.add(metrics.metric_name(
+ 'request-size-max', metric_group_name,
+ 'The maximum size of any request sent in the window.'),
+ Max())
+
+ self.bytes_received = metrics.sensor(
+ node_str + '.bytes-received',
+ parents=[metrics.get_sensor('bytes-received')])
+ self.bytes_received.add(metrics.metric_name(
+ 'incoming-byte-rate', metric_group_name,
+ 'Bytes/second read off node-connection socket'),
+ Rate())
+ self.bytes_received.add(metrics.metric_name(
+ 'response-rate', metric_group_name,
+ 'The average number of responses received per second.'),
+ Rate(sampled_stat=Count()))
+
+ self.request_time = self.metrics.sensor(
+ node_str + '.latency',
+ parents=[metrics.get_sensor('request-latency')])
+ self.request_time.add(metrics.metric_name(
+ 'request-latency-avg', metric_group_name,
+ 'The average request latency in ms.'),
+ Avg())
+ self.request_time.add(metrics.metric_name(
+ 'request-latency-max', metric_group_name,
+ 'The maximum request latency in ms.'),
+ Max())
+
+
def _address_family(address):
"""
Attempt to determine the family of an address (or hostname)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index aafa06a..2974faf 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -204,7 +204,6 @@ class Sender(threading.Thread):
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset, ts)
- self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
if response.API_VERSION > 0:
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
@@ -343,15 +342,6 @@ class SenderMetrics(object):
sensor_name=sensor_name,
description='The maximum time in ms record batches spent in the record accumulator.')
- sensor_name = 'request-time'
- self.request_time_sensor = self.metrics.sensor(sensor_name)
- self.add_metric('request-latency-avg', Avg(),
- sensor_name=sensor_name,
- description='The average request latency in ms')
- self.add_metric('request-latency-max', Max(),
- sensor_name=sensor_name,
- description='The maximum request latency in ms')
-
sensor_name = 'produce-throttle-time'
self.produce_throttle_time_sensor = self.metrics.sensor(sensor_name)
self.add_metric('produce-throttle-time-avg', Avg(),
@@ -498,12 +488,5 @@ class SenderMetrics(object):
if sensor:
sensor.record(count)
- def record_latency(self, latency, node=None):
- self.request_time_sensor.record(latency)
- if node is not None:
- sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
- if sensor:
- sensor.record(latency)
-
def record_throttle_time(self, throttle_time_ms, node=None):
self.produce_throttle_time_sensor.record(throttle_time_ms)
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8b3634a..b165f93 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -49,6 +49,7 @@ def test_bootstrap_success(conn):
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
+ kwargs.pop('node_id')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
@@ -62,6 +63,7 @@ def test_bootstrap_failure(conn):
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
+ kwargs.pop('node_id')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.close.assert_called_with()