diff options
author | Zack Dever <zdever@pandora.com> | 2016-04-07 18:36:14 -0700 |
---|---|---|
committer | Zack Dever <zdever@pandora.com> | 2016-04-13 17:26:38 -0700 |
commit | 64e9cebfa5e883464cfe76af0c3476ae542ac17b (patch) | |
tree | 0f011c36c12076df940f49d5b58fd16b46e9b5b6 /kafka/metrics/stats/sensor.py | |
parent | 0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (diff) | |
download | kafka-python-64e9cebfa5e883464cfe76af0c3476ae542ac17b.tar.gz |
Kafka metrics java port. No reporters or instrumentation.
There is no straight translation for the JMX reporter into python,
so I'll do something else in a separate commit.
Diffstat (limited to 'kafka/metrics/stats/sensor.py')
-rw-r--r-- | kafka/metrics/stats/sensor.py | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py new file mode 100644 index 0000000..7d179cb --- /dev/null +++ b/kafka/metrics/stats/sensor.py @@ -0,0 +1,133 @@ +import threading +import time + +from kafka.errors import QuotaViolationError +from kafka.metrics import KafkaMetric + + +class Sensor(object): + """ + A sensor applies a continuous sequence of numerical values + to a set of associated metrics. For example a sensor on + message size would record a sequence of message sizes using + the `record(double)` api and would maintain a set + of metrics about request sizes such as the average or max. + """ + def __init__(self, registry, name, parents, config, + inactive_sensor_expiration_time_seconds): + if not name: + raise ValueError('name must be non-empty') + self._lock = threading.RLock() + self._registry = registry + self._name = name + self._parents = parents or [] + self._metrics = [] + self._stats = [] + self._config = config + self._inactive_sensor_expiration_time_ms = ( + inactive_sensor_expiration_time_seconds * 1000) + self._last_record_time = time.time() * 1000 + self._check_forest(set()) + + def _check_forest(self, sensors): + """Validate that this sensor doesn't end up referencing itself.""" + if self in sensors: + raise ValueError('Circular dependency in sensors: %s is its own' + 'parent.' % self.name) + sensors.add(self) + for parent in self._parents: + parent._check_forest(sensors) + + @property + def name(self): + """ + The name this sensor is registered with. + This name will be unique among all registered sensors. + """ + return self._name + + @property + def metrics(self): + return tuple(self._metrics) + + def record(self, value=1.0, time_ms=None): + """ + Record a value at a known time. + Arguments: + value (double): The value we are recording + time_ms (int): The current POSIX time in milliseconds + + Raises: + QuotaViolationException: if recording this value moves a + metric beyond its configured maximum or minimum bound + """ + now = time.time() * 1000 + if time_ms is None: + time_ms = now + self._last_record_time = now + with self._lock: # XXX high volume, might be performance issue + # increment all the stats + for stat in self._stats: + stat.record(self._config, value, time_ms) + self._check_quotas(time_ms) + for parent in self._parents: + parent.record(value, time_ms) + + def _check_quotas(self, time_ms): + """ + Check if we have violated our quota for any metric that + has a configured quota + """ + for metric in self._metrics: + if metric.config and metric.config.quota: + value = metric.value(time_ms) + if not metric.config.quota.is_acceptable(value): + raise QuotaViolationError('(%s) violated quota. Actual: ' + '(%d), Threshold: (%d)' % + (metric.metric_name, + metric.config.quota.bound, + value)) + + def add_compound(self, compound_stat, config=None): + """ + Register a compound statistic with this sensor which + yields multiple measurable quantities (like a histogram) + + Arguments: + stat (AbstractCompoundStat): The stat to register + config (MetricConfig): The configuration for this stat. + If None then the stat will use the default configuration + for this sensor. + """ + if not compound_stat: + raise ValueError('compound stat must be non-empty') + self._stats.append(compound_stat) + for named_measurable in compound_stat.stats(): + metric = KafkaMetric(self._lock, named_measurable.name, + named_measurable.stat, config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + + def add(self, metric_name, stat, config=None): + """ + Register a metric with this sensor + + Arguments: + metric_name (MetricName): The name of the metric + stat (AbstractMeasurableStat): The statistic to keep + config (MetricConfig): A special configuration for this metric. + If None use the sensor default configuration. + """ + with self._lock: + metric = KafkaMetric(threading.Lock(), metric_name, stat, + config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + self._stats.append(stat) + + def has_expired(self): + """ + Return True if the Sensor is eligible for removal due to inactivity. + """ + return ((time.time() * 1000 - self._last_record_time) > + self._inactive_sensor_expiration_time_ms) |