summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZack Dever <zdever@pandora.com>2016-04-07 18:39:37 -0700
committerZack Dever <zdever@pandora.com>2016-04-13 17:26:39 -0700
commitcaf4cdefe4f41b444d44ddef8f40f5ddeccf65b9 (patch)
treef6ef2313bb5756f65e0f65a6fa94b94537b08c5b
parent64e9cebfa5e883464cfe76af0c3476ae542ac17b (diff)
downloadkafka-python-caf4cdefe4f41b444d44ddef8f40f5ddeccf65b9.tar.gz
Basic dictionary reporter in place of the java JMX reporter.
-rw-r--r--kafka/metrics/__init__.py3
-rw-r--r--kafka/metrics/dict_reporter.py82
-rw-r--r--test/test_metrics.py39
3 files changed, 120 insertions, 4 deletions
diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py
index b930dea..dd22f53 100644
--- a/kafka/metrics/__init__.py
+++ b/kafka/metrics/__init__.py
@@ -1,4 +1,5 @@
from .compound_stat import NamedMeasurable
+from .dict_reporter import DictReporter
from .kafka_metric import KafkaMetric
from .measurable import AnonMeasurable
from .metric_config import MetricConfig
@@ -7,6 +8,6 @@ from .metrics import Metrics
from .quota import Quota
__all__ = [
- 'AnonMeasurable', 'KafkaMetric', 'MetricConfig',
+ 'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig',
'MetricName', 'Metrics', 'NamedMeasurable', 'Quota'
]
diff --git a/kafka/metrics/dict_reporter.py b/kafka/metrics/dict_reporter.py
new file mode 100644
index 0000000..4888fc8
--- /dev/null
+++ b/kafka/metrics/dict_reporter.py
@@ -0,0 +1,82 @@
+import logging
+import threading
+
+from kafka.metrics.metrics_reporter import AbstractMetricsReporter
+
+logger = logging.getLogger(__name__)
+
+
+class DictReporter(AbstractMetricsReporter):
+ """A basic dictionary based metrics reporter.
+
+ Store all metrics in a two level dictionary of category > name > metric.
+ """
+ def __init__(self, prefix=''):
+ self._lock = threading.RLock()
+ self._prefix = prefix if prefix else '' # never allow None
+ self._store = {}
+
+ def snapshot(self):
+ """
+ Return a nested dictionary snapshot of all metrics and their
+ values at this time. Example:
+ {
+ 'category': {
+ 'metric1_name': 42.0,
+ 'metric2_name': 'foo'
+ }
+ }
+ """
+ return dict((category, dict((name, metric.value())
+ for name, metric in metrics.items()))
+ for category, metrics in
+ self._store.items())
+
+ def init(self, metrics):
+ with self._lock:
+ for metric in metrics:
+ self.metric_change(metric)
+
+ def metric_change(self, metric):
+ with self._lock:
+ category = self.get_category(metric)
+ if category not in self._store:
+ self._store[category] = {}
+ self._store[category][metric.metric_name.name] = metric
+
+ def metric_removal(self, metric):
+ with self._lock:
+ category = self.get_category(metric)
+ metrics = self._store.get(category, {})
+ removed = metrics.pop(metric.metric_name.name, None)
+ if not metrics:
+ self._store.pop(category, None)
+ return removed
+
+ def get_category(self, metric):
+ """
+ Return a string category for the metric.
+
+ The category is made up of this reporter's prefix and the
+ metric's group and tags.
+
+ Examples:
+ prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2}
+ returns: 'foo.bar.a=1,b=2'
+
+ prefix = 'foo', group = 'bar', tags = None
+ returns: 'foo.bar'
+
+ prefix = None, group = 'bar', tags = None
+ returns: 'bar'
+ """
+ tags = ','.join('%s=%s' % (k, v) for k, v in
+ sorted(metric.metric_name.tags.items()))
+ return '.'.join(x for x in
+ [self._prefix, metric.metric_name.group, tags] if x)
+
+ def configure(self, configs):
+ pass
+
+ def close(self):
+ pass
diff --git a/test/test_metrics.py b/test/test_metrics.py
index a78fe47..e4757d6 100644
--- a/test/test_metrics.py
+++ b/test/test_metrics.py
@@ -4,7 +4,7 @@ import time
import pytest
from kafka.errors import QuotaViolationError
-from kafka.metrics import MetricConfig, MetricName, Metrics, Quota
+from kafka.metrics import DictReporter, MetricConfig, MetricName, Metrics, Quota
from kafka.metrics.measurable import AbstractMeasurable
from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, Percentiles,
Rate, Total)
@@ -25,8 +25,13 @@ def config():
@pytest.fixture
-def metrics(request, config):
- metrics = Metrics(config, None, enable_expiration=True)
+def reporter():
+ return DictReporter()
+
+
+@pytest.fixture
+def metrics(request, config, reporter):
+ metrics = Metrics(config, [reporter], enable_expiration=True)
request.addfinalizer(lambda: metrics.close())
return metrics
@@ -440,6 +445,34 @@ def test_rate_windowing(mocker, time_keeper, metrics):
< EPS, 'Elapsed Time = 75 seconds'
+def test_reporter(metrics):
+ reporter = DictReporter()
+ foo_reporter = DictReporter(prefix='foo')
+ metrics.add_reporter(reporter)
+ metrics.add_reporter(foo_reporter)
+ sensor = metrics.sensor('kafka.requests')
+ sensor.add(metrics.metric_name('pack.bean1.avg', 'grp1'), Avg())
+ sensor.add(metrics.metric_name('pack.bean2.total', 'grp2'), Total())
+ sensor2 = metrics.sensor('kafka.blah')
+ sensor2.add(metrics.metric_name('pack.bean1.some', 'grp1'), Total())
+ sensor2.add(metrics.metric_name('pack.bean2.some', 'grp1',
+ tags={'a': 42, 'b': 'bar'}), Total())
+
+ # kafka-metrics-count > count is the total number of metrics and automatic
+ expected = {
+ 'kafka-metrics-count': {'count': 5.0},
+ 'grp2': {'pack.bean2.total': 0.0},
+ 'grp1': {'pack.bean1.avg': 0.0, 'pack.bean1.some': 0.0},
+ 'grp1.a=42,b=bar': {'pack.bean2.some': 0.0},
+ }
+ assert expected == reporter.snapshot()
+
+ for key in list(expected.keys()):
+ metrics = expected.pop(key)
+ expected['foo.%s' % key] = metrics
+ assert expected == foo_reporter.snapshot()
+
+
class ConstantMeasurable(AbstractMeasurable):
_value = 0.0