author    Zack Dever <zdever@pandora.com>  2016-04-13 13:52:36 -0700
committer Zack Dever <zdever@pandora.com>  2016-04-13 17:26:39 -0700
commit    e2b340c4408801515f5e924aec066af983aa5c57 (patch)
tree      5618627ab6919b6fd4cd476e801c0f9bf449d716 /kafka/consumer
parent    81dc89a4fd17e601f8ea1570234d3c6ccf1e0d3a (diff)
download  kafka-python-e2b340c4408801515f5e924aec066af983aa5c57.tar.gz
instrument metrics for fetch requests
Diffstat (limited to 'kafka/consumer')
-rw-r--r--  kafka/consumer/fetcher.py  154
-rw-r--r--  kafka/consumer/group.py      4
2 files changed, 93 insertions, 65 deletions
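
Note: the hunks below register sensors and stats from kafka-python's kafka.metrics package. A minimal sketch of that API, assuming a kafka-python version that ships kafka.metrics (the 'demo-metrics' group name here is illustrative, not part of the commit):

    from kafka.metrics import Metrics
    from kafka.metrics.stats import Avg, Max

    metrics = Metrics()  # default config, no reporters
    sensor = metrics.sensor('bytes-fetched-demo')
    sensor.add(metrics.metric_name('fetch-size-avg', 'demo-metrics',
                                   'The average number of bytes fetched per request'), Avg())
    sensor.add(metrics.metric_name('fetch-size-max', 'demo-metrics',
                                   'The maximum number of bytes fetched per request'), Max())

    # one record() call feeds every stat attached to the sensor
    for num_bytes in (1024, 4096, 2048):
        sensor.record(num_bytes)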
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 015b3cd..1d4b0f0 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -3,11 +3,13 @@ from __future__ import absolute_import
import collections
import copy
import logging
+import time
import six
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
'api_version': (0, 8, 0),
}
- def __init__(self, client, subscriptions, **configs):
+ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
+ **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -68,8 +71,6 @@ class Fetcher(six.Iterator):
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
"""
- #metrics=None,
- #metric_group_prefix='consumer',
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -83,8 +84,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
-
- #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+ self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request)
- future.add_callback(self._handle_fetch_response, request)
+ future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
self._fetch_futures.extend(futures)
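
Note: the hunk above binds the send timestamp into the response callback, so fetch latency can be computed without extra state on the fetcher. A minimal sketch of that pattern using kafka.future.Future, as the diff itself does (the request/response strings are placeholders):

    import time
    from kafka.future import Future

    def handle_fetch_response(request, send_time, response):
        # send_time was bound in when the request went out;
        # the response arrives "now", so latency is a simple difference.
        latency_ms = (time.time() - send_time) * 1000
        print('%s -> %s took %.1f ms' % (request, response, latency_ms))

    future = Future()
    future.add_callback(handle_fetch_response, 'fetch-request-0', time.time())
    # ... when the broker replies, the I/O layer resolves the future:
    future.success('fetch-response-0')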
@@ -575,10 +575,11 @@ class Fetcher(six.Iterator):
partition_data.items())
return requests
- def _handle_fetch_response(self, request, response):
+ def _handle_fetch_response(self, request, send_time, response):
"""The callback for fetch completion"""
- #total_bytes = 0
- #total_count = 0
+ total_bytes = 0
+ total_count = 0
+ recv_time = time.time()
fetch_offsets = {}
for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ class Fetcher(six.Iterator):
position)
continue
+ num_bytes = 0
partial = None
if messages and isinstance(messages[-1][-1], PartialMessage):
partial = messages.pop()
@@ -618,18 +620,18 @@ class Fetcher(six.Iterator):
" offset %d to buffered record list", tp,
position)
self._records.append((fetch_offset, tp, messages))
- #last_offset, _, _ = messages[-1]
- #self.sensors.records_fetch_lag.record(highwater - last_offset)
+ last_offset, _, _ = messages[-1]
+ self._sensors.records_fetch_lag.record(highwater - last_offset)
+ num_bytes = sum(msg[1] for msg in messages)
elif partial:
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
self._record_too_large_partitions[tp] = fetch_offset
- # TODO: bytes metrics
- #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
- #totalBytes += num_bytes;
- #totalCount += parsed.size();
+ self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+ total_bytes += num_bytes
+ total_count += len(messages)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
self._client.cluster.request_update()
@@ -649,56 +651,82 @@ class Fetcher(six.Iterator):
else:
raise error_type('Unexpected error while fetching data')
- """TOOD - metrics
- self.sensors.bytesFetched.record(totalBytes)
- self.sensors.recordsFetched.record(totalCount)
- self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
- self.sensors.fetchLatency.record(resp.requestLatencyMs())
+ self._sensors.bytes_fetched.record(total_bytes)
+ self._sensors.records_fetched.record(total_count)
+ self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms'])
+ self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
- self.group_name = prefix + "-fetch-manager-metrics"
-
- self.bytes_fetched = metrics.sensor("bytes-fetched")
- self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
- "The average number of bytes fetched per request"), metrics.Avg())
- self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
- "The maximum number of bytes fetched per request"), metrics.Max())
- self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
- "The average number of bytes consumed per second"), metrics.Rate())
-
- self.records_fetched = self.metrics.sensor("records-fetched")
- self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
- "The average number of records in each request"), metrics.Avg())
- self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
- "The average number of records consumed per second"), metrics.Rate())
-
- self.fetch_latency = metrics.sensor("fetch-latency")
- self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
- "The average time taken for a fetch request."), metrics.Avg())
- self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
- "The max time taken for any fetch request."), metrics.Max())
- self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
- "The number of fetch requests per second."), metrics.Rate(sampled_stat=metrics.Count()))
-
- self.records_fetch_lag = metrics.sensor("records-lag")
- self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
- "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
-
- self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
- self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
- "The average throttle time in ms"), metrics.Avg())
- self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
- "The maximum throttle time in ms"), metrics.Max())
-
- def record_topic_fetch_metrics(topic, num_bytes, num_records):
- # record bytes fetched
- name = '.'.join(["topic", topic, "bytes-fetched"])
- self.metrics[name].record(num_bytes);
-
- # record records fetched
- name = '.'.join(["topic", topic, "records-fetched"])
- self.metrics[name].record(num_records)
- """
+ self.group_name = '%s-fetch-manager-metrics' % prefix
+
+ self.bytes_fetched = metrics.sensor('bytes-fetched')
+ self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
+ 'The average number of bytes fetched per request'), Avg())
+ self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
+ 'The maximum number of bytes fetched per request'), Max())
+ self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
+ 'The average number of bytes consumed per second'), Rate())
+
+ self.records_fetched = self.metrics.sensor('records-fetched')
+ self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
+ 'The average number of records in each request'), Avg())
+ self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
+ 'The average number of records consumed per second'), Rate())
+
+ self.fetch_latency = metrics.sensor('fetch-latency')
+ self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
+ 'The average time taken for a fetch request.'), Avg())
+ self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
+ 'The max time taken for any fetch request.'), Max())
+ self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
+ 'The number of fetch requests per second.'), Rate(sampled_stat=Count()))
+
+ self.records_fetch_lag = metrics.sensor('records-lag')
+ self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
+ 'The maximum lag in terms of number of records for any partition in this window'), Max())
+
+ self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
+ self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
+ 'The average throttle time in ms'), Avg())
+ self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
+ 'The maximum throttle time in ms'), Max())
+
+ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
+ metric_tags = {'topic': topic.replace('.', '_')}
+
+ # record bytes fetched
+ name = '.'.join(['topic', topic, 'bytes-fetched'])
+ bytes_fetched = self.metrics.get_sensor(name)
+ if not bytes_fetched:
+ bytes_fetched = self.metrics.sensor(name)
+ bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
+ self.group_name,
+ 'The average number of bytes fetched per request for topic %s' % topic,
+ metric_tags), Avg())
+ bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
+ self.group_name,
+ 'The maximum number of bytes fetched per request for topic %s' % topic,
+ metric_tags), Max())
+ bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
+ self.group_name,
+ 'The average number of bytes consumed per second for topic %s' % topic,
+ metric_tags), Rate())
+ bytes_fetched.record(num_bytes)
+
+ # record records fetched
+ name = '.'.join(['topic', topic, 'records-fetched'])
+ records_fetched = self.metrics.get_sensor(name)
+ if not records_fetched:
+ records_fetched = self.metrics.sensor(name)
+ records_fetched.add(self.metrics.metric_name('records-per-request-avg',
+ self.group_name,
+ 'The average number of records in each request for topic %s' % topic,
+ metric_tags), Avg())
+ records_fetched.add(self.metrics.metric_name('records-consumed-rate',
+ self.group_name,
+ 'The average number of records consumed per second for topic %s' % topic,
+ metric_tags), Rate())
+ records_fetched.record(num_records)
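
Note: record_topic_fetch_metrics above creates per-topic sensors lazily on first use, then reuses them. A standalone sketch of that get-or-create pattern, built on the same Metrics/Sensor calls the new code uses ('demo-metrics' is an illustrative group name):

    from kafka.metrics import Metrics
    from kafka.metrics.stats import Avg, Rate

    def record_topic_bytes(metrics, topic, num_bytes):
        name = '.'.join(['topic', topic, 'bytes-fetched'])
        sensor = metrics.get_sensor(name)
        if not sensor:
            # first record for this topic: register the sensor and its stats
            sensor = metrics.sensor(name)
            tags = {'topic': topic.replace('.', '_')}
            sensor.add(metrics.metric_name('fetch-size-avg', 'demo-metrics',
                                           'Avg bytes fetched per request', tags), Avg())
            sensor.add(metrics.metric_name('bytes-consumed-rate', 'demo-metrics',
                                           'Bytes consumed per second', tags), Rate())
        sensor.record(num_bytes)

    metrics = Metrics()
    record_topic_bytes(metrics, 'my.topic', 4096)  # creates the sensor
    record_topic_bytes(metrics, 'my.topic', 1024)  # reuses it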
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index afcc996..abb65ef 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -218,7 +218,7 @@ class KafkaConsumer(six.Iterator):
reporters.append(DictReporter('kafka.consumer'))
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
- # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+ # TODO _metrics likely needs to be passed to KafkaClient, etc.
self._client = KafkaClient(**self.config)
@@ -233,7 +233,7 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, **self.config)
+ self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
self._coordinator = ConsumerCoordinator(
self._client, self._subscription, self._metrics, metric_group_prefix,
assignors=self.config['partition_assignment_strategy'],
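
Note: for reference, a hedged sketch of reading recorded values back through the DictReporter that group.py wires up above; it assumes DictReporter.snapshot() from kafka.metrics.dict_reporter, which returns a dict keyed by metric group:

    from kafka.metrics import DictReporter, Metrics
    from kafka.metrics.stats import Avg

    reporter = DictReporter('kafka.consumer')
    metrics = Metrics(reporters=[reporter])

    sensor = metrics.sensor('fetch-latency')
    sensor.add(metrics.metric_name('fetch-latency-avg',
                                   'consumer-fetch-manager-metrics',
                                   'The average time taken for a fetch request.'), Avg())
    sensor.record(12.5)

    # assumed: snapshot() -> {group_name: {metric_name: value}}
    print(reporter.snapshot())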