Diffstat (limited to 'kafka/metrics/stats')
-rw-r--r--  kafka/metrics/stats/__init__.py      |  15
-rw-r--r--  kafka/metrics/stats/avg.py           |  22
-rw-r--r--  kafka/metrics/stats/count.py         |  15
-rw-r--r--  kafka/metrics/stats/histogram.py     |  93
-rw-r--r--  kafka/metrics/stats/max_stat.py      |  15
-rw-r--r--  kafka/metrics/stats/min_stat.py      |  17
-rw-r--r--  kafka/metrics/stats/percentile.py    |  12
-rw-r--r--  kafka/metrics/stats/percentiles.py   |  72
-rw-r--r--  kafka/metrics/stats/rate.py          | 115
-rw-r--r--  kafka/metrics/stats/sampled_stat.py  |  99
-rw-r--r--  kafka/metrics/stats/sensor.py        | 133
-rw-r--r--  kafka/metrics/stats/total.py         |  13
12 files changed, 621 insertions, 0 deletions
diff --git a/kafka/metrics/stats/__init__.py b/kafka/metrics/stats/__init__.py
new file mode 100644
index 0000000..15eafd9
--- /dev/null
+++ b/kafka/metrics/stats/__init__.py
@@ -0,0 +1,15 @@
+from .avg import Avg
+from .count import Count
+from .histogram import Histogram
+from .max_stat import Max
+from .min_stat import Min
+from .percentile import Percentile
+from .percentiles import Percentiles
+from .rate import Rate
+from .sensor import Sensor
+from .total import Total
+
+__all__ = [
+    'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles',
+    'Rate', 'Sensor', 'Total'
+]
diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py
new file mode 100644
index 0000000..4d0be0a
--- /dev/null
+++ b/kafka/metrics/stats/avg.py
@@ -0,0 +1,22 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Avg(AbstractSampledStat):
+    """
+    An AbstractSampledStat that maintains a simple average over its samples.
+    """
+    def __init__(self):
+        super(Avg, self).__init__(0.0)
+
+    def update(self, sample, config, value, now):
+        sample.value += value
+
+    def combine(self, samples, config, now):
+        total_sum = 0
+        total_count = 0
+        for sample in samples:
+            total_sum += sample.value
+            total_count += sample.event_count
+        if not total_count:
+            return 0
+        return float(total_sum) / total_count
diff --git a/kafka/metrics/stats/count.py b/kafka/metrics/stats/count.py
new file mode 100644
index 0000000..183e4f2
--- /dev/null
+++ b/kafka/metrics/stats/count.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Count(AbstractSampledStat):
+    """
+    An AbstractSampledStat that maintains a simple count of what it has seen.
+    """
+    def __init__(self):
+        super(Count, self).__init__(0.0)
+
+    def update(self, sample, config, value, now):
+        sample.value += 1.0
+
+    def combine(self, samples, config, now):
+        return float(sum(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py
new file mode 100644
index 0000000..42aacdb
--- /dev/null
+++ b/kafka/metrics/stats/histogram.py
@@ -0,0 +1,93 @@
+import math
+
+
+class Histogram(object):
+    def __init__(self, bin_scheme):
+        self._hist = [0.0] * bin_scheme.bins
+        self._count = 0.0
+        self._bin_scheme = bin_scheme
+
+    def record(self, value):
+        self._hist[self._bin_scheme.to_bin(value)] += 1.0
+        self._count += 1.0
+
+    def value(self, quantile):
+        if self._count == 0.0:
+            return float('NaN')
+        _sum = 0.0
+        quant = float(quantile)
+        for i, value in enumerate(self._hist[:-1]):
+            _sum += value
+            if _sum / self._count > quant:
+                return self._bin_scheme.from_bin(i)
+        return float('inf')
+
+    @property
+    def counts(self):
+        return self._hist
+
+    def clear(self):
+        for i in range(len(self._hist)):
+            self._hist[i] = 0.0
+        self._count = 0
+
+    def __str__(self):
+        values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for
+                  i, value in enumerate(self._hist[:-1])]
+        values.append('%s:%s' % (float('inf'), self._hist[-1]))
+        return '{%s}' % ','.join(values)
+
+    class ConstantBinScheme(object):
+        def __init__(self, bins, min_val, max_val):
+            if bins < 2:
+                raise ValueError('Must have at least 2 bins.')
+            self._min = float(min_val)
+            self._max = float(max_val)
+            self._bins = int(bins)
+            self._bucket_width = (self._max - self._min) / (bins - 2)
+
+        @property
+        def bins(self):
+            return self._bins
+
+        def from_bin(self, b):
+            if b == 0:
+                return float('-inf')
+            elif b == self._bins - 1:
+                return float('inf')
+            else:
+                return self._min + (b - 1) * self._bucket_width
+
+        def to_bin(self, x):
+            if x < self._min:
+                return 0
+            elif x > self._max:
+                return self._bins - 1
+            else:
+                return int(((x - self._min) / self._bucket_width) + 1)
+
+    class LinearBinScheme(object):
+        def __init__(self, num_bins, max_val):
+            self._bins = num_bins
+            self._max = max_val
+            self._scale = max_val / (num_bins * (num_bins - 1) / 2.0)
+
+        @property
+        def bins(self):
+            return self._bins
+
+        def from_bin(self, b):
+            if b == self._bins - 1:
+                return float('inf')
+            else:
+                unscaled = (b * (b + 1.0)) / 2.0
+                return unscaled * self._scale
+
+        def to_bin(self, x):
+            if x < 0.0:
+                raise ValueError('Values less than 0.0 not accepted.')
+            elif x > self._max:
+                return self._bins - 1
+            else:
+                scaled = x / self._scale
+                return int(-0.5 + math.sqrt(2.0 * scaled + 0.25))
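
Histogram keeps a fixed-size array of bucket counts and answers quantile
queries by walking the cumulative counts. ConstantBinScheme reserves bin 0
for underflow and the last bin for overflow, while LinearBinScheme widens
buckets linearly so resolution is finest near zero. A minimal sketch of
exercising it directly (the bin count and range here are arbitrary):

    from kafka.metrics.stats import Histogram

    # 12 bins covering [0.0, 100.0]; bins 0 and 11 catch under-/overflow
    scheme = Histogram.ConstantBinScheme(12, 0.0, 100.0)
    hist = Histogram(scheme)
    for v in (5.0, 10.0, 20.0, 40.0, 80.0):
        hist.record(v)
    print(hist.value(0.5))  # ~20.0, the bucketed median
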
diff --git a/kafka/metrics/stats/max_stat.py b/kafka/metrics/stats/max_stat.py
new file mode 100644
index 0000000..8df54d3
--- /dev/null
+++ b/kafka/metrics/stats/max_stat.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Max(AbstractSampledStat):
+    """An AbstractSampledStat that gives the max over its samples."""
+    def __init__(self):
+        super(Max, self).__init__(float('-inf'))
+
+    def update(self, sample, config, value, now):
+        sample.value = max(sample.value, value)
+
+    def combine(self, samples, config, now):
+        if not samples:
+            return float('-inf')
+        return float(max(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/min_stat.py b/kafka/metrics/stats/min_stat.py
new file mode 100644
index 0000000..a57c2dd
--- /dev/null
+++ b/kafka/metrics/stats/min_stat.py
@@ -0,0 +1,17 @@
+import sys
+
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Min(AbstractSampledStat):
+    """An AbstractSampledStat that gives the min over its samples."""
+    def __init__(self):
+        super(Min, self).__init__(float(sys.maxsize))
+
+    def update(self, sample, config, value, now):
+        sample.value = min(sample.value, value)
+
+    def combine(self, samples, config, now):
+        if not samples:
+            return float(sys.maxsize)
+        return float(min(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/percentile.py b/kafka/metrics/stats/percentile.py
new file mode 100644
index 0000000..723b9e6
--- /dev/null
+++ b/kafka/metrics/stats/percentile.py
@@ -0,0 +1,12 @@
+class Percentile(object):
+    def __init__(self, metric_name, percentile):
+        self._metric_name = metric_name
+        self._percentile = float(percentile)
+
+    @property
+    def name(self):
+        return self._metric_name
+
+    @property
+    def percentile(self):
+        return self._percentile
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py
new file mode 100644
index 0000000..84e7160
--- /dev/null
+++ b/kafka/metrics/stats/percentiles.py
@@ -0,0 +1,72 @@
+from kafka.metrics import AnonMeasurable, NamedMeasurable
+from kafka.metrics.compound_stat import AbstractCompoundStat
+from kafka.metrics.stats import Histogram
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class BucketSizing(object):
+    CONSTANT = 0
+    LINEAR = 1
+
+
+class Percentiles(AbstractSampledStat, AbstractCompoundStat):
+    """A compound stat that reports one or more percentiles"""
+    def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
+                 percentiles=None):
+        super(Percentiles, self).__init__(0.0)
+        self._percentiles = percentiles or []
+        self._buckets = int(size_in_bytes / 4)
+        if bucketing == BucketSizing.CONSTANT:
+            self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
+                                                           min_val, max_val)
+        elif bucketing == BucketSizing.LINEAR:
+            if min_val != 0.0:
+                raise ValueError('Linear bucket sizing requires min_val'
+                                 ' to be 0.0.')
+            self._bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
+        else:
+            raise ValueError('Unknown bucket type: %s' % bucketing)
+
+    def stats(self):
+        measurables = []
+
+        def make_measure_fn(pct):
+            return lambda config, now: self.value(config, now,
+                                                  pct / 100.0)
+
+        for percentile in self._percentiles:
+            measure_fn = make_measure_fn(percentile.percentile)
+            stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
+            measurables.append(stat)
+        return measurables
+
+    def value(self, config, now, quantile):
+        self.purge_obsolete_samples(config, now)
+        count = sum(sample.event_count for sample in self._samples)
+        if count == 0.0:
+            return float('NaN')
+        sum_val = 0.0
+        quant = float(quantile)
+        for b in range(self._buckets):
+            for sample in self._samples:
+                assert type(sample) is self.HistogramSample
+                hist = sample.histogram.counts
+                sum_val += hist[b]
+            if sum_val / count > quant:
+                return self._bin_scheme.from_bin(b)
+        return float('inf')
+
+    def combine(self, samples, config, now):
+        return self.value(config, now, 0.5)
+
+    def new_sample(self, time_ms):
+        return Percentiles.HistogramSample(self._bin_scheme, time_ms)
+
+    def update(self, sample, config, value, time_ms):
+        assert type(sample) is self.HistogramSample
+        sample.histogram.record(value)
+
+    class HistogramSample(AbstractSampledStat.Sample):
+        def __init__(self, scheme, now):
+            super(Percentiles.HistogramSample, self).__init__(0.0, now)
+            self.histogram = Histogram(scheme)
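
Percentiles ties the histogram into the metrics machinery: each Percentile
passed to the constructor becomes a NamedMeasurable whose measure function
reads the shared histogram at that quantile, and size_in_bytes / 4 assumes
four bytes per bucket. A construction sketch, assuming the package's
MetricName class (which is outside this diff) for naming:

    from kafka.metrics import MetricName
    from kafka.metrics.stats import Percentile, Percentiles
    from kafka.metrics.stats.percentiles import BucketSizing

    percentiles = Percentiles(
        400 * 4,  # 400 buckets at 4 bytes each
        BucketSizing.CONSTANT,
        1000.0,   # max_val; min_val defaults to 0.0
        percentiles=[
            Percentile(MetricName('p50', 'request-latency'), 50),
            Percentile(MetricName('p99', 'request-latency'), 99),
        ])
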
diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py
new file mode 100644
index 0000000..3ce2e74
--- /dev/null
+++ b/kafka/metrics/stats/rate.py
@@ -0,0 +1,115 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class TimeUnit(object):
+    _names = {
+        0: 'nanosecond',
+        1: 'microsecond',
+        2: 'millisecond',
+        3: 'second',
+        4: 'minute',
+        5: 'hour',
+        6: 'day',
+    }
+
+    NANOSECONDS = 0
+    MICROSECONDS = 1
+    MILLISECONDS = 2
+    SECONDS = 3
+    MINUTES = 4
+    HOURS = 5
+    DAYS = 6
+
+    @staticmethod
+    def get_name(time_unit):
+        return TimeUnit._names[time_unit]
+
+
+class Rate(AbstractMeasurableStat):
+    """
+    The rate of the given quantity. By default this is the total observed
+    over a set of samples from a sampled statistic divided by the elapsed
+    time over the sample windows. Alternative AbstractSampledStat
+    implementations can be provided, however, to record the rate of
+    occurrences (e.g. the count of values measured over the time interval)
+    or other such values.
+    """
+    def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
+        self._stat = sampled_stat or SampledTotal()
+        self._unit = time_unit
+
+    def unit_name(self):
+        return TimeUnit.get_name(self._unit)
+
+    def record(self, config, value, time_ms):
+        self._stat.record(config, value, time_ms)
+
+    def measure(self, config, now):
+        value = self._stat.measure(config, now)
+        return float(value) / self.convert(self.window_size(config, now))
+
+    def window_size(self, config, now):
+        # purge old samples before we compute the window size
+        self._stat.purge_obsolete_samples(config, now)
+
+        """
+        Here we check the total amount of time elapsed since the oldest
+        non-obsolete window. This gives the total window_size of the batch,
+        which is the time used for the Rate computation. However, there is
+        an issue if we do not have sufficient data: e.g. if only 1 second
+        has elapsed in a 30 second window, the measured rate will be very
+        high. Hence we assume that the elapsed time is always N-1 complete
+        windows plus whatever fraction of the final window is complete.
+
+        Note that we could simply count the amount of time elapsed in the
+        current window and add n-1 windows to get the total time, but this
+        approach does not account for sleeps. AbstractSampledStat only
+        creates samples whenever record is called; if no record is called
+        for a period of time, that time is not accounted for in window_size
+        and produces incorrect results.
+        """
+        total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
+        # Check how many full windows of data we have currently retained
+        num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
+        min_full_windows = config.samples - 1
+
+        # If the available windows are less than the minimum required,
+        # add the difference to the totalElapsedTime
+        if num_full_windows < min_full_windows:
+            total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
+                                      config.time_window_ms)
+
+        return total_elapsed_time_ms
+
+    def convert(self, time_ms):
+        if self._unit == TimeUnit.NANOSECONDS:
+            return time_ms * 1000.0 * 1000.0
+        elif self._unit == TimeUnit.MICROSECONDS:
+            return time_ms * 1000.0
+        elif self._unit == TimeUnit.MILLISECONDS:
+            return time_ms
+        elif self._unit == TimeUnit.SECONDS:
+            return time_ms / 1000.0
+        elif self._unit == TimeUnit.MINUTES:
+            return time_ms / (60.0 * 1000.0)
+        elif self._unit == TimeUnit.HOURS:
+            return time_ms / (60.0 * 60.0 * 1000.0)
+        elif self._unit == TimeUnit.DAYS:
+            return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
+        else:
+            raise ValueError('Unknown unit: %s' % self._unit)
+
+
+class SampledTotal(AbstractSampledStat):
+    def __init__(self, initial_value=None):
+        if initial_value is not None:
+            raise ValueError('initial_value cannot be set on SampledTotal')
+        super(SampledTotal, self).__init__(0.0)
+
+    def update(self, sample, config, value, time_ms):
+        sample.value += value
+
+    def combine(self, samples, config, now):
+        return float(sum(sample.value for sample in samples))
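
The window_size logic above is the subtle part of Rate: with two 30-second
samples configured, a sensor that has only been recording for one second is
still averaged over at least one full window plus the current fraction, so
startup bursts do not report absurd rates. A rough illustration, using a
namedtuple as a stand-in for the real MetricConfig (which exposes the same
samples / time_window_ms / event_window attributes):

    import time
    from collections import namedtuple

    from kafka.metrics.stats import Rate

    Config = namedtuple('Config', 'samples time_window_ms event_window')
    config = Config(samples=2, time_window_ms=30000,
                    event_window=float('inf'))

    rate = Rate()  # TimeUnit.SECONDS by default
    now = int(time.time() * 1000)
    rate.record(config, 100, now)
    # Zero time has elapsed, but window_size pads to (samples - 1) full
    # windows, so this reports roughly 100 / 30.0 instead of dividing
    # by zero
    print(rate.measure(config, now))
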
diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py
new file mode 100644
index 0000000..ca0db69
--- /dev/null
+++ b/kafka/metrics/stats/sampled_stat.py
@@ -0,0 +1,99 @@
+import abc
+
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+
+
+class AbstractSampledStat(AbstractMeasurableStat):
+    """
+    An AbstractSampledStat records a single scalar value measured over
+    one or more samples. Each sample is recorded over a configurable
+    window. The window can be defined by number of events or elapsed
+    time (or both, if both are given the window is complete when
+    *either* the event count or elapsed time criterion is met).
+
+    All the samples are combined to produce the measurement. When a
+    window is complete the oldest sample is cleared and recycled to
+    begin recording the next sample.
+
+    Subclasses of this class define different statistics measured
+    using this basic pattern.
+    """
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, initial_value):
+        self._initial_value = initial_value
+        self._samples = []
+        self._current = 0
+
+    @abc.abstractmethod
+    def update(self, sample, config, value, time_ms):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def combine(self, samples, config, now):
+        raise NotImplementedError
+
+    def record(self, config, value, time_ms):
+        sample = self.current(time_ms)
+        if sample.is_complete(time_ms, config):
+            sample = self._advance(config, time_ms)
+        self.update(sample, config, float(value), time_ms)
+        sample.event_count += 1
+
+    def new_sample(self, time_ms):
+        return self.Sample(self._initial_value, time_ms)
+
+    def measure(self, config, now):
+        self.purge_obsolete_samples(config, now)
+        return float(self.combine(self._samples, config, now))
+
+    def current(self, time_ms):
+        if not self._samples:
+            self._samples.append(self.new_sample(time_ms))
+        return self._samples[self._current]
+
+    def oldest(self, now):
+        if not self._samples:
+            self._samples.append(self.new_sample(now))
+        oldest = self._samples[0]
+        for sample in self._samples[1:]:
+            if sample.last_window_ms < oldest.last_window_ms:
+                oldest = sample
+        return oldest
+
+    def purge_obsolete_samples(self, config, now):
+        """
+        Timeout any windows that have expired in the absence of any events
+        """
+        expire_age = config.samples * config.time_window_ms
+        for sample in self._samples:
+            if now - sample.last_window_ms >= expire_age:
+                sample.reset(now)
+
+    def _advance(self, config, time_ms):
+        self._current = (self._current + 1) % config.samples
+        if self._current >= len(self._samples):
+            sample = self.new_sample(time_ms)
+            self._samples.append(sample)
+            return sample
+        else:
+            sample = self.current(time_ms)
+            sample.reset(time_ms)
+            return sample
+
+    class Sample(object):
+
+        def __init__(self, initial_value, now):
+            self.initial_value = initial_value
+            self.event_count = 0
+            self.last_window_ms = now
+            self.value = initial_value
+
+        def reset(self, now):
+            self.event_count = 0
+            self.last_window_ms = now
+            self.value = self.initial_value
+
+        def is_complete(self, time_ms, config):
+            return (time_ms - self.last_window_ms >= config.time_window_ms or
+                    self.event_count >= config.event_window)
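
AbstractSampledStat is the template the simpler stats (Avg, Count, Max, Min,
SampledTotal) fill in: subclasses supply only update (fold one value into the
current sample) and combine (reduce all live samples to a measurement), and
the base class handles window rotation and expiry. A sketch of a custom stat
in the same pattern (illustrative only, not part of this commit):

    from kafka.metrics.stats.sampled_stat import AbstractSampledStat


    class SumOfSquares(AbstractSampledStat):
        """Windowed sum of squared values (illustrative)."""
        def __init__(self):
            super(SumOfSquares, self).__init__(0.0)

        def update(self, sample, config, value, now):
            sample.value += value * value

        def combine(self, samples, config, now):
            return float(sum(sample.value for sample in samples))
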
+ """ + __metaclass__ = abc.ABCMeta + + def __init__(self, initial_value): + self._initial_value = initial_value + self._samples = [] + self._current = 0 + + @abc.abstractmethod + def update(self, sample, config, value, time_ms): + raise NotImplementedError + + @abc.abstractmethod + def combine(self, samples, config, now): + raise NotImplementedError + + def record(self, config, value, time_ms): + sample = self.current(time_ms) + if sample.is_complete(time_ms, config): + sample = self._advance(config, time_ms) + self.update(sample, config, float(value), time_ms) + sample.event_count += 1 + + def new_sample(self, time_ms): + return self.Sample(self._initial_value, time_ms) + + def measure(self, config, now): + self.purge_obsolete_samples(config, now) + return float(self.combine(self._samples, config, now)) + + def current(self, time_ms): + if not self._samples: + self._samples.append(self.new_sample(time_ms)) + return self._samples[self._current] + + def oldest(self, now): + if not self._samples: + self._samples.append(self.new_sample(now)) + oldest = self._samples[0] + for sample in self._samples[1:]: + if sample.last_window_ms < oldest.last_window_ms: + oldest = sample + return oldest + + def purge_obsolete_samples(self, config, now): + """ + Timeout any windows that have expired in the absence of any events + """ + expire_age = config.samples * config.time_window_ms + for sample in self._samples: + if now - sample.last_window_ms >= expire_age: + sample.reset(now) + + def _advance(self, config, time_ms): + self._current = (self._current + 1) % config.samples + if self._current >= len(self._samples): + sample = self.new_sample(time_ms) + self._samples.append(sample) + return sample + else: + sample = self.current(time_ms) + sample.reset(time_ms) + return sample + + class Sample(object): + + def __init__(self, initial_value, now): + self.initial_value = initial_value + self.event_count = 0 + self.last_window_ms = now + self.value = initial_value + + def reset(self, now): + self.event_count = 0 + self.last_window_ms = now + self.value = self.initial_value + + def is_complete(self, time_ms, config): + return (time_ms - self.last_window_ms >= config.time_window_ms or + self.event_count >= config.event_window) diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py new file mode 100644 index 0000000..7d179cb --- /dev/null +++ b/kafka/metrics/stats/sensor.py @@ -0,0 +1,133 @@ +import threading +import time + +from kafka.errors import QuotaViolationError +from kafka.metrics import KafkaMetric + + +class Sensor(object): + """ + A sensor applies a continuous sequence of numerical values + to a set of associated metrics. For example a sensor on + message size would record a sequence of message sizes using + the `record(double)` api and would maintain a set + of metrics about request sizes such as the average or max. 
+ """ + def __init__(self, registry, name, parents, config, + inactive_sensor_expiration_time_seconds): + if not name: + raise ValueError('name must be non-empty') + self._lock = threading.RLock() + self._registry = registry + self._name = name + self._parents = parents or [] + self._metrics = [] + self._stats = [] + self._config = config + self._inactive_sensor_expiration_time_ms = ( + inactive_sensor_expiration_time_seconds * 1000) + self._last_record_time = time.time() * 1000 + self._check_forest(set()) + + def _check_forest(self, sensors): + """Validate that this sensor doesn't end up referencing itself.""" + if self in sensors: + raise ValueError('Circular dependency in sensors: %s is its own' + 'parent.' % self.name) + sensors.add(self) + for parent in self._parents: + parent._check_forest(sensors) + + @property + def name(self): + """ + The name this sensor is registered with. + This name will be unique among all registered sensors. + """ + return self._name + + @property + def metrics(self): + return tuple(self._metrics) + + def record(self, value=1.0, time_ms=None): + """ + Record a value at a known time. + Arguments: + value (double): The value we are recording + time_ms (int): The current POSIX time in milliseconds + + Raises: + QuotaViolationException: if recording this value moves a + metric beyond its configured maximum or minimum bound + """ + now = time.time() * 1000 + if time_ms is None: + time_ms = now + self._last_record_time = now + with self._lock: # XXX high volume, might be performance issue + # increment all the stats + for stat in self._stats: + stat.record(self._config, value, time_ms) + self._check_quotas(time_ms) + for parent in self._parents: + parent.record(value, time_ms) + + def _check_quotas(self, time_ms): + """ + Check if we have violated our quota for any metric that + has a configured quota + """ + for metric in self._metrics: + if metric.config and metric.config.quota: + value = metric.value(time_ms) + if not metric.config.quota.is_acceptable(value): + raise QuotaViolationError('(%s) violated quota. Actual: ' + '(%d), Threshold: (%d)' % + (metric.metric_name, + metric.config.quota.bound, + value)) + + def add_compound(self, compound_stat, config=None): + """ + Register a compound statistic with this sensor which + yields multiple measurable quantities (like a histogram) + + Arguments: + stat (AbstractCompoundStat): The stat to register + config (MetricConfig): The configuration for this stat. + If None then the stat will use the default configuration + for this sensor. + """ + if not compound_stat: + raise ValueError('compound stat must be non-empty') + self._stats.append(compound_stat) + for named_measurable in compound_stat.stats(): + metric = KafkaMetric(self._lock, named_measurable.name, + named_measurable.stat, config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + + def add(self, metric_name, stat, config=None): + """ + Register a metric with this sensor + + Arguments: + metric_name (MetricName): The name of the metric + stat (AbstractMeasurableStat): The statistic to keep + config (MetricConfig): A special configuration for this metric. + If None use the sensor default configuration. + """ + with self._lock: + metric = KafkaMetric(threading.Lock(), metric_name, stat, + config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + self._stats.append(stat) + + def has_expired(self): + """ + Return True if the Sensor is eligible for removal due to inactivity. 
+ """ + return ((time.time() * 1000 - self._last_record_time) > + self._inactive_sensor_expiration_time_ms) diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py new file mode 100644 index 0000000..76a82d8 --- /dev/null +++ b/kafka/metrics/stats/total.py @@ -0,0 +1,13 @@ +from kafka.metrics.measurable_stat import AbstractMeasurableStat + + +class Total(AbstractMeasurableStat): + """An un-windowed cumulative total maintained over all time.""" + def __init__(self, value=0.0): + self._total = value + + def record(self, config, value, now): + self._total += value + + def measure(self, config, now): + return float(self._total) |
