summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-26 08:07:57 -0700
committerGitHub <noreply@github.com>2016-07-26 08:07:57 -0700
commitbdb0559b381c635c5d364b10a5a59cb53de2649c (patch)
tree165c4bec6e1ff0b68752e2b72796cd1ffa009852
parent0d161f72dd2ac610e625b6c197d1ef6f3af104e8 (diff)
parenta871aa60f3ac74bf88beff5b6df74d0466e2b0b0 (diff)
downloadkafka-python-bdb0559b381c635c5d364b10a5a59cb53de2649c.tar.gz
Merge pull request #772 from dpkp/more_metrics
Add client, base coordinator, and a few extra producer metrics.
-rw-r--r--kafka/client_async.py52
-rw-r--r--kafka/coordinator/base.py141
-rw-r--r--kafka/coordinator/consumer.py11
-rw-r--r--kafka/coordinator/heartbeat.py4
-rw-r--r--kafka/producer/kafka.py3
-rw-r--r--kafka/producer/sender.py13
6 files changed, 142 insertions, 82 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c081f07..dee4a12 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
from . import errors as Errors
from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'selector': selectors.DefaultSelector,
+ 'metrics': None,
+ 'metric_group_prefix': '',
}
API_VERSIONS = [
(0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ metrics (kafka.metrics.Metrics): Optionally provide a metrics
+ instance for capturing network IO stats. Default: None.
+ metric_group_prefix (str): Prefix for metric names. Default: ''
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -167,6 +174,9 @@ class KafkaClient(object):
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._sensors = None
+ if self.config['metrics']:
+ self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
@@ -487,7 +497,14 @@ class KafkaClient(object):
responses = []
processed = set()
- for key, events in self._selector.select(timeout):
+
+ start_select = time.time()
+ ready = self._selector.select(timeout)
+ end_select = time.time()
+ if self._sensors:
+ self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+ for key, events in ready:
if key.fileobj is self._wake_r:
self._clear_wake_fd()
continue
@@ -531,6 +548,9 @@ class KafkaClient(object):
response = conn.recv()
if response:
responses.append(response)
+
+ if self._sensors:
+ self._sensors.io_time.record((time.time() - end_select) * 1000000000)
return responses
def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@ class DelayedTaskQueue(object):
break
ready_tasks.append(task)
return ready_tasks
+
+
+class KafkaClientMetrics(object):
+ def __init__(self, metrics, metric_group_prefix):
+ self.metrics = metrics
+ self.metric_group_name = metric_group_prefix + '-metrics'
+
+ self.select_time = metrics.sensor('select-time')
+ self.select_time.add(metrics.metric_name(
+ 'select-rate', self.metric_group_name,
+ 'Number of times the I/O layer checked for new I/O to perform per'
+ ' second'), Rate(sampled_stat=Count()))
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-time-ns-avg', self.metric_group_name,
+ 'The average length of time the I/O thread spent waiting for a'
+ ' socket ready for reads or writes in nanoseconds.'), Avg())
+ self.select_time.add(metrics.metric_name(
+ 'io-wait-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent waiting.'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))
+
+ self.io_time = metrics.sensor('io-time')
+ self.io_time.add(metrics.metric_name(
+ 'io-time-ns-avg', self.metric_group_name,
+ 'The average length of time for I/O per select call in nanoseconds.'),
+ Avg())
+ self.io_time.add(metrics.metric_name(
+ 'io-ratio', self.metric_group_name,
+ 'The fraction of time the I/O thread spent doing I/O'),
+ Rate(time_unit=TimeUnit.NANOSECONDS))
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 25dd000..bbdc8ad 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
import abc
import copy
import logging
@@ -6,12 +8,14 @@ import weakref
import six
-import kafka.errors as Errors
-from kafka.future import Future
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
-from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
- LeaveGroupRequest, SyncGroupRequest)
from .heartbeat import Heartbeat
+from .. import errors as Errors
+from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
+from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
+ LeaveGroupRequest, SyncGroupRequest)
log = logging.getLogger('kafka.coordinator')
@@ -53,7 +57,7 @@ class BaseCoordinator(object):
'api_version': (0, 9),
}
- def __init__(self, client, **configs):
+ def __init__(self, client, metrics, metric_group_prefix, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
@@ -87,7 +91,8 @@ class BaseCoordinator(object):
self.needs_join_prepare = True
self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
- #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
+ metric_group_prefix)
def __del__(self):
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
@@ -254,7 +259,7 @@ class BaseCoordinator(object):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -285,7 +290,7 @@ class BaseCoordinator(object):
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_join_group_response, future)
+ _f.add_callback(self._handle_join_group_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
@@ -300,7 +305,7 @@ class BaseCoordinator(object):
self.coordinator_dead()
future.failure(error)
- def _handle_join_group_response(self, future, response):
+ def _handle_join_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
@@ -311,7 +316,7 @@ class BaseCoordinator(object):
self.protocol = response.group_protocol
log.info("Joined group '%s' (generation %s) with member_id %s",
self.group_id, self.generation, self.member_id)
- #self.sensors.join_latency.record(response.requestLatencyMs())
+ self.sensors.join_latency.record((time.time() - send_time) * 1000)
if response.leader_id == response.member_id:
log.info("Elected group leader -- performing partition"
" assignments using %s", self.protocol)
@@ -402,17 +407,17 @@ class BaseCoordinator(object):
return Future().failure(e)
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_sync_group_response, future)
+ _f.add_callback(self._handle_sync_group_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
- def _handle_sync_group_response(self, future, response):
+ def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.info("Successfully joined group %s with generation %s",
self.group_id, self.generation)
- #self.sensors.syncLatency.record(response.requestLatencyMs())
+ self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
@@ -540,13 +545,13 @@ class BaseCoordinator(object):
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_heartbeat_response, future)
+ _f.add_callback(self._handle_heartbeat_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
- def _handle_heartbeat_response(self, future, response):
- #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
+ def _handle_heartbeat_response(self, future, send_time, response):
+ self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful heartbeat response for group %s",
@@ -651,60 +656,56 @@ class HeartbeatTask(object):
def _handle_heartbeat_failure(self, e):
log.warning("Heartbeat failed (%s); retrying", e)
self._request_in_flight = False
- etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
+ etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
self._client.schedule(self, etd)
-'''
+
class GroupCoordinatorMetrics(object):
- def __init__(self, metrics, prefix, tags=None):
+ def __init__(self, heartbeat, metrics, prefix, tags=None):
+ self.heartbeat = heartbeat
self.metrics = metrics
- self.group_name = prefix + "-coordinator-metrics"
-
- self.heartbeat_latency = metrics.sensor("heartbeat-latency")
- self.heartbeat_latency.add(metrics.metricName(
- "heartbeat-response-time-max", self.group_name,
- "The max time taken to receive a response to a heartbeat request",
- tags), metrics.Max())
- self.heartbeat_latency.add(metrics.metricName(
- "heartbeat-rate", self.group_name,
- "The average number of heartbeats per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- self.join_latency = metrics.sensor("join-latency")
- self.join_latency.add(metrics.metricName(
- "join-time-avg", self.group_name,
- "The average time taken for a group rejoin",
- tags), metrics.Avg())
- self.join_latency.add(metrics.metricName(
- "join-time-max", self.group_name,
- "The max time taken for a group rejoin",
- tags), metrics.Avg())
- self.join_latency.add(metrics.metricName(
- "join-rate", self.group_name,
- "The number of group joins per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- self.sync_latency = metrics.sensor("sync-latency")
- self.sync_latency.add(metrics.metricName(
- "sync-time-avg", self.group_name,
- "The average time taken for a group sync",
- tags), metrics.Avg())
- self.sync_latency.add(metrics.MetricName(
- "sync-time-max", self.group_name,
- "The max time taken for a group sync",
- tags), metrics.Avg())
- self.sync_latency.add(metrics.metricName(
- "sync-rate", self.group_name,
- "The number of group syncs per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- """
- lastHeartbeat = Measurable(
- measure=lambda _, value: value - heartbeat.last_heartbeat_send()
- )
- metrics.addMetric(metrics.metricName(
- "last-heartbeat-seconds-ago", self.group_name,
- "The number of seconds since the last controller heartbeat",
- tags), lastHeartbeat)
- """
-'''
+ self.metric_group_name = prefix + "-coordinator-metrics"
+
+ self.heartbeat_latency = metrics.sensor('heartbeat-latency')
+ self.heartbeat_latency.add(metrics.metric_name(
+ 'heartbeat-response-time-max', self.metric_group_name,
+ 'The max time taken to receive a response to a heartbeat request',
+ tags), Max())
+ self.heartbeat_latency.add(metrics.metric_name(
+ 'heartbeat-rate', self.metric_group_name,
+ 'The average number of heartbeats per second',
+ tags), Rate(sampled_stat=Count()))
+
+ self.join_latency = metrics.sensor('join-latency')
+ self.join_latency.add(metrics.metric_name(
+ 'join-time-avg', self.metric_group_name,
+ 'The average time taken for a group rejoin',
+ tags), Avg())
+ self.join_latency.add(metrics.metric_name(
+ 'join-time-max', self.metric_group_name,
+ 'The max time taken for a group rejoin',
+ tags), Avg())
+ self.join_latency.add(metrics.metric_name(
+ 'join-rate', self.metric_group_name,
+ 'The number of group joins per second',
+ tags), Rate(sampled_stat=Count()))
+
+ self.sync_latency = metrics.sensor('sync-latency')
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-time-avg', self.metric_group_name,
+ 'The average time taken for a group sync',
+ tags), Avg())
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-time-max', self.metric_group_name,
+ 'The max time taken for a group sync',
+ tags), Avg())
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-rate', self.metric_group_name,
+ 'The number of group syncs per second',
+ tags), Rate(sampled_stat=Count()))
+
+ metrics.add_metric(metrics.metric_name(
+ 'last-heartbeat-seconds-ago', self.metric_group_name,
+ 'The number of seconds since the last controller heartbeat',
+ tags), AnonMeasurable(
+ lambda _, now: (now / 1000) - self.heartbeat.last_send))
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 517f66a..d6ad9e6 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
- super(ConsumerCoordinator, self).__init__(client, **configs)
+ super(ConsumerCoordinator, self).__init__(client,
+ metrics, metric_group_prefix,
+ **configs)
+
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
self._auto_commit_task.reschedule()
- self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
- self._subscription)
+ self.consumer_sensors = ConsumerCoordinatorMetrics(
+ metrics, metric_group_prefix, self._subscription)
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
@@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_offset_commit_response(self, offsets, future, send_time, response):
# TODO look at adding request_latency_ms to response (like java kafka)
- self._sensors.commit_latency.record((time.time() - send_time) * 1000)
+ self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 1cd9863..648cb1f 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -20,8 +20,8 @@ class Heartbeat(object):
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
self.timeout = self.config['session_timeout_ms'] / 1000.0
- self.last_send = 0
- self.last_receive = 0
+ self.last_send = -1 * float('inf')
+ self.last_receive = -1 * float('inf')
self.last_reset = time.time()
def sent_heartbeat(self):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index c4d1a36..02e4621 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -308,7 +308,8 @@ class KafkaProducer(object):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- client = KafkaClient(**self.config)
+ client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+ **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index c1d0905..e0381d5 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -4,6 +4,7 @@ import collections
import copy
import logging
import threading
+import time
import six
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
log.debug('Sending Produce Request: %r', request)
(self._client.send(node_id, request)
.add_callback(
- self._handle_produce_response, batches)
+ self._handle_produce_response, node_id, time.time(), batches)
.add_errback(
self._failed_produce, batches, node_id))
@@ -183,7 +184,7 @@ class Sender(threading.Thread):
for batch in batches:
self._complete_batch(batch, error, -1, None)
- def _handle_produce_response(self, batches, response):
+ def _handle_produce_response(self, node_id, send_time, batches, response):
"""Handle a produce response."""
# if we have a response, parse it
log.debug('Parsing produce response: %r', response)
@@ -203,6 +204,10 @@ class Sender(threading.Thread):
batch = batches_by_partition[tp]
self._complete_batch(batch, error, offset, ts)
+ self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
+ if response.API_VERSION > 0:
+ self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
+
else:
# this is the acks = 0 case, just complete all requests
for batch in batches:
@@ -495,8 +500,8 @@ class SenderMetrics(object):
def record_latency(self, latency, node=None):
self.request_time_sensor.record(latency)
- if node:
- sensor = self.metrics.get_sensor('node-' + node + '.latency')
+ if node is not None:
+ sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
if sensor:
sensor.record(latency)