summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-17 17:53:42 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-17 17:54:06 -0700
commita871aa60f3ac74bf88beff5b6df74d0466e2b0b0 (patch)
treeba340ae33cdc9749dbd36f5954ff0e1bcd2cace7
parentf8b8904491a4ac19c80aa827ec42f3356424800d (diff)
downloadkafka-python-a871aa60f3ac74bf88beff5b6df74d0466e2b0b0.tar.gz
Add KafkaClient metricsmore_metrics
-rw-r--r--kafka/client_async.py52
-rw-r--r--kafka/producer/kafka.py3
2 files changed, 53 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c081f07..dee4a12 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'selector': selectors.DefaultSelector,
+ 'metrics': None,
+ 'metric_group_prefix': '',
}
API_VERSIONS = [
(0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -167,6 +174,9 @@ class KafkaClient(object):
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._sensors = None
+ if self.config['metrics']:
+ self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
@@ -487,7 +497,14 @@ class KafkaClient(object):
responses = []
processed = set()
- for key, events in self._selector.select(timeout):
+
+ start_select = time.time()
+ ready = self._selector.select(timeout)
+ end_select = time.time()
+ if self._sensors:
+ self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+ for key, events in ready:
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
@@ -531,6 +548,9 @@ class KafkaClient(object):
response = conn.recv()
if response:
responses.append(response)
+
+ if self._sensors:
+ self._sensors.io_time.record((time.time() - end_select) * 1000000000)
return responses
def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@ class DelayedTaskQueue(object):
break
ready_tasks.append(task)
return ready_tasks
+
+
+class KafkaClientMetrics(object):
+ def __init__(self, metrics, metric_group_prefix):
+ self.metrics = metrics
+ self.metric_group_name = metric_group_prefix + '-metrics'
+
+ self.select_time = metrics.sensor('select-time')
+ self.select_time.add(metrics.metric_name(
+ 'select-rate', self.metric_group_name,
+ 'Number of times the I/O layer checked for new I/O to perform per'
+ ' second'), Rate(sampled_stat=Count()))
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-time-ns-avg', self.metric_group_name,
+ 'The average length of time the I/O thread spent waiting for a'
+ ' socket ready for reads or writes in nanoseconds.'), Avg())
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent waiting.'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))
+
+ self.io_time = metrics.sensor('io-time')
+ self.io_time.add(metrics.metric_name(
+ 'io-time-ns-avg', self.metric_group_name,
+ 'The average length of time for I/O per select call in nanoseconds.'),
+ Avg())
+ self.io_time.add(metrics.metric_name(
+ 'io-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent doing I/O'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index c4d1a36..02e4621 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -308,7 +308,8 @@ class KafkaProducer(object):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- client = KafkaClient(**self.config)
+ client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+ **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None: