summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/sensor.py
blob: 72bacfc9a0614d564e6d009fcabeb827c8d9c69a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import threading
import time

from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric


class Sensor(object):
    """
    A sensor applies a continuous sequence of numerical values
    to a set of associated metrics. For example a sensor on
    message size would record a sequence of message sizes using
    the `record(double)` api and would maintain a set
    of metrics about request sizes such as the average or max.
    """
    def __init__(self, registry, name, parents, config,
                 inactive_sensor_expiration_time_seconds):
        if not name:
            raise ValueError('name must be non-empty')
        self._lock = threading.RLock()
        self._registry = registry
        self._name = name
        self._parents = parents or []
        self._metrics = []
        self._stats = []
        self._config = config
        self._inactive_sensor_expiration_time_ms = (
            inactive_sensor_expiration_time_seconds * 1000)
        self._last_record_time = time.time() * 1000
        self._check_forest(set())

    def _check_forest(self, sensors):
        """Validate that this sensor doesn't end up referencing itself."""
        if self in sensors:
            raise ValueError('Circular dependency in sensors: %s is its own'
                             'parent.' % self.name)
        sensors.add(self)
        for parent in self._parents:
            parent._check_forest(sensors)

    @property
    def name(self):
        """
        The name this sensor is registered with.
        This name will be unique among all registered sensors.
        """
        return self._name

    @property
    def metrics(self):
        return tuple(self._metrics)

    def record(self, value=1.0, time_ms=None):
        """
        Record a value at a known time.
        Arguments:
            value (double): The value we are recording
            time_ms (int): A POSIX timestamp in milliseconds.
                Default: The time when record() is evaluated (now)

        Raises:
            QuotaViolationException: if recording this value moves a
                metric beyond its configured maximum or minimum bound
        """
        if time_ms is None:
            time_ms = time.time() * 1000
        self._last_record_time = time_ms
        with self._lock:  # XXX high volume, might be performance issue
            # increment all the stats
            for stat in self._stats:
                stat.record(self._config, value, time_ms)
            self._check_quotas(time_ms)
        for parent in self._parents:
            parent.record(value, time_ms)

    def _check_quotas(self, time_ms):
        """
        Check if we have violated our quota for any metric that
        has a configured quota
        """
        for metric in self._metrics:
            if metric.config and metric.config.quota:
                value = metric.value(time_ms)
                if not metric.config.quota.is_acceptable(value):
                    raise QuotaViolationError('(%s) violated quota. Actual: '
                                              '(%d), Threshold: (%d)' %
                                              (metric.metric_name,
                                               metric.config.quota.bound,
                                               value))

    def add_compound(self, compound_stat, config=None):
        """
        Register a compound statistic with this sensor which
        yields multiple measurable quantities (like a histogram)

        Arguments:
            stat (AbstractCompoundStat): The stat to register
            config (MetricConfig): The configuration for this stat.
                If None then the stat will use the default configuration
                for this sensor.
        """
        if not compound_stat:
            raise ValueError('compound stat must be non-empty')
        self._stats.append(compound_stat)
        for named_measurable in compound_stat.stats():
            metric = KafkaMetric(named_measurable.name, named_measurable.stat,
                                 config or self._config)
            self._registry.register_metric(metric)
            self._metrics.append(metric)

    def add(self, metric_name, stat, config=None):
        """
        Register a metric with this sensor

        Arguments:
            metric_name (MetricName): The name of the metric
            stat (AbstractMeasurableStat): The statistic to keep
            config (MetricConfig): A special configuration for this metric.
                If None use the sensor default configuration.
        """
        with self._lock:
            metric = KafkaMetric(metric_name, stat, config or self._config)
            self._registry.register_metric(metric)
            self._metrics.append(metric)
            self._stats.append(stat)

    def has_expired(self):
        """
        Return True if the Sensor is eligible for removal due to inactivity.
        """
        return ((time.time() * 1000 - self._last_record_time) >
                self._inactive_sensor_expiration_time_ms)