From e2b340c4408801515f5e924aec066af983aa5c57 Mon Sep 17 00:00:00 2001
From: Zack Dever
Date: Wed, 13 Apr 2016 13:52:36 -0700
Subject: instrument metrics for fetch requests

---
 kafka/consumer/fetcher.py | 154 +++++++++++++++++++++++++++-------------------
 1 file changed, 91 insertions(+), 63 deletions(-)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 015b3cd..1d4b0f0 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -3,11 +3,13 @@ from __future__ import absolute_import
 import collections
 import copy
 import logging
+import time
 
 import six
 
 import kafka.errors as Errors
 from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
         'api_version': (0, 8, 0),
     }
 
-    def __init__(self, client, subscriptions, **configs):
+    def __init__(self, client, subscriptions, metrics, metric_group_prefix,
+                 **configs):
         """Initialize a Kafka Message Fetcher.
 
         Keyword Arguments:
@@ -68,8 +71,6 @@ class Fetcher(six.Iterator):
             the messages occurred. This check adds some overhead, so it may be
             disabled in cases seeking extreme performance. Default: True
         """
-        #metrics=None,
-        #metric_group_prefix='consumer',
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -83,8 +84,7 @@ class Fetcher(six.Iterator):
         self._record_too_large_partitions = dict() # {topic_partition: offset}
         self._iterator = None
         self._fetch_futures = collections.deque()
-
-        #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
 
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
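
The constructor change above is the one API break in this patch: callers must now
pass a metrics registry and a metric group prefix instead of the old commented-out
placeholders. A minimal wiring sketch, not part of the patch, assuming a registry
exposed as kafka.metrics.Metrics and that `client` and `subscriptions` are built the
way KafkaConsumer builds them:

    from kafka.metrics import Metrics

    metrics = Metrics()  # shared registry; reporters can be attached to it
    fetcher = Fetcher(client, subscriptions, metrics, 'consumer')
    # FetchManagerMetrics then registers its sensors under the
    # 'consumer-fetch-manager-metrics' group (prefix + '-fetch-manager-metrics').
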
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):
             if self._client.ready(node_id):
                 log.debug("Sending FetchRequest to node %s", node_id)
                 future = self._client.send(node_id, request)
-                future.add_callback(self._handle_fetch_response, request)
+                future.add_callback(self._handle_fetch_response, request, time.time())
                 future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                 futures.append(future)
         self._fetch_futures.extend(futures)
@@ -575,10 +575,11 @@ class Fetcher(six.Iterator):
                                        partition_data.items())
         return requests
 
-    def _handle_fetch_response(self, request, response):
+    def _handle_fetch_response(self, request, send_time, response):
         """The callback for fetch completion"""
-        #total_bytes = 0
-        #total_count = 0
+        total_bytes = 0
+        total_count = 0
+        recv_time = time.time()
 
         fetch_offsets = {}
         for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ class Fetcher(six.Iterator):
                             position)
                         continue
 
+                    num_bytes = 0
                     partial = None
                     if messages and isinstance(messages[-1][-1], PartialMessage):
                         partial = messages.pop()
@@ -618,18 +620,18 @@ class Fetcher(six.Iterator):
                                   " offset %d to buffered record list", tp,
                                   position)
                         self._records.append((fetch_offset, tp, messages))
-                        #last_offset, _, _ = messages[-1]
-                        #self.sensors.records_fetch_lag.record(highwater - last_offset)
+                        last_offset, _, _ = messages[-1]
+                        self._sensors.records_fetch_lag.record(highwater - last_offset)
+                        num_bytes = sum(msg[1] for msg in messages)
                     elif partial:
                         # we did not read a single message from a non-empty
                         # buffer because that message's size is larger than
                         # fetch size, in this case record this exception
                         self._record_too_large_partitions[tp] = fetch_offset
 
-                    # TODO: bytes metrics
-                    #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
-                    #totalBytes += num_bytes;
-                    #totalCount += parsed.size();
+                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+                    total_bytes += num_bytes
+                    total_count += len(messages)
                 elif error_type in (Errors.NotLeaderForPartitionError,
                                     Errors.UnknownTopicOrPartitionError):
                     self._client.cluster.request_update()
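
Passing time.time() as an extra positional argument to add_callback is how the patch
measures fetch latency without keeping per-request state: the send timestamp is bound
into the callback, and _handle_fetch_response receives it ahead of the response. The
same pattern in isolation, a sketch assuming kafka-python's Future semantics of
prepending bound arguments before the result (`sensor` here stands in for any
latency sensor; `send_and_time` is a hypothetical helper):

    import time

    def send_and_time(client, node_id, request, sensor):
        future = client.send(node_id, request)
        # bind the send timestamp now; it is replayed to the callback later,
        # with the response appended as the final argument
        future.add_callback(_record_latency, time.time(), sensor)
        return future

    def _record_latency(send_time, sensor, response):
        sensor.record((time.time() - send_time) * 1000)  # ms, like fetch-latency
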
each request"), metrics.Avg()) - self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name, - "The average number of records consumed per second"), metrics.Rate()) - - self.fetch_latency = metrics.sensor("fetch-latency") - self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name, - "The average time taken for a fetch request."), metrics.Avg()) - self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name, - "The max time taken for any fetch request."), metrics.Max()) - self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name, - "The number of fetch requests per second."), metrics.Rate(sampled_stat=metrics.Count())) - - self.records_fetch_lag = metrics.sensor("records-lag") - self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name, - "The maximum lag in terms of number of records for any partition in self window"), metrics.Max()) - - self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time") - self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name, - "The average throttle time in ms"), metrics.Avg()) - self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name, - "The maximum throttle time in ms"), metrics.Max()) - - def record_topic_fetch_metrics(topic, num_bytes, num_records): - # record bytes fetched - name = '.'.join(["topic", topic, "bytes-fetched"]) - self.metrics[name].record(num_bytes); - - # record records fetched - name = '.'.join(["topic", topic, "records-fetched"]) - self.metrics[name].record(num_records) - """ + self.group_name = '%s-fetch-manager-metrics' % prefix + + self.bytes_fetched = metrics.sensor('bytes-fetched') + self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, + 'The average number of bytes fetched per request'), Avg()) + self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name, + 'The maximum number of bytes fetched per request'), Max()) + self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name, + 'The average number of bytes consumed per second'), Rate()) + + self.records_fetched = self.metrics.sensor('records-fetched') + self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name, + 'The average number of records in each request'), Avg()) + self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name, + 'The average number of records consumed per second'), Rate()) + + self.fetch_latency = metrics.sensor('fetch-latency') + self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name, + 'The average time taken for a fetch request.'), Avg()) + self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name, + 'The max time taken for any fetch request.'), Max()) + self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name, + 'The number of fetch requests per second.'), Rate(sampled_stat=Count())) + + self.records_fetch_lag = metrics.sensor('records-lag') + self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name, + 'The maximum lag in terms of number of records for any partition in self window'), Max()) + + self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time') + self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name, + 'The average throttle time in ms'), Avg()) + 
-- 
cgit v1.2.1
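
record_topic_fetch_metrics creates its per-topic sensors lazily via get_sensor, so
only topics that are actually fetched pay the registration cost; the tag value swaps
'.' for '_', presumably to keep dotted topic names from clashing with the
dot-delimited sensor naming. A usage sketch, assuming the patched module imports as
shown (topic name and counts are made up):

    from kafka.consumer.fetcher import FetchManagerMetrics
    from kafka.metrics import Metrics

    metrics = Metrics()
    sensors = FetchManagerMetrics(metrics, 'consumer')
    sensors.record_topic_fetch_metrics('my.topic', 2048, 4)
    # the second call finds the existing sensors instead of re-registering them
    sensors.record_topic_fetch_metrics('my.topic', 1024, 2)
    assert metrics.get_sensor('topic.my.topic.bytes-fetched') is not None
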