Diffstat (limited to 'kafka/metrics/stats')
-rw-r--r--  kafka/metrics/stats/__init__.py      15
-rw-r--r--  kafka/metrics/stats/avg.py           22
-rw-r--r--  kafka/metrics/stats/count.py         15
-rw-r--r--  kafka/metrics/stats/histogram.py     93
-rw-r--r--  kafka/metrics/stats/max_stat.py      15
-rw-r--r--  kafka/metrics/stats/min_stat.py      17
-rw-r--r--  kafka/metrics/stats/percentile.py    12
-rw-r--r--  kafka/metrics/stats/percentiles.py   72
-rw-r--r--  kafka/metrics/stats/rate.py         115
-rw-r--r--  kafka/metrics/stats/sampled_stat.py  99
-rw-r--r--  kafka/metrics/stats/sensor.py       133
-rw-r--r--  kafka/metrics/stats/total.py         13
12 files changed, 621 insertions, 0 deletions
diff --git a/kafka/metrics/stats/__init__.py b/kafka/metrics/stats/__init__.py
new file mode 100644
index 0000000..15eafd9
--- /dev/null
+++ b/kafka/metrics/stats/__init__.py
@@ -0,0 +1,15 @@
+from .avg import Avg
+from .count import Count
+from .histogram import Histogram
+from .max_stat import Max
+from .min_stat import Min
+from .percentile import Percentile
+from .percentiles import Percentiles
+from .rate import Rate
+from .sensor import Sensor
+from .total import Total
+
+__all__ = [
+ 'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles',
+ 'Rate', 'Sensor', 'Total'
+]
diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py
new file mode 100644
index 0000000..4d0be0a
--- /dev/null
+++ b/kafka/metrics/stats/avg.py
@@ -0,0 +1,22 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Avg(AbstractSampledStat):
+ """
+ An AbstractSampledStat that maintains a simple average over its samples.
+ """
+ def __init__(self):
+ super(Avg, self).__init__(0.0)
+
+ def update(self, sample, config, value, now):
+ sample.value += value
+
+ def combine(self, samples, config, now):
+ total_sum = 0
+ total_count = 0
+ for sample in samples:
+ total_sum += sample.value
+ total_count += sample.event_count
+ if not total_count:
+ return 0
+ return float(total_sum) / total_count
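
A minimal sketch of driving Avg directly. StubConfig below is a stand-in for the real MetricConfig (defined elsewhere in kafka.metrics, not part of this diff); it carries only the attributes AbstractSampledStat reads (time_window_ms, samples, event_window).

    import time

    from kafka.metrics.stats import Avg

    class StubConfig(object):
        time_window_ms = 30000       # 30 second sample windows
        samples = 2                  # retain two windows
        event_window = float('inf')  # never roll a window on event count

    config = StubConfig()
    avg = Avg()
    now = int(time.time() * 1000)
    for v in (10, 20, 30):
        avg.record(config, v, now)
    print(avg.measure(config, now))  # -> 20.0
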
diff --git a/kafka/metrics/stats/count.py b/kafka/metrics/stats/count.py
new file mode 100644
index 0000000..183e4f2
--- /dev/null
+++ b/kafka/metrics/stats/count.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Count(AbstractSampledStat):
+ """
+ An AbstractSampledStat that maintains a simple count of what it has seen.
+ """
+ def __init__(self):
+ super(Count, self).__init__(0.0)
+
+ def update(self, sample, config, value, now):
+ sample.value += 1.0
+
+ def combine(self, samples, config, now):
+ return float(sum(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py
new file mode 100644
index 0000000..42aacdb
--- /dev/null
+++ b/kafka/metrics/stats/histogram.py
@@ -0,0 +1,93 @@
+import math
+
+
+class Histogram(object):
+ def __init__(self, bin_scheme):
+ self._hist = [0.0] * bin_scheme.bins
+ self._count = 0.0
+ self._bin_scheme = bin_scheme
+
+ def record(self, value):
+ self._hist[self._bin_scheme.to_bin(value)] += 1.0
+ self._count += 1.0
+
+ def value(self, quantile):
+ if self._count == 0.0:
+ return float('NaN')
+ _sum = 0.0
+ quant = float(quantile)
+ for i, value in enumerate(self._hist[:-1]):
+ _sum += value
+ if _sum / self._count > quant:
+ return self._bin_scheme.from_bin(i)
+ return float('inf')
+
+ @property
+ def counts(self):
+ return self._hist
+
+ def clear(self):
+ for i in range(len(self._hist)):
+ self._hist[i] = 0.0
+ self._count = 0.0
+
+ def __str__(self):
+ values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for
+ i, value in enumerate(self._hist[:-1])]
+ values.append('%s:%s' % (float('inf'), self._hist[-1]))
+ return '{%s}' % ','.join(values)
+
+ class ConstantBinScheme(object):
+ def __init__(self, bins, min_val, max_val):
+ if bins < 2:
+ raise ValueError('Must have at least 2 bins.')
+ self._min = float(min_val)
+ self._max = float(max_val)
+ self._bins = int(bins)
+ self._bucket_width = (self._max - self._min) / (self._bins - 2)
+
+ @property
+ def bins(self):
+ return self._bins
+
+ def from_bin(self, b):
+ if b == 0:
+ return float('-inf')
+ elif b == self._bins - 1:
+ return float('inf')
+ else:
+ return self._min + (b - 1) * self._bucket_width
+
+ def to_bin(self, x):
+ if x < self._min:
+ return 0
+ elif x > self._max:
+ return self._bins - 1
+ else:
+ return int(((x - self._min) / self._bucket_width) + 1)
+
+ class LinearBinScheme(object):
+ def __init__(self, num_bins, max_val):
+ self._bins = num_bins
+ self._max = max_val
+ self._scale = float(max_val) / (num_bins * (num_bins - 1) / 2.0)
+
+ @property
+ def bins(self):
+ return self._bins
+
+ def from_bin(self, b):
+ if b == self._bins - 1:
+ return float('inf')
+ else:
+ unscaled = (b * (b + 1.0)) / 2.0
+ return unscaled * self._scale
+
+ def to_bin(self, x):
+ if x < 0.0:
+ raise ValueError('Values less than 0.0 not accepted.')
+ elif x > self._max:
+ return self._bins - 1
+ else:
+ scaled = x / self._scale
+ return int(-0.5 + math.sqrt(2.0 * scaled + 0.25))
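
Histogram and its bin schemes are self-contained, so they can be sketched without a config. Illustrative values only: 12 constant-width bins spanning 0.0 to 100.0 (two of them are the underflow/overflow bins), with the median read back via value():

    from kafka.metrics.stats import Histogram

    scheme = Histogram.ConstantBinScheme(bins=12, min_val=0.0, max_val=100.0)
    hist = Histogram(scheme)
    for v in range(1, 101):
        hist.record(float(v))
    print(hist.value(0.50))  # -> 50.0 for this data
    print(hist.counts)       # raw per-bin counts, underflow bin first
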
diff --git a/kafka/metrics/stats/max_stat.py b/kafka/metrics/stats/max_stat.py
new file mode 100644
index 0000000..8df54d3
--- /dev/null
+++ b/kafka/metrics/stats/max_stat.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Max(AbstractSampledStat):
+ """An AbstractSampledStat that gives the max over its samples."""
+ def __init__(self):
+ super(Max, self).__init__(float('-inf'))
+
+ def update(self, sample, config, value, now):
+ sample.value = max(sample.value, value)
+
+ def combine(self, samples, config, now):
+ if not samples:
+ return float('-inf')
+ return float(max(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/min_stat.py b/kafka/metrics/stats/min_stat.py
new file mode 100644
index 0000000..a57c2dd
--- /dev/null
+++ b/kafka/metrics/stats/min_stat.py
@@ -0,0 +1,17 @@
+import sys
+
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Min(AbstractSampledStat):
+ """An AbstractSampledStat that gives the min over its samples."""
+ def __init__(self):
+ super(Min, self).__init__(float(sys.maxsize))
+
+ def update(self, sample, config, value, now):
+ sample.value = min(sample.value, value)
+
+ def combine(self, samples, config, now):
+ if not samples:
+ return float(sys.maxsize)
+ return float(min(sample.value for sample in samples))
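
Min and Max ride on the same sampled-stat machinery; a quick sketch using the same kind of stub config as above (the real MetricConfig lives elsewhere in kafka.metrics):

    import time

    from kafka.metrics.stats import Max, Min

    class StubConfig(object):
        time_window_ms = 30000
        samples = 2
        event_window = float('inf')

    config = StubConfig()
    low, high = Min(), Max()
    now = int(time.time() * 1000)
    for v in (42, 7, 99):
        low.record(config, v, now)
        high.record(config, v, now)
    print(low.measure(config, now))   # -> 7.0
    print(high.measure(config, now))  # -> 99.0
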
diff --git a/kafka/metrics/stats/percentile.py b/kafka/metrics/stats/percentile.py
new file mode 100644
index 0000000..723b9e6
--- /dev/null
+++ b/kafka/metrics/stats/percentile.py
@@ -0,0 +1,12 @@
+class Percentile(object):
+ def __init__(self, metric_name, percentile):
+ self._metric_name = metric_name
+ self._percentile = float(percentile)
+
+ @property
+ def name(self):
+ return self._metric_name
+
+ @property
+ def percentile(self):
+ return self._percentile
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py
new file mode 100644
index 0000000..84e7160
--- /dev/null
+++ b/kafka/metrics/stats/percentiles.py
@@ -0,0 +1,72 @@
+from kafka.metrics import AnonMeasurable, NamedMeasurable
+from kafka.metrics.compound_stat import AbstractCompoundStat
+from kafka.metrics.stats import Histogram
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class BucketSizing(object):
+ CONSTANT = 0
+ LINEAR = 1
+
+
+class Percentiles(AbstractSampledStat, AbstractCompoundStat):
+ """A compound stat that reports one or more percentiles"""
+ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
+ percentiles=None):
+ super(Percentiles, self).__init__(0.0)
+ self._percentiles = percentiles or []
+ self._buckets = int(size_in_bytes / 4)
+ if bucketing == BucketSizing.CONSTANT:
+ self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
+ min_val, max_val)
+ elif bucketing == BucketSizing.LINEAR:
+ if min_val != 0.0:
+ raise ValueError('Linear bucket sizing requires min_val'
+ ' to be 0.0.')
+ self._bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
+ else:
+ raise ValueError('Unknown bucket type: %s' % bucketing)
+
+ def stats(self):
+ measurables = []
+
+ def make_measure_fn(pct):
+ return lambda config, now: self.value(config, now,
+ pct / 100.0)
+
+ for percentile in self._percentiles:
+ measure_fn = make_measure_fn(percentile.percentile)
+ stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
+ measurables.append(stat)
+ return measurables
+
+ def value(self, config, now, quantile):
+ self.purge_obsolete_samples(config, now)
+ count = sum(sample.event_count for sample in self._samples)
+ if count == 0.0:
+ return float('NaN')
+ sum_val = 0.0
+ quant = float(quantile)
+ for b in range(self._buckets):
+ for sample in self._samples:
+ assert type(sample) is self.HistogramSample
+ hist = sample.histogram.counts
+ sum_val += hist[b]
+ if sum_val / count > quant:
+ return self._bin_scheme.from_bin(b)
+ return float('inf')
+
+ def combine(self, samples, config, now):
+ return self.value(config, now, 0.5)
+
+ def new_sample(self, time_ms):
+ return Percentiles.HistogramSample(self._bin_scheme, time_ms)
+
+ def update(self, sample, config, value, time_ms):
+ assert type(sample) is self.HistogramSample
+ sample.histogram.record(value)
+
+ class HistogramSample(AbstractSampledStat.Sample):
+ def __init__(self, scheme, now):
+ super(Percentiles.HistogramSample, self).__init__(0.0, now)
+ self.histogram = Histogram(scheme)
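
Percentiles is normally registered on a Sensor via add_compound(), but its value() path can be sketched directly. The stub config again stands in for the real MetricConfig, and the numbers are arbitrary illustration values (400 bytes buys 100 histogram buckets, per the size_in_bytes / 4 computation above):

    import time

    from kafka.metrics.stats import Percentiles
    from kafka.metrics.stats.percentiles import BucketSizing

    class StubConfig(object):
        time_window_ms = 30000
        samples = 2
        event_window = float('inf')

    config = StubConfig()
    pcts = Percentiles(400, BucketSizing.LINEAR, max_val=1000)
    now = int(time.time() * 1000)
    for v in range(0, 1000, 10):
        pcts.record(config, v, now)
    print(pcts.value(config, now, 0.99))  # -> 980.0 for this data
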
diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py
new file mode 100644
index 0000000..3ce2e74
--- /dev/null
+++ b/kafka/metrics/stats/rate.py
@@ -0,0 +1,115 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class TimeUnit(object):
+ _names = {
+ 'nanosecond': 0,
+ 'microsecond': 1,
+ 'millisecond': 2,
+ 'second': 3,
+ 'minute': 4,
+ 'hour': 5,
+ 'day': 6,
+ }
+
+ NANOSECONDS = _names['nanosecond']
+ MICROSECONDS = _names['microsecond']
+ MILLISECONDS = _names['millisecond']
+ SECONDS = _names['second']
+ MINUTES = _names['minute']
+ HOURS = _names['hour']
+ DAYS = _names['day']
+
+ @staticmethod
+ def get_name(time_unit):
+ return [k for k, v in TimeUnit._names.items() if v == time_unit][0]
+
+
+class Rate(AbstractMeasurableStat):
+ """
+ The rate of the given quantity. By default this is the total observed
+ over a set of samples from a sampled statistic divided by the elapsed
+ time over the sample windows. Alternative AbstractSampledStat
+ implementations can be provided, however, to record the rate of
+ occurrences (e.g. the count of values measured over the time interval)
+ or other such values.
+ """
+ def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
+ self._stat = sampled_stat or SampledTotal()
+ self._unit = time_unit
+
+ def unit_name(self):
+ return TimeUnit.get_name(self._unit)
+
+ def record(self, config, value, time_ms):
+ self._stat.record(config, value, time_ms)
+
+ def measure(self, config, now):
+ value = self._stat.measure(config, now)
+ return float(value) / self.convert(self.window_size(config, now))
+
+ def window_size(self, config, now):
+ # purge old samples before we compute the window size
+ self._stat.purge_obsolete_samples(config, now)
+
+ """
+ Here we check the total amount of time elapsed since the oldest
+ non-obsolete window. This give the total window_size of the batch
+ which is the time used for Rate computation. However, there is
+ an issue if we do not have sufficient data for e.g. if only
+ 1 second has elapsed in a 30 second window, the measured rate
+ will be very high. Hence we assume that the elapsed time is
+ always N-1 complete windows plus whatever fraction of the final
+ window is complete.
+
+ Note that we could simply count the amount of time elapsed in
+ the current window and add n-1 windows to get the total time,
+ but this approach does not account for sleeps. AbstractSampledStat
+ only creates samples whenever record is called, if no record is
+ called for a period of time that time is not accounted for in
+ window_size and produces incorrect results.
+ """
+ total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
+ # Check how many full windows of data we have currently retained
+ num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
+ min_full_windows = config.samples - 1
+
+ # If the available windows are less than the minimum required,
+ # add the difference to the totalElapsedTime
+ if num_full_windows < min_full_windows:
+ total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
+ config.time_window_ms)
+
+ return total_elapsed_time_ms
+
+ def convert(self, time_ms):
+ if self._unit == TimeUnit.NANOSECONDS:
+ return time_ms * 1000.0 * 1000.0
+ elif self._unit == TimeUnit.MICROSECONDS:
+ return time_ms * 1000.0
+ elif self._unit == TimeUnit.MILLISECONDS:
+ return time_ms
+ elif self._unit == TimeUnit.SECONDS:
+ return time_ms / 1000.0
+ elif self._unit == TimeUnit.MINUTES:
+ return time_ms / (60.0 * 1000.0)
+ elif self._unit == TimeUnit.HOURS:
+ return time_ms / (60.0 * 60.0 * 1000.0)
+ elif self._unit == TimeUnit.DAYS:
+ return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
+ else:
+ raise ValueError('Unknown unit: %s' % self._unit)
+
+
+class SampledTotal(AbstractSampledStat):
+ def __init__(self, initial_value=None):
+ if initial_value is not None:
+ raise ValueError('initial_value cannot be set on SampledTotal')
+ super(SampledTotal, self).__init__(0.0)
+
+ def update(self, sample, config, value, time_ms):
+ sample.value += value
+
+ def combine(self, samples, config, now):
+ return float(sum(sample.value for sample in samples))
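
A sketch of the window_size() behaviour described in the comment above: with two 30 second windows, a burst recorded just now is still divided by roughly (samples - 1) full windows plus the partial current window, so the reported rate stays bounded. StubConfig is again a stand-in for the real MetricConfig:

    import time

    from kafka.metrics.stats import Rate

    class StubConfig(object):
        time_window_ms = 30000
        samples = 2
        event_window = float('inf')

    config = StubConfig()
    rate = Rate()  # per-second rate of the recorded total
    now = int(time.time() * 1000)
    rate.record(config, 100, now)
    # Only ~1 second has elapsed, but window_size() assumes at least one
    # full prior window, so this is ~100 / 31.0, not 100 / 1.0:
    print(rate.measure(config, now + 1000))
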
diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py
new file mode 100644
index 0000000..ca0db69
--- /dev/null
+++ b/kafka/metrics/stats/sampled_stat.py
@@ -0,0 +1,99 @@
+import abc
+
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+
+
+class AbstractSampledStat(AbstractMeasurableStat):
+ """
+ An AbstractSampledStat records a single scalar value measured over
+ one or more samples. Each sample is recorded over a configurable
+ window. The window can be defined by number of events or elapsed
+ time (or both, if both are given the window is complete when
+ *either* the event count or elapsed time criterion is met).
+
+ All the samples are combined to produce the measurement. When a
+ window is complete the oldest sample is cleared and recycled to
+ begin recording the next sample.
+
+ Subclasses of this class define different statistics measured
+ using this basic pattern.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, initial_value):
+ self._initial_value = initial_value
+ self._samples = []
+ self._current = 0
+
+ @abc.abstractmethod
+ def update(self, sample, config, value, time_ms):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def combine(self, samples, config, now):
+ raise NotImplementedError
+
+ def record(self, config, value, time_ms):
+ sample = self.current(time_ms)
+ if sample.is_complete(time_ms, config):
+ sample = self._advance(config, time_ms)
+ self.update(sample, config, float(value), time_ms)
+ sample.event_count += 1
+
+ def new_sample(self, time_ms):
+ return self.Sample(self._initial_value, time_ms)
+
+ def measure(self, config, now):
+ self.purge_obsolete_samples(config, now)
+ return float(self.combine(self._samples, config, now))
+
+ def current(self, time_ms):
+ if not self._samples:
+ self._samples.append(self.new_sample(time_ms))
+ return self._samples[self._current]
+
+ def oldest(self, now):
+ if not self._samples:
+ self._samples.append(self.new_sample(now))
+ oldest = self._samples[0]
+ for sample in self._samples[1:]:
+ if sample.last_window_ms < oldest.last_window_ms:
+ oldest = sample
+ return oldest
+
+ def purge_obsolete_samples(self, config, now):
+ """
+ Timeout any windows that have expired in the absence of any events
+ """
+ expire_age = config.samples * config.time_window_ms
+ for sample in self._samples:
+ if now - sample.last_window_ms >= expire_age:
+ sample.reset(now)
+
+ def _advance(self, config, time_ms):
+ self._current = (self._current + 1) % config.samples
+ if self._current >= len(self._samples):
+ sample = self.new_sample(time_ms)
+ self._samples.append(sample)
+ return sample
+ else:
+ sample = self.current(time_ms)
+ sample.reset(time_ms)
+ return sample
+
+ class Sample(object):
+
+ def __init__(self, initial_value, now):
+ self.initial_value = initial_value
+ self.event_count = 0
+ self.last_window_ms = now
+ self.value = initial_value
+
+ def reset(self, now):
+ self.event_count = 0
+ self.last_window_ms = now
+ self.value = self.initial_value
+
+ def is_complete(self, time_ms, config):
+ return (time_ms - self.last_window_ms >= config.time_window_ms or
+ self.event_count >= config.event_window)
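
The rollover behaviour is easiest to see with an event-bounded window. A sketch using Count (defined above) and a stub config standing in for the real MetricConfig: with event_window=5 and samples=2, the third logical window recycles the oldest sample, so only the last two windows contribute to measure():

    import time

    from kafka.metrics.stats import Count

    class StubConfig(object):
        time_window_ms = 30000
        samples = 2
        event_window = 5  # roll to a new sample every 5 events

    config = StubConfig()
    count = Count()
    now = int(time.time() * 1000)
    for i in range(12):
        count.record(config, 1, now + i)
    # 12 events span 3 logical windows, but only 2 sample slots exist, so
    # the oldest full window has been recycled by _advance():
    print(count.measure(config, now + 12))  # -> 7.0 (5 + 2)
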
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
new file mode 100644
index 0000000..7d179cb
--- /dev/null
+++ b/kafka/metrics/stats/sensor.py
@@ -0,0 +1,133 @@
+import threading
+import time
+
+from kafka.errors import QuotaViolationError
+from kafka.metrics import KafkaMetric
+
+
+class Sensor(object):
+ """
+ A sensor applies a continuous sequence of numerical values
+ to a set of associated metrics. For example a sensor on
+ message size would record a sequence of message sizes using
+ the `record(double)` api and would maintain a set
+ of metrics about request sizes such as the average or max.
+ """
+ def __init__(self, registry, name, parents, config,
+ inactive_sensor_expiration_time_seconds):
+ if not name:
+ raise ValueError('name must be non-empty')
+ self._lock = threading.RLock()
+ self._registry = registry
+ self._name = name
+ self._parents = parents or []
+ self._metrics = []
+ self._stats = []
+ self._config = config
+ self._inactive_sensor_expiration_time_ms = (
+ inactive_sensor_expiration_time_seconds * 1000)
+ self._last_record_time = time.time() * 1000
+ self._check_forest(set())
+
+ def _check_forest(self, sensors):
+ """Validate that this sensor doesn't end up referencing itself."""
+ if self in sensors:
+ raise ValueError('Circular dependency in sensors: %s is its'
+ ' own parent.' % self.name)
+ sensors.add(self)
+ for parent in self._parents:
+ parent._check_forest(sensors)
+
+ @property
+ def name(self):
+ """
+ The name this sensor is registered with.
+ This name will be unique among all registered sensors.
+ """
+ return self._name
+
+ @property
+ def metrics(self):
+ return tuple(self._metrics)
+
+ def record(self, value=1.0, time_ms=None):
+ """
+ Record a value at a known time.
+ Arguments:
+ value (double): The value we are recording
+ time_ms (int): The current POSIX time in milliseconds
+
+ Raises:
+ QuotaViolationError: if recording this value moves a
+ metric beyond its configured maximum or minimum bound
+ """
+ now = time.time() * 1000
+ if time_ms is None:
+ time_ms = now
+ self._last_record_time = now
+ with self._lock: # XXX high volume, might be performance issue
+ # increment all the stats
+ for stat in self._stats:
+ stat.record(self._config, value, time_ms)
+ self._check_quotas(time_ms)
+ for parent in self._parents:
+ parent.record(value, time_ms)
+
+ def _check_quotas(self, time_ms):
+ """
+ Check if we have violated our quota for any metric that
+ has a configured quota
+ """
+ for metric in self._metrics:
+ if metric.config and metric.config.quota:
+ value = metric.value(time_ms)
+ if not metric.config.quota.is_acceptable(value):
+ raise QuotaViolationError('(%s) violated quota. Actual: '
+ '(%d), Threshold: (%d)' %
+ (metric.metric_name,
+ value,
+ metric.config.quota.bound))
+
+ def add_compound(self, compound_stat, config=None):
+ """
+ Register a compound statistic with this sensor which
+ yields multiple measurable quantities (like a histogram)
+
+ Arguments:
+ compound_stat (AbstractCompoundStat): The stat to register
+ config (MetricConfig): The configuration for this stat.
+ If None then the stat will use the default configuration
+ for this sensor.
+ """
+ if not compound_stat:
+ raise ValueError('compound stat must be non-empty')
+ self._stats.append(compound_stat)
+ for named_measurable in compound_stat.stats():
+ metric = KafkaMetric(self._lock, named_measurable.name,
+ named_measurable.stat, config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+
+ def add(self, metric_name, stat, config=None):
+ """
+ Register a metric with this sensor
+
+ Arguments:
+ metric_name (MetricName): The name of the metric
+ stat (AbstractMeasurableStat): The statistic to keep
+ config (MetricConfig): A special configuration for this metric.
+ If None use the sensor default configuration.
+ """
+ with self._lock:
+ metric = KafkaMetric(threading.Lock(), metric_name, stat,
+ config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+ self._stats.append(stat)
+
+ def has_expired(self):
+ """
+ Return True if the Sensor is eligible for removal due to inactivity.
+ """
+ return ((time.time() * 1000 - self._last_record_time) >
+ self._inactive_sensor_expiration_time_ms)
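
Wiring a Sensor by hand requires a registry and a config, which the Metrics class normally supplies. The sketch below stubs both with hypothetical stand-ins covering only the calls Sensor itself makes, and it assumes KafkaMetric (imported from kafka.metrics, not part of this diff) accepts the arguments Sensor passes it and exposes the config attribute read by _check_quotas:

    from kafka.metrics.stats import Avg, Max, Sensor

    class StubRegistry(object):
        def register_metric(self, metric):
            pass  # the real registry would index the metric by name

    class StubConfig(object):
        time_window_ms = 30000
        samples = 2
        event_window = float('inf')
        quota = None  # no quota, so _check_quotas never raises

    sensor = Sensor(StubRegistry(), 'request-size', parents=None,
                    config=StubConfig(),
                    inactive_sensor_expiration_time_seconds=300)
    # metric names would normally be MetricName instances; strings stand in
    sensor.add('request-size-avg', Avg())
    sensor.add('request-size-max', Max())
    sensor.record(512)
    sensor.record(1024)
    print(sensor.has_expired())  # -> False, it was just recorded into
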
diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py
new file mode 100644
index 0000000..76a82d8
--- /dev/null
+++ b/kafka/metrics/stats/total.py
@@ -0,0 +1,13 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+
+
+class Total(AbstractMeasurableStat):
+ """An un-windowed cumulative total maintained over all time."""
+ def __init__(self, value=0.0):
+ self._total = value
+
+ def record(self, config, value, now):
+ self._total += value
+
+ def measure(self, config, now):
+ return float(self._total)
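
Total ignores both the config and the timestamp it is passed, so a sketch needs no stand-ins at all:

    from kafka.metrics.stats import Total

    total = Total()
    total.record(None, 5, 0)
    total.record(None, 7, 0)
    print(total.measure(None, 0))  # -> 12.0
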