summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/sensor.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/metrics/stats/sensor.py')
-rw-r--r--kafka/metrics/stats/sensor.py133
1 files changed, 133 insertions, 0 deletions
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
new file mode 100644
index 0000000..7d179cb
--- /dev/null
+++ b/kafka/metrics/stats/sensor.py
@@ -0,0 +1,133 @@
+import threading
+import time
+
+from kafka.errors import QuotaViolationError
+from kafka.metrics import KafkaMetric
+
+
+class Sensor(object):
+ """
+ A sensor applies a continuous sequence of numerical values
+ to a set of associated metrics. For example a sensor on
+ message size would record a sequence of message sizes using
+ the `record(double)` api and would maintain a set
+ of metrics about request sizes such as the average or max.
+ """
+ def __init__(self, registry, name, parents, config,
+ inactive_sensor_expiration_time_seconds):
+ if not name:
+ raise ValueError('name must be non-empty')
+ self._lock = threading.RLock()
+ self._registry = registry
+ self._name = name
+ self._parents = parents or []
+ self._metrics = []
+ self._stats = []
+ self._config = config
+ self._inactive_sensor_expiration_time_ms = (
+ inactive_sensor_expiration_time_seconds * 1000)
+ self._last_record_time = time.time() * 1000
+ self._check_forest(set())
+
+ def _check_forest(self, sensors):
+ """Validate that this sensor doesn't end up referencing itself."""
+ if self in sensors:
+ raise ValueError('Circular dependency in sensors: %s is its own'
+ 'parent.' % self.name)
+ sensors.add(self)
+ for parent in self._parents:
+ parent._check_forest(sensors)
+
+ @property
+ def name(self):
+ """
+ The name this sensor is registered with.
+ This name will be unique among all registered sensors.
+ """
+ return self._name
+
+ @property
+ def metrics(self):
+ return tuple(self._metrics)
+
+ def record(self, value=1.0, time_ms=None):
+ """
+ Record a value at a known time.
+ Arguments:
+ value (double): The value we are recording
+ time_ms (int): The current POSIX time in milliseconds
+
+ Raises:
+ QuotaViolationException: if recording this value moves a
+ metric beyond its configured maximum or minimum bound
+ """
+ now = time.time() * 1000
+ if time_ms is None:
+ time_ms = now
+ self._last_record_time = now
+ with self._lock: # XXX high volume, might be performance issue
+ # increment all the stats
+ for stat in self._stats:
+ stat.record(self._config, value, time_ms)
+ self._check_quotas(time_ms)
+ for parent in self._parents:
+ parent.record(value, time_ms)
+
+ def _check_quotas(self, time_ms):
+ """
+ Check if we have violated our quota for any metric that
+ has a configured quota
+ """
+ for metric in self._metrics:
+ if metric.config and metric.config.quota:
+ value = metric.value(time_ms)
+ if not metric.config.quota.is_acceptable(value):
+ raise QuotaViolationError('(%s) violated quota. Actual: '
+ '(%d), Threshold: (%d)' %
+ (metric.metric_name,
+ metric.config.quota.bound,
+ value))
+
+ def add_compound(self, compound_stat, config=None):
+ """
+ Register a compound statistic with this sensor which
+ yields multiple measurable quantities (like a histogram)
+
+ Arguments:
+ stat (AbstractCompoundStat): The stat to register
+ config (MetricConfig): The configuration for this stat.
+ If None then the stat will use the default configuration
+ for this sensor.
+ """
+ if not compound_stat:
+ raise ValueError('compound stat must be non-empty')
+ self._stats.append(compound_stat)
+ for named_measurable in compound_stat.stats():
+ metric = KafkaMetric(self._lock, named_measurable.name,
+ named_measurable.stat, config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+
+ def add(self, metric_name, stat, config=None):
+ """
+ Register a metric with this sensor
+
+ Arguments:
+ metric_name (MetricName): The name of the metric
+ stat (AbstractMeasurableStat): The statistic to keep
+ config (MetricConfig): A special configuration for this metric.
+ If None use the sensor default configuration.
+ """
+ with self._lock:
+ metric = KafkaMetric(threading.Lock(), metric_name, stat,
+ config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+ self._stats.append(stat)
+
+ def has_expired(self):
+ """
+ Return True if the Sensor is eligible for removal due to inactivity.
+ """
+ return ((time.time() * 1000 - self._last_record_time) >
+ self._inactive_sensor_expiration_time_ms)