diff options
-rw-r--r-- | kafka/consumer/group.py | 18 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 15 |
2 files changed, 31 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5a3b117..982cd7b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.metrics import DictReporter, MetricConfig, Metrics +from kafka.metrics import MetricConfig, Metrics from kafka.protocol.offset import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ @@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator): time_window_ms=self.config['metrics_sample_window_ms'], tags=metrics_tags) reporters = [reporter() for reporter in self.config['metric_reporters']] - reporters.append(DictReporter('kafka.consumer')) self._metrics = Metrics(metric_config, reporters) metric_group_prefix = 'consumer' # TODO _metrics likely needs to be passed to KafkaClient, etc. @@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator): self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 61cdc8b..70c0cd0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -602,3 +602,18 @@ class KafkaProducer(object): return self.config['partitioner'](serialized_key, all_partitions, available) + + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics |