author    Dana Powers <dana.powers@gmail.com>  2016-07-17 14:39:27 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-07-17 17:54:06 -0700
commit    3a7802d51c3a34f1efafb97b80deceab98ec8b09 (patch)
tree      792ac17bcbb0f37fddd42a6594b4406fba9f7a67
parent    436b2b20117ea60f9cdcad1f6f8ad46cb439c1ed (diff)
download  kafka-python-3a7802d51c3a34f1efafb97b80deceab98ec8b09.tar.gz
Add base coordinator metrics
-rw-r--r--  kafka/coordinator/base.py       141
-rw-r--r--  kafka/coordinator/consumer.py    11
-rw-r--r--  kafka/coordinator/heartbeat.py    4
3 files changed, 80 insertions, 76 deletions
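
The heart of this change is that each coordinator request handler now times its own round trip: the send timestamp is bound into the response callback via add_callback(..., time.time()), and the handler records (time.time() - send_time) * 1000 on the matching sensor. Below is a minimal, self-contained sketch of that pattern using kafka-python's metrics classes; the standalone Metrics registry, the 'consumer' group prefix, and the simulated round trip are illustrative assumptions, not part of the commit.

import time

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Count, Max, Rate

metrics = Metrics()
group_name = 'consumer-coordinator-metrics'  # prefix + '-coordinator-metrics'

join_latency = metrics.sensor('join-latency')
join_latency.add(metrics.metric_name(
    'join-time-avg', group_name,
    'The average time taken for a group rejoin'), Avg())
join_latency.add(metrics.metric_name(
    'join-time-max', group_name,
    'The max time taken for a group rejoin'), Max())
join_latency.add(metrics.metric_name(
    'join-rate', group_name,
    'The number of group joins per second'), Rate(sampled_stat=Count()))

# The handler captures the send time when the JoinGroup request goes out...
send_time = time.time()
time.sleep(0.05)  # stand-in for the broker round trip
# ...and records elapsed milliseconds when the response callback fires.
join_latency.record((time.time() - send_time) * 1000)

for name, metric in metrics.metrics.items():
    if name.group == group_name:
        print('%s: %s' % (name.name, metric.value()))
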
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 25dd000..bbdc8ad 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
import abc
import copy
import logging
@@ -6,12 +8,14 @@ import weakref
import six
-import kafka.errors as Errors
-from kafka.future import Future
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
-from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
- LeaveGroupRequest, SyncGroupRequest)
from .heartbeat import Heartbeat
+from .. import errors as Errors
+from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
+from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
+ LeaveGroupRequest, SyncGroupRequest)
log = logging.getLogger('kafka.coordinator')
@@ -53,7 +57,7 @@ class BaseCoordinator(object):
'api_version': (0, 9),
}
- def __init__(self, client, **configs):
+ def __init__(self, client, metrics, metric_group_prefix, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
@@ -87,7 +91,8 @@ class BaseCoordinator(object):
self.needs_join_prepare = True
self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
- #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
+ metric_group_prefix)
def __del__(self):
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
@@ -254,7 +259,7 @@ class BaseCoordinator(object):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
- time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+ time.sleep(self.config['retry_backoff_ms'] / 1000)
def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
@@ -285,7 +290,7 @@ class BaseCoordinator(object):
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_join_group_response, future)
+ _f.add_callback(self._handle_join_group_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
@@ -300,7 +305,7 @@ class BaseCoordinator(object):
self.coordinator_dead()
future.failure(error)
- def _handle_join_group_response(self, future, response):
+ def _handle_join_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
@@ -311,7 +316,7 @@ class BaseCoordinator(object):
self.protocol = response.group_protocol
log.info("Joined group '%s' (generation %s) with member_id %s",
self.group_id, self.generation, self.member_id)
- #self.sensors.join_latency.record(response.requestLatencyMs())
+ self.sensors.join_latency.record((time.time() - send_time) * 1000)
if response.leader_id == response.member_id:
log.info("Elected group leader -- performing partition"
" assignments using %s", self.protocol)
@@ -402,17 +407,17 @@ class BaseCoordinator(object):
return Future().failure(e)
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_sync_group_response, future)
+ _f.add_callback(self._handle_sync_group_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
- def _handle_sync_group_response(self, future, response):
+ def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.info("Successfully joined group %s with generation %s",
self.group_id, self.generation)
- #self.sensors.syncLatency.record(response.requestLatencyMs())
+ self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
@@ -540,13 +545,13 @@ class BaseCoordinator(object):
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
- _f.add_callback(self._handle_heartbeat_response, future)
+ _f.add_callback(self._handle_heartbeat_response, future, time.time())
_f.add_errback(self._failed_request, self.coordinator_id,
request, future)
return future
- def _handle_heartbeat_response(self, future, response):
- #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
+ def _handle_heartbeat_response(self, future, send_time, response):
+ self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful heartbeat response for group %s",
@@ -651,60 +656,56 @@ class HeartbeatTask(object):
def _handle_heartbeat_failure(self, e):
log.warning("Heartbeat failed (%s); retrying", e)
self._request_in_flight = False
- etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
+ etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
self._client.schedule(self, etd)
-'''
+
class GroupCoordinatorMetrics(object):
- def __init__(self, metrics, prefix, tags=None):
+ def __init__(self, heartbeat, metrics, prefix, tags=None):
+ self.heartbeat = heartbeat
self.metrics = metrics
- self.group_name = prefix + "-coordinator-metrics"
-
- self.heartbeat_latency = metrics.sensor("heartbeat-latency")
- self.heartbeat_latency.add(metrics.metricName(
- "heartbeat-response-time-max", self.group_name,
- "The max time taken to receive a response to a heartbeat request",
- tags), metrics.Max())
- self.heartbeat_latency.add(metrics.metricName(
- "heartbeat-rate", self.group_name,
- "The average number of heartbeats per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- self.join_latency = metrics.sensor("join-latency")
- self.join_latency.add(metrics.metricName(
- "join-time-avg", self.group_name,
- "The average time taken for a group rejoin",
- tags), metrics.Avg())
- self.join_latency.add(metrics.metricName(
- "join-time-max", self.group_name,
- "The max time taken for a group rejoin",
- tags), metrics.Avg())
- self.join_latency.add(metrics.metricName(
- "join-rate", self.group_name,
- "The number of group joins per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- self.sync_latency = metrics.sensor("sync-latency")
- self.sync_latency.add(metrics.metricName(
- "sync-time-avg", self.group_name,
- "The average time taken for a group sync",
- tags), metrics.Avg())
- self.sync_latency.add(metrics.MetricName(
- "sync-time-max", self.group_name,
- "The max time taken for a group sync",
- tags), metrics.Avg())
- self.sync_latency.add(metrics.metricName(
- "sync-rate", self.group_name,
- "The number of group syncs per second",
- tags), metrics.Rate(sampled_stat=metrics.Count()))
-
- """
- lastHeartbeat = Measurable(
- measure=lambda _, value: value - heartbeat.last_heartbeat_send()
- )
- metrics.addMetric(metrics.metricName(
- "last-heartbeat-seconds-ago", self.group_name,
- "The number of seconds since the last controller heartbeat",
- tags), lastHeartbeat)
- """
-'''
+ self.metric_group_name = prefix + "-coordinator-metrics"
+
+ self.heartbeat_latency = metrics.sensor('heartbeat-latency')
+ self.heartbeat_latency.add(metrics.metric_name(
+ 'heartbeat-response-time-max', self.metric_group_name,
+ 'The max time taken to receive a response to a heartbeat request',
+ tags), Max())
+ self.heartbeat_latency.add(metrics.metric_name(
+ 'heartbeat-rate', self.metric_group_name,
+ 'The average number of heartbeats per second',
+ tags), Rate(sampled_stat=Count()))
+
+ self.join_latency = metrics.sensor('join-latency')
+ self.join_latency.add(metrics.metric_name(
+ 'join-time-avg', self.metric_group_name,
+ 'The average time taken for a group rejoin',
+ tags), Avg())
+ self.join_latency.add(metrics.metric_name(
+ 'join-time-max', self.metric_group_name,
+ 'The max time taken for a group rejoin',
+            tags), Max())
+ self.join_latency.add(metrics.metric_name(
+ 'join-rate', self.metric_group_name,
+ 'The number of group joins per second',
+ tags), Rate(sampled_stat=Count()))
+
+ self.sync_latency = metrics.sensor('sync-latency')
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-time-avg', self.metric_group_name,
+ 'The average time taken for a group sync',
+ tags), Avg())
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-time-max', self.metric_group_name,
+ 'The max time taken for a group sync',
+            tags), Max())
+ self.sync_latency.add(metrics.metric_name(
+ 'sync-rate', self.metric_group_name,
+ 'The number of group syncs per second',
+ tags), Rate(sampled_stat=Count()))
+
+ metrics.add_metric(metrics.metric_name(
+ 'last-heartbeat-seconds-ago', self.metric_group_name,
+ 'The number of seconds since the last controller heartbeat',
+ tags), AnonMeasurable(
+ lambda _, now: (now / 1000) - self.heartbeat.last_send))
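
The last metric registered above is not sensor-based: add_metric with an AnonMeasurable means last-heartbeat-seconds-ago is recomputed from Heartbeat.last_send every time the metric is read, not when a sample is recorded. A hedged sketch of that behavior follows; the HeartbeatStub class and the standalone registry are illustrative stand-ins, and only the add_metric/AnonMeasurable wiring mirrors the diff.

import time

from kafka.metrics import AnonMeasurable, Metrics


class HeartbeatStub(object):
    """Illustrative stand-in for kafka.coordinator.heartbeat.Heartbeat."""
    def __init__(self):
        self.last_send = -1 * float('inf')  # matches the new initializer below

    def sent_heartbeat(self):
        self.last_send = time.time()


metrics = Metrics()
heartbeat = HeartbeatStub()

name = metrics.metric_name(
    'last-heartbeat-seconds-ago', 'consumer-coordinator-metrics',
    'The number of seconds since the last coordinator heartbeat')
# The measurable is called with (config, now_ms) on every read.
metrics.add_metric(name, AnonMeasurable(
    lambda _, now: (now / 1000) - heartbeat.last_send))

gauge = metrics.metrics[name]
print(gauge.value())        # inf: no heartbeat has been sent yet
heartbeat.sent_heartbeat()
print(gauge.value())        # ~0.0 right after a heartbeat
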
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 517f66a..d6ad9e6 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -76,7 +76,10 @@ class ConsumerCoordinator(BaseCoordinator):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
- super(ConsumerCoordinator, self).__init__(client, **configs)
+ super(ConsumerCoordinator, self).__init__(client,
+ metrics, metric_group_prefix,
+ **configs)
+
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -107,8 +110,8 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
self._auto_commit_task.reschedule()
- self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
- self._subscription)
+ self.consumer_sensors = ConsumerCoordinatorMetrics(
+ metrics, metric_group_prefix, self._subscription)
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
@@ -485,7 +488,7 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_offset_commit_response(self, offsets, future, send_time, response):
# TODO look at adding request_latency_ms to response (like java kafka)
- self._sensors.commit_latency.record((time.time() - send_time) * 1000)
+ self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
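
ConsumerCoordinator now forwards the same metrics instance and metric_group_prefix up to BaseCoordinator, so the base-level join/sync/heartbeat sensors and the consumer-level commit-latency sensor land in one registry under the same '<prefix>-coordinator-metrics' group; the rename from _sensors to consumer_sensors keeps the two sensor holders visibly distinct. A small sketch of two components sharing one registry; the commit-latency metric names are assumptions modeled on the Java client and are not shown in this diff.

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg, Max

metrics = Metrics()
group_name = 'consumer-coordinator-metrics'

# Registered by GroupCoordinatorMetrics in base.py (per this commit).
join_latency = metrics.sensor('join-latency')
join_latency.add(metrics.metric_name('join-time-avg', group_name), Avg())

# Registered by ConsumerCoordinatorMetrics in consumer.py
# (metric names here are assumed, not taken from this diff).
commit_latency = metrics.sensor('commit-latency')
commit_latency.add(metrics.metric_name('commit-latency-avg', group_name), Avg())
commit_latency.add(metrics.metric_name('commit-latency-max', group_name), Max())

# Both sets of metrics are visible through the single shared registry.
print(sorted(n.name for n in metrics.metrics if n.group == group_name))
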
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 1cd9863..648cb1f 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -20,8 +20,8 @@ class Heartbeat(object):
self.interval = self.config['heartbeat_interval_ms'] / 1000.0
self.timeout = self.config['session_timeout_ms'] / 1000.0
- self.last_send = 0
- self.last_receive = 0
+ self.last_send = -1 * float('inf')
+ self.last_receive = -1 * float('inf')
self.last_reset = time.time()
def sent_heartbeat(self):
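
The heartbeat.py change exists to support the new gauge: if last_send stayed at 0, last-heartbeat-seconds-ago would report seconds since the Unix epoch before the first heartbeat was ever sent, which looks like a real (if huge) measurement; negative infinity makes the "never sent" state unmistakable. A short illustration, not part of the diff:

import time

now = time.time()
print(now - 0)               # ~1.47e9 seconds "ago" in 2016: misleading but finite
print(now - float('-inf'))   # inf: clearly no heartbeat has been sent yet
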