diff options
author | Zack Dever <zdever@pandora.com> | 2016-04-07 18:39:37 -0700 |
---|---|---|
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:39 -0700 |
commit | caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (patch) | |
tree | f6ef2313bb5756f65e0f65a6fa94b94537b08c5b | |
parent | 64e9cebfa5e883464cfe76af0c3476ae542ac17b (diff) | |
download | kafka-python-caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9.tar.gz |
Basic dictionary reporter in place of the java JMX reporter.
-rw-r--r-- | kafka/metrics/__init__.py | 3 | ||||
-rw-r--r-- | kafka/metrics/dict_reporter.py | 82 | ||||
-rw-r--r-- | test/test_metrics.py | 39 |
3 files changed, 120 insertions, 4 deletions
diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py index b930dea..dd22f53 100644 --- a/kafka/metrics/__init__.py +++ b/kafka/metrics/__init__.py @@ -1,4 +1,5 @@ from .compound_stat import NamedMeasurable +from .dict_reporter import DictReporter from .kafka_metric import KafkaMetric from .measurable import AnonMeasurable from .metric_config import MetricConfig @@ -7,6 +8,6 @@ from .metrics import Metrics from .quota import Quota __all__ = [ - 'AnonMeasurable', 'KafkaMetric', 'MetricConfig', + 'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig', 'MetricName', 'Metrics', 'NamedMeasurable', 'Quota' ] diff --git a/kafka/metrics/dict_reporter.py b/kafka/metrics/dict_reporter.py new file mode 100644 index 0000000..4888fc8 --- /dev/null +++ b/kafka/metrics/dict_reporter.py @@ -0,0 +1,82 @@ +import logging +import threading + +from kafka.metrics.metrics_reporter import AbstractMetricsReporter + +logger = logging.getLogger(__name__) + + +class DictReporter(AbstractMetricsReporter): + """A basic dictionary based metrics reporter. + + Store all metrics in a two level dictionary of category > name > metric. + """ + def __init__(self, prefix=''): + self._lock = threading.RLock() + self._prefix = prefix if prefix else '' # never allow None + self._store = {} + + def snapshot(self): + """ + Return a nested dictionary snapshot of all metrics and their + values at this time. Example: + { + 'category': { + 'metric1_name': 42.0, + 'metric2_name': 'foo' + } + } + """ + return dict((category, dict((name, metric.value()) + for name, metric in metrics.items())) + for category, metrics in + self._store.items()) + + def init(self, metrics): + with self._lock: + for metric in metrics: + self.metric_change(metric) + + def metric_change(self, metric): + with self._lock: + category = self.get_category(metric) + if category not in self._store: + self._store[category] = {} + self._store[category][metric.metric_name.name] = metric + + def metric_removal(self, metric): + with self._lock: + category = self.get_category(metric) + metrics = self._store.get(category, {}) + removed = metrics.pop(metric.metric_name.name, None) + if not metrics: + self._store.pop(category, None) + return removed + + def get_category(self, metric): + """ + Return a string category for the metric. + + The category is made up of this reporter's prefix and the + metric's group and tags. + + Examples: + prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2} + returns: 'foo.bar.a=1,b=2' + + prefix = 'foo', group = 'bar', tags = None + returns: 'foo.bar' + + prefix = None, group = 'bar', tags = None + returns: 'bar' + """ + tags = ','.join('%s=%s' % (k, v) for k, v in + sorted(metric.metric_name.tags.items())) + return '.'.join(x for x in + [self._prefix, metric.metric_name.group, tags] if x) + + def configure(self, configs): + pass + + def close(self): + pass diff --git a/test/test_metrics.py b/test/test_metrics.py index a78fe47..e4757d6 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -4,7 +4,7 @@ import time import pytest from kafka.errors import QuotaViolationError -from kafka.metrics import MetricConfig, MetricName, Metrics, Quota +from kafka.metrics import DictReporter, MetricConfig, MetricName, Metrics, Quota from kafka.metrics.measurable import AbstractMeasurable from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, Percentiles, Rate, Total) @@ -25,8 +25,13 @@ def config(): @pytest.fixture -def metrics(request, config): - metrics = Metrics(config, None, enable_expiration=True) +def reporter(): + return DictReporter() + + +@pytest.fixture +def metrics(request, config, reporter): + metrics = Metrics(config, [reporter], enable_expiration=True) request.addfinalizer(lambda: metrics.close()) return metrics @@ -440,6 +445,34 @@ def test_rate_windowing(mocker, time_keeper, metrics): < EPS, 'Elapsed Time = 75 seconds' +def test_reporter(metrics): + reporter = DictReporter() + foo_reporter = DictReporter(prefix='foo') + metrics.add_reporter(reporter) + metrics.add_reporter(foo_reporter) + sensor = metrics.sensor('kafka.requests') + sensor.add(metrics.metric_name('pack.bean1.avg', 'grp1'), Avg()) + sensor.add(metrics.metric_name('pack.bean2.total', 'grp2'), Total()) + sensor2 = metrics.sensor('kafka.blah') + sensor2.add(metrics.metric_name('pack.bean1.some', 'grp1'), Total()) + sensor2.add(metrics.metric_name('pack.bean2.some', 'grp1', + tags={'a': 42, 'b': 'bar'}), Total()) + + # kafka-metrics-count > count is the total number of metrics and automatic + expected = { + 'kafka-metrics-count': {'count': 5.0}, + 'grp2': {'pack.bean2.total': 0.0}, + 'grp1': {'pack.bean1.avg': 0.0, 'pack.bean1.some': 0.0}, + 'grp1.a=42,b=bar': {'pack.bean2.some': 0.0}, + } + assert expected == reporter.snapshot() + + for key in list(expected.keys()): + metrics = expected.pop(key) + expected['foo.%s' % key] = metrics + assert expected == foo_reporter.snapshot() + + class ConstantMeasurable(AbstractMeasurable): _value = 0.0 |