1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
from __future__ import absolute_import
import threading
import time
from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric
class Sensor(object):
"""
A sensor applies a continuous sequence of numerical values
to a set of associated metrics. For example a sensor on
message size would record a sequence of message sizes using
the `record(double)` api and would maintain a set
of metrics about request sizes such as the average or max.
"""
def __init__(self, registry, name, parents, config,
inactive_sensor_expiration_time_seconds):
if not name:
raise ValueError('name must be non-empty')
self._lock = threading.RLock()
self._registry = registry
self._name = name
self._parents = parents or []
self._metrics = []
self._stats = []
self._config = config
self._inactive_sensor_expiration_time_ms = (
inactive_sensor_expiration_time_seconds * 1000)
self._last_record_time = time.time() * 1000
self._check_forest(set())
def _check_forest(self, sensors):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
'parent.' % self.name)
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
@property
def name(self):
"""
The name this sensor is registered with.
This name will be unique among all registered sensors.
"""
return self._name
@property
def metrics(self):
return tuple(self._metrics)
def record(self, value=1.0, time_ms=None):
"""
Record a value at a known time.
Arguments:
value (double): The value we are recording
time_ms (int): A POSIX timestamp in milliseconds.
Default: The time when record() is evaluated (now)
Raises:
QuotaViolationException: if recording this value moves a
metric beyond its configured maximum or minimum bound
"""
if time_ms is None:
time_ms = time.time() * 1000
self._last_record_time = time_ms
with self._lock: # XXX high volume, might be performance issue
# increment all the stats
for stat in self._stats:
stat.record(self._config, value, time_ms)
self._check_quotas(time_ms)
for parent in self._parents:
parent.record(value, time_ms)
def _check_quotas(self, time_ms):
"""
Check if we have violated our quota for any metric that
has a configured quota
"""
for metric in self._metrics:
if metric.config and metric.config.quota:
value = metric.value(time_ms)
if not metric.config.quota.is_acceptable(value):
raise QuotaViolationError('(%s) violated quota. Actual: '
'(%d), Threshold: (%d)' %
(metric.metric_name,
metric.config.quota.bound,
value))
def add_compound(self, compound_stat, config=None):
"""
Register a compound statistic with this sensor which
yields multiple measurable quantities (like a histogram)
Arguments:
stat (AbstractCompoundStat): The stat to register
config (MetricConfig): The configuration for this stat.
If None then the stat will use the default configuration
for this sensor.
"""
if not compound_stat:
raise ValueError('compound stat must be non-empty')
self._stats.append(compound_stat)
for named_measurable in compound_stat.stats():
metric = KafkaMetric(named_measurable.name, named_measurable.stat,
config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
def add(self, metric_name, stat, config=None):
"""
Register a metric with this sensor
Arguments:
metric_name (MetricName): The name of the metric
stat (AbstractMeasurableStat): The statistic to keep
config (MetricConfig): A special configuration for this metric.
If None use the sensor default configuration.
"""
with self._lock:
metric = KafkaMetric(metric_name, stat, config or self._config)
self._registry.register_metric(metric)
self._metrics.append(metric)
self._stats.append(stat)
def has_expired(self):
"""
Return True if the Sensor is eligible for removal due to inactivity.
"""
return ((time.time() * 1000 - self._last_record_time) >
self._inactive_sensor_expiration_time_ms)
|