author    | Dana Powers <dana.powers@gmail.com> | 2016-07-26 08:07:57 -0700
committer | GitHub <noreply@github.com> | 2016-07-26 08:07:57 -0700
commit    | bdb0559b381c635c5d364b10a5a59cb53de2649c (patch)
tree      | 165c4bec6e1ff0b68752e2b72796cd1ffa009852
parent    | 0d161f72dd2ac610e625b6c197d1ef6f3af104e8 (diff)
parent    | a871aa60f3ac74bf88beff5b6df74d0466e2b0b0 (diff)
download  | kafka-python-bdb0559b381c635c5d364b10a5a59cb53de2649c.tar.gz
Merge pull request #772 from dpkp/more_metrics
Add client, base coordinator, and a few extra producer metrics.
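For context, the new options surface on `KafkaClient` as ordinary config keys. A minimal usage sketch, not part of this commit (the read-back loop assumes the `Metrics.metrics` name-to-metric mapping exposed by `kafka.metrics`):

```python
from kafka.client_async import KafkaClient
from kafka.metrics import Metrics

metrics = Metrics()  # default MetricConfig, no reporters

# 'metrics' and 'metric_group_prefix' are the two config keys this merge
# adds; with metrics=None (the default) no sensors are registered at all.
client = KafkaClient(bootstrap_servers='localhost:9092',
                     metrics=metrics,
                     metric_group_prefix='consumer')

# Registered metrics (e.g. 'io-wait-ratio' in the 'consumer-metrics'
# group) can then be read back from the Metrics instance at any time.
for name, metric in metrics.metrics.items():
    print('%s / %s = %s' % (name.group, name.name, metric.value()))
```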
-rw-r--r-- | kafka/client_async.py          |  52
-rw-r--r-- | kafka/coordinator/base.py      | 141
-rw-r--r-- | kafka/coordinator/consumer.py  |  11
-rw-r--r-- | kafka/coordinator/heartbeat.py |   4
-rw-r--r-- | kafka/producer/kafka.py        |   3
-rw-r--r-- | kafka/producer/sender.py       |  13
6 files changed, 142 insertions, 82 deletions
```diff
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c081f07..dee4a12 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -24,6 +24,8 @@ from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
 from . import errors as Errors
 from .future import Future
+from .metrics.stats import Avg, Count, Rate
+from .metrics.stats.rate import TimeUnit
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
 from . import socketpair
@@ -65,6 +67,8 @@ class KafkaClient(object):
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
         'selector': selectors.DefaultSelector,
+        'metrics': None,
+        'metric_group_prefix': '',
     }
     API_VERSIONS = [
         (0, 10),
@@ -139,6 +143,9 @@ class KafkaClient(object):
             selector (selectors.BaseSelector): Provide a specific selector
                 implementation to use for I/O multiplexing.
                 Default: selectors.DefaultSelector
+            metrics (kafka.metrics.Metrics): Optionally provide a metrics
+                instance for capturing network IO stats. Default: None.
+            metric_group_prefix (str): Prefix for metric names. Default: ''
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -167,6 +174,9 @@
         self._selector.register(self._wake_r, selectors.EVENT_READ)
         self._closed = False
         self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+        self._sensors = None
+        if self.config['metrics']:
+            self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'])
 
         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
@@ -487,7 +497,14 @@
 
         responses = []
         processed = set()
-        for key, events in self._selector.select(timeout):
+
+        start_select = time.time()
+        ready = self._selector.select(timeout)
+        end_select = time.time()
+        if self._sensors:
+            self._sensors.select_time.record((end_select - start_select) * 1000000000)
+
+        for key, events in ready:
             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()
                 continue
@@ -531,6 +548,9 @@
                 response = conn.recv()
                 if response:
                     responses.append(response)
+
+        if self._sensors:
+            self._sensors.io_time.record((time.time() - end_select) * 1000000000)
         return responses
 
     def in_flight_request_count(self, node_id=None):
@@ -848,3 +868,33 @@
                 break
             ready_tasks.append(task)
         return ready_tasks
+
+
+class KafkaClientMetrics(object):
+    def __init__(self, metrics, metric_group_prefix):
+        self.metrics = metrics
+        self.metric_group_name = metric_group_prefix + '-metrics'
+
+        self.select_time = metrics.sensor('select-time')
+        self.select_time.add(metrics.metric_name(
+            'select-rate', self.metric_group_name,
+            'Number of times the I/O layer checked for new I/O to perform per'
+            ' second'), Rate(sampled_stat=Count()))
+        self.select_time.add(metrics.metric_name(
+            'io-wait-time-ns-avg', self.metric_group_name,
+            'The average length of time the I/O thread spent waiting for a'
+            ' socket ready for reads or writes in nanoseconds.'), Avg())
+        self.select_time.add(metrics.metric_name(
+            'io-wait-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent waiting.'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))
+
+        self.io_time = metrics.sensor('io-time')
+        self.io_time.add(metrics.metric_name(
+            'io-time-ns-avg', self.metric_group_name,
+            'The average length of time for I/O per select call in nanoseconds.'),
+            Avg())
+        self.io_time.add(metrics.metric_name(
+            'io-ratio', self.metric_group_name,
+            'The fraction of time the I/O thread spent doing I/O'),
+            Rate(time_unit=TimeUnit.NANOSECONDS))
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 25dd000..bbdc8ad 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
 import abc
 import copy
 import logging
@@ -6,12 +8,14 @@ import weakref
 
 import six
 
-import kafka.errors as Errors
-from kafka.future import Future
-from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
-from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
-                                  LeaveGroupRequest, SyncGroupRequest)
 from .heartbeat import Heartbeat
+from .. import errors as Errors
+from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
+from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
+from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
+                              LeaveGroupRequest, SyncGroupRequest)
 
 log = logging.getLogger('kafka.coordinator')
 
@@ -53,7 +57,7 @@
         'api_version': (0, 9),
     }
 
-    def __init__(self, client, **configs):
+    def __init__(self, client, metrics, metric_group_prefix, **configs):
         """
         Keyword Arguments:
             group_id (str): name of the consumer group to join for dynamic
@@ -87,7 +91,8 @@
         self.needs_join_prepare = True
         self.heartbeat = Heartbeat(**self.config)
         self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
-        #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+        self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
+                                               metric_group_prefix)
 
     def __del__(self):
         if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
@@ -254,7 +259,7 @@
                 continue
             elif not future.retriable():
                 raise exception # pylint: disable-msg=raising-bad-type
-            time.sleep(self.config['retry_backoff_ms'] / 1000.0)
+            time.sleep(self.config['retry_backoff_ms'] / 1000)
 
     def _send_join_group_request(self):
         """Join the group and return the assignment for the next generation.
@@ -285,7 +290,7 @@
         log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_join_group_response, future)
+        _f.add_callback(self._handle_join_group_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
@@ -300,7 +305,7 @@
             self.coordinator_dead()
         future.failure(error)
 
-    def _handle_join_group_response(self, future, response):
+    def _handle_join_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful JoinGroup response for group %s: %s",
@@ -311,7 +316,7 @@
             self.protocol = response.group_protocol
             log.info("Joined group '%s' (generation %s) with member_id %s",
                      self.group_id, self.generation, self.member_id)
-            #self.sensors.join_latency.record(response.requestLatencyMs())
+            self.sensors.join_latency.record((time.time() - send_time) * 1000)
             if response.leader_id == response.member_id:
                 log.info("Elected group leader -- performing partition"
                          " assignments using %s", self.protocol)
@@ -402,17 +407,17 @@
             return Future().failure(e)
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_sync_group_response, future)
+        _f.add_callback(self._handle_sync_group_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
 
-    def _handle_sync_group_response(self, future, response):
+    def _handle_sync_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.info("Successfully joined group %s with generation %s",
                      self.group_id, self.generation)
-            #self.sensors.syncLatency.record(response.requestLatencyMs())
+            self.sensors.sync_latency.record((time.time() - send_time) * 1000)
             future.success(response.member_assignment)
             return
 
@@ -540,13 +545,13 @@
         log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
-        _f.add_callback(self._handle_heartbeat_response, future)
+        _f.add_callback(self._handle_heartbeat_response, future, time.time())
         _f.add_errback(self._failed_request, self.coordinator_id,
                        request, future)
         return future
 
-    def _handle_heartbeat_response(self, future, response):
-        #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
+    def _handle_heartbeat_response(self, future, send_time, response):
+        self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful heartbeat response for group %s",
@@ -651,60 +656,56 @@
     def _handle_heartbeat_failure(self, e):
         log.warning("Heartbeat failed (%s); retrying", e)
         self._request_in_flight = False
-        etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
+        etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000
         self._client.schedule(self, etd)
 
-'''
+
 class GroupCoordinatorMetrics(object):
-    def __init__(self, metrics, prefix, tags=None):
+    def __init__(self, heartbeat, metrics, prefix, tags=None):
+        self.heartbeat = heartbeat
         self.metrics = metrics
-        self.group_name = prefix + "-coordinator-metrics"
-
-        self.heartbeat_latency = metrics.sensor("heartbeat-latency")
-        self.heartbeat_latency.add(metrics.metricName(
-            "heartbeat-response-time-max", self.group_name,
-            "The max time taken to receive a response to a heartbeat request",
-            tags), metrics.Max())
-        self.heartbeat_latency.add(metrics.metricName(
-            "heartbeat-rate", self.group_name,
-            "The average number of heartbeats per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
-
-        self.join_latency = metrics.sensor("join-latency")
-        self.join_latency.add(metrics.metricName(
-            "join-time-avg", self.group_name,
-            "The average time taken for a group rejoin",
-            tags), metrics.Avg())
-        self.join_latency.add(metrics.metricName(
-            "join-time-max", self.group_name,
-            "The max time taken for a group rejoin",
-            tags), metrics.Avg())
-        self.join_latency.add(metrics.metricName(
-            "join-rate", self.group_name,
-            "The number of group joins per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
-
-        self.sync_latency = metrics.sensor("sync-latency")
-        self.sync_latency.add(metrics.metricName(
-            "sync-time-avg", self.group_name,
-            "The average time taken for a group sync",
-            tags), metrics.Avg())
-        self.sync_latency.add(metrics.MetricName(
-            "sync-time-max", self.group_name,
-            "The max time taken for a group sync",
-            tags), metrics.Avg())
-        self.sync_latency.add(metrics.metricName(
-            "sync-rate", self.group_name,
-            "The number of group syncs per second",
-            tags), metrics.Rate(sampled_stat=metrics.Count()))
-
-        """
-        lastHeartbeat = Measurable(
-            measure=lambda _, value: value - heartbeat.last_heartbeat_send()
-        )
-        metrics.addMetric(metrics.metricName(
-            "last-heartbeat-seconds-ago", self.group_name,
-            "The number of seconds since the last controller heartbeat",
-            tags), lastHeartbeat)
-        """
-'''
+        self.metric_group_name = prefix + "-coordinator-metrics"
+
+        self.heartbeat_latency = metrics.sensor('heartbeat-latency')
+        self.heartbeat_latency.add(metrics.metric_name(
+            'heartbeat-response-time-max', self.metric_group_name,
+            'The max time taken to receive a response to a heartbeat request',
+            tags), Max())
+        self.heartbeat_latency.add(metrics.metric_name(
+            'heartbeat-rate', self.metric_group_name,
+            'The average number of heartbeats per second',
+            tags), Rate(sampled_stat=Count()))
+
+        self.join_latency = metrics.sensor('join-latency')
+        self.join_latency.add(metrics.metric_name(
+            'join-time-avg', self.metric_group_name,
+            'The average time taken for a group rejoin',
+            tags), Avg())
+        self.join_latency.add(metrics.metric_name(
+            'join-time-max', self.metric_group_name,
+            'The max time taken for a group rejoin',
+            tags), Avg())
+        self.join_latency.add(metrics.metric_name(
+            'join-rate', self.metric_group_name,
+            'The number of group joins per second',
+            tags), Rate(sampled_stat=Count()))
+
+        self.sync_latency = metrics.sensor('sync-latency')
+        self.sync_latency.add(metrics.metric_name(
+            'sync-time-avg', self.metric_group_name,
+            'The average time taken for a group sync',
+            tags), Avg())
+        self.sync_latency.add(metrics.metric_name(
+            'sync-time-max', self.metric_group_name,
+            'The max time taken for a group sync',
+            tags), Avg())
+        self.sync_latency.add(metrics.metric_name(
+            'sync-rate', self.metric_group_name,
+            'The number of group syncs per second',
+            tags), Rate(sampled_stat=Count()))
+
+        metrics.add_metric(metrics.metric_name(
+            'last-heartbeat-seconds-ago', self.metric_group_name,
+            'The number of seconds since the last controller heartbeat',
+            tags), AnonMeasurable(
+                lambda _, now: (now / 1000) - self.heartbeat.last_send))
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 517f66a..d6ad9e6 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -76,7 +76,10 @@
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
         """
-        super(ConsumerCoordinator, self).__init__(client, **configs)
+        super(ConsumerCoordinator, self).__init__(client,
+                                                  metrics, metric_group_prefix,
+                                                  **configs)
+
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -107,8 +110,8 @@
             self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
             self._auto_commit_task.reschedule()
 
-        self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
-                                                   self._subscription)
+        self.consumer_sensors = ConsumerCoordinatorMetrics(
+            metrics, metric_group_prefix, self._subscription)
 
     def __del__(self):
         if hasattr(self, '_cluster') and self._cluster:
@@ -485,7 +488,7 @@
 
     def _handle_offset_commit_response(self, offsets, future, send_time, response):
        # TODO look at adding request_latency_ms to response (like java kafka)
-        self._sensors.commit_latency.record((time.time() - send_time) * 1000)
+        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
         unauthorized_topics = set()
 
         for topic, partitions in response.topics:
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 1cd9863..648cb1f 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -20,8 +20,8 @@
         self.interval = self.config['heartbeat_interval_ms'] / 1000.0
         self.timeout = self.config['session_timeout_ms'] / 1000.0
 
-        self.last_send = 0
-        self.last_receive = 0
+        self.last_send = -1 * float('inf')
+        self.last_receive = -1 * float('inf')
         self.last_reset = time.time()
 
     def sent_heartbeat(self):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index c4d1a36..02e4621 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -308,7 +308,8 @@
         reporters = [reporter() for reporter in self.config['metric_reporters']]
         self._metrics = Metrics(metric_config, reporters)
 
-        client = KafkaClient(**self.config)
+        client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+                             **self.config)
 
         # Get auto-discovered version from client if necessary
         if self.config['api_version'] is None:
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index c1d0905..e0381d5 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -4,6 +4,7 @@ import collections
 import copy
 import logging
 import threading
+import time
 
 import six
 
@@ -145,7 +146,7 @@ class Sender(threading.Thread):
             log.debug('Sending Produce Request: %r', request)
             (self._client.send(node_id, request)
                 .add_callback(
-                    self._handle_produce_response, batches)
+                    self._handle_produce_response, node_id, time.time(), batches)
                 .add_errback(
                     self._failed_produce, batches, node_id))
 
@@ -183,7 +184,7 @@
         for batch in batches:
             self._complete_batch(batch, error, -1, None)
 
-    def _handle_produce_response(self, batches, response):
+    def _handle_produce_response(self, node_id, send_time, batches, response):
         """Handle a produce response."""
         # if we have a response, parse it
         log.debug('Parsing produce response: %r', response)
@@ -203,6 +204,10 @@
                     batch = batches_by_partition[tp]
                     self._complete_batch(batch, error, offset, ts)
 
+            self._sensors.record_latency((time.time() - send_time) * 1000, node=node_id)
+            if response.API_VERSION > 0:
+                self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
+
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
@@ -495,8 +500,8 @@ class SenderMetrics(object):
 
     def record_latency(self, latency, node=None):
         self.request_time_sensor.record(latency)
-        if node:
-            sensor = self.metrics.get_sensor('node-' + node + '.latency')
+        if node is not None:
+            sensor = self.metrics.get_sensor('node-' + str(node) + '.latency')
             if sensor:
                 sensor.record(latency)
```
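The change repeated across base.py, consumer.py, and sender.py above is a single latency-measurement idiom: capture `time.time()` once at send, bind it into the response callback as an extra argument, and record the elapsed wall-clock time in milliseconds when the response arrives. A standalone sketch of the idiom (names are illustrative; it assumes a kafka-python-style `Future` that passes bound arguments ahead of the response value):

```python
import time

def send_timed_request(client, node_id, request, latency_sensor):
    # Bind the send timestamp into the callback now; by the time the
    # callback fires, the response has arrived and the delta is latency.
    future = client.send(node_id, request)
    future.add_callback(_on_response, latency_sensor, time.time())
    return future

def _on_response(latency_sensor, send_time, response):
    # Elapsed wall-clock time since send, converted to milliseconds.
    latency_sensor.record((time.time() - send_time) * 1000)
```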
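The `last-heartbeat-seconds-ago` metric in base.py works differently: it is a gauge, registered once via `AnonMeasurable` and re-evaluated on every read rather than recorded on events. A sketch of that registration in isolation (the `'example-metrics'` group name and `last_send` value are made up; `now` arrives in milliseconds, matching the `now / 1000` scaling in the diff):

```python
import time

from kafka.metrics import AnonMeasurable, Metrics

metrics = Metrics()
last_send = time.time()  # stand-in for Heartbeat.last_send, in seconds

# The lambda runs each time the metric is read; its 'now' argument is a
# wall-clock timestamp in milliseconds, hence the division by 1000.
metrics.add_metric(metrics.metric_name(
    'last-heartbeat-seconds-ago', 'example-metrics',
    'Seconds since the last heartbeat was sent'),
    AnonMeasurable(lambda config, now: (now / 1000) - last_send))
```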