summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-10 21:40:10 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 14:05:50 -0700
commit9b5c5acd9a8ba044c90ce3583c6c5231369627d6 (patch)
tree8453a9c6e70f7f356463f4e12aa7ef5851d2cc81
parent7a6f2349ea05bda9ee6103e7f8214b9778ebcce6 (diff)
downloadkafka-python-9b5c5acd9a8ba044c90ce3583c6c5231369627d6.tar.gz
First stab at public metrics() interface for KafkaConsumer / KafkaProducer
-rw-r--r--kafka/consumer/group.py18
-rw-r--r--kafka/producer/kafka.py15
2 files changed, 31 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 5a3b117..982cd7b 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.metrics import DictReporter, MetricConfig, Metrics
+from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator):
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
- reporters.append(DictReporter('kafka.consumer'))
self._metrics = Metrics(metric_config, reporters)
metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
@@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator):
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics
+
def _use_consumer_group(self):
"""Return True iff this consumer can/should join a broker-coordinated group."""
if self.config['api_version'] < (0, 9):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 61cdc8b..70c0cd0 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -602,3 +602,18 @@ class KafkaProducer(object):
return self.config['partitioner'](serialized_key,
all_partitions,
available)
+
+ def metrics(self, raw=False):
+ """Warning: this is an unstable interface.
+ It may change in future releases without warning"""
+ if raw:
+ return self._metrics.metrics
+
+ metrics = {}
+ for k, v in self._metrics.metrics.items():
+ if k.group not in metrics:
+ metrics[k.group] = {}
+ if k.name not in metrics[k.group]:
+ metrics[k.group][k.name] = {}
+ metrics[k.group][k.name] = v.value()
+ return metrics