diff options
author    | Zack Dever <zdever@pandora.com> | 2016-04-13 13:52:36 -0700
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:39 -0700
commit    | e2b340c4408801515f5e924aec066af983aa5c57 (patch)
tree      | 5618627ab6919b6fd4cd476e801c0f9bf449d716 /kafka/consumer
parent    | 81dc89a4fd17e601f8ea1570234d3c6ccf1e0d3a (diff)
download  | kafka-python-e2b340c4408801515f5e924aec066af983aa5c57.tar.gz
instrument metrics for fetch requests
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 154 | ||||
-rw-r--r-- | kafka/consumer/group.py | 4 |
2 files changed, 93 insertions(+), 65 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 015b3cd..1d4b0f0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,11 +3,13 @@ from __future__ import absolute_import import collections import copy import logging +import time import six import kafka.errors as Errors from kafka.future import Future +from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.message import PartialMessage from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy @@ -40,7 +42,8 @@ class Fetcher(six.Iterator): 'api_version': (0, 8, 0), } - def __init__(self, client, subscriptions, **configs): + def __init__(self, client, subscriptions, metrics, metric_group_prefix, + **configs): """Initialize a Kafka Message Fetcher. Keyword Arguments: @@ -68,8 +71,6 @@ class Fetcher(six.Iterator): the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. Default: True """ - #metrics=None, - #metric_group_prefix='consumer', self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: if key in configs: @@ -83,8 +84,7 @@ class Fetcher(six.Iterator): self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None self._fetch_futures = collections.deque() - - #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) + self._sensors = FetchManagerMetrics(metrics, metric_group_prefix) def init_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. 
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request) - future.add_callback(self._handle_fetch_response, request) + future.add_callback(self._handle_fetch_response, request, time.time()) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) self._fetch_futures.extend(futures) @@ -575,10 +575,11 @@ class Fetcher(six.Iterator): partition_data.items()) return requests - def _handle_fetch_response(self, request, response): + def _handle_fetch_response(self, request, send_time, response): """The callback for fetch completion""" - #total_bytes = 0 - #total_count = 0 + total_bytes = 0 + total_count = 0 + recv_time = time.time() fetch_offsets = {} for topic, partitions in request.topics: @@ -609,6 +610,7 @@ class Fetcher(six.Iterator): position) continue + num_bytes = 0 partial = None if messages and isinstance(messages[-1][-1], PartialMessage): partial = messages.pop() @@ -618,18 +620,18 @@ class Fetcher(six.Iterator): " offset %d to buffered record list", tp, position) self._records.append((fetch_offset, tp, messages)) - #last_offset, _, _ = messages[-1] - #self.sensors.records_fetch_lag.record(highwater - last_offset) + last_offset, _, _ = messages[-1] + self._sensors.records_fetch_lag.record(highwater - last_offset) + num_bytes = sum(msg[1] for msg in messages) elif partial: # we did not read a single message from a non-empty # buffer because that message's size is larger than # fetch size, in this case record this exception self._record_too_large_partitions[tp] = fetch_offset - # TODO: bytes metrics - #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size()); - #totalBytes += num_bytes; - #totalCount += parsed.size(); + self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages)) + total_bytes += num_bytes + total_count += len(messages) elif error_type in 
(Errors.NotLeaderForPartitionError, Errors.UnknownTopicOrPartitionError): self._client.cluster.request_update() @@ -649,56 +651,82 @@ class Fetcher(six.Iterator): else: raise error_type('Unexpected error while fetching data') - """TOOD - metrics - self.sensors.bytesFetched.record(totalBytes) - self.sensors.recordsFetched.record(totalCount) - self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()) - self.sensors.fetchLatency.record(resp.requestLatencyMs()) + self._sensors.bytes_fetched.record(total_bytes) + self._sensors.records_fetched.record(total_count) + self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms']) + self._sensors.fetch_latency.record((recv_time - send_time) * 1000) class FetchManagerMetrics(object): def __init__(self, metrics, prefix): self.metrics = metrics - self.group_name = prefix + "-fetch-manager-metrics" - - self.bytes_fetched = metrics.sensor("bytes-fetched") - self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name, - "The average number of bytes fetched per request"), metrics.Avg()) - self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name, - "The maximum number of bytes fetched per request"), metrics.Max()) - self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name, - "The average number of bytes consumed per second"), metrics.Rate()) - - self.records_fetched = self.metrics.sensor("records-fetched") - self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name, - "The average number of records in each request"), metrics.Avg()) - self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name, - "The average number of records consumed per second"), metrics.Rate()) - - self.fetch_latency = metrics.sensor("fetch-latency") - self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name, - "The average time taken for a fetch request."), metrics.Avg()) - 
self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name, - "The max time taken for any fetch request."), metrics.Max()) - self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name, - "The number of fetch requests per second."), metrics.Rate(sampled_stat=metrics.Count())) - - self.records_fetch_lag = metrics.sensor("records-lag") - self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name, - "The maximum lag in terms of number of records for any partition in self window"), metrics.Max()) - - self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time") - self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name, - "The average throttle time in ms"), metrics.Avg()) - self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name, - "The maximum throttle time in ms"), metrics.Max()) - - def record_topic_fetch_metrics(topic, num_bytes, num_records): - # record bytes fetched - name = '.'.join(["topic", topic, "bytes-fetched"]) - self.metrics[name].record(num_bytes); - - # record records fetched - name = '.'.join(["topic", topic, "records-fetched"]) - self.metrics[name].record(num_records) - """ + self.group_name = '%s-fetch-manager-metrics' % prefix + + self.bytes_fetched = metrics.sensor('bytes-fetched') + self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, + 'The average number of bytes fetched per request'), Avg()) + self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name, + 'The maximum number of bytes fetched per request'), Max()) + self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name, + 'The average number of bytes consumed per second'), Rate()) + + self.records_fetched = self.metrics.sensor('records-fetched') + self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name, + 'The average number of records in each 
request'), Avg()) + self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name, + 'The average number of records consumed per second'), Rate()) + + self.fetch_latency = metrics.sensor('fetch-latency') + self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name, + 'The average time taken for a fetch request.'), Avg()) + self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name, + 'The max time taken for any fetch request.'), Max()) + self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name, + 'The number of fetch requests per second.'), Rate(sampled_stat=Count())) + + self.records_fetch_lag = metrics.sensor('records-lag') + self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name, + 'The maximum lag in terms of number of records for any partition in self window'), Max()) + + self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time') + self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name, + 'The average throttle time in ms'), Avg()) + self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name, + 'The maximum throttle time in ms'), Max()) + + def record_topic_fetch_metrics(self, topic, num_bytes, num_records): + metric_tags = {'topic': topic.replace('.', '_')} + + # record bytes fetched + name = '.'.join(['topic', topic, 'bytes-fetched']) + bytes_fetched = self.metrics.get_sensor(name) + if not bytes_fetched: + bytes_fetched = self.metrics.sensor(name) + bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', + self.group_name, + 'The average number of bytes fetched per request for topic %s' % topic, + metric_tags), Avg()) + bytes_fetched.add(self.metrics.metric_name('fetch-size-max', + self.group_name, + 'The maximum number of bytes fetched per request for topic %s' % topic, + metric_tags), Max()) + 
bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', + self.group_name, + 'The average number of bytes consumed per second for topic %s' % topic, + metric_tags), Rate()) + bytes_fetched.record(num_bytes) + + # record records fetched + name = '.'.join(['topic', topic, 'records-fetched']) + records_fetched = self.metrics.get_sensor(name) + if not records_fetched: + records_fetched = self.metrics.sensor(name) + records_fetched.add(self.metrics.metric_name('records-per-request-avg', + self.group_name, + 'The average number of records in each request for topic %s' % topic, + metric_tags), Avg()) + records_fetched.add(self.metrics.metric_name('records-consumed-rate', + self.group_name, + 'The average number of records consumed per second for topic %s' % topic, + metric_tags), Rate()) + records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index afcc996..abb65ef 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -218,7 +218,7 @@ class KafkaConsumer(six.Iterator): reporters.append(DictReporter('kafka.consumer')) self._metrics = Metrics(metric_config, reporters) metric_group_prefix = 'consumer' - # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc. + # TODO _metrics likely needs to be passed to KafkaClient, etc. self._client = KafkaClient(**self.config) @@ -233,7 +233,7 @@ class KafkaConsumer(six.Iterator): self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( - self._client, self._subscription, **self.config) + self._client, self._subscription, self._metrics, metric_group_prefix, **self.config) self._coordinator = ConsumerCoordinator( self._client, self._subscription, self._metrics, metric_group_prefix, assignors=self.config['partition_assignment_strategy'], |