summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/percentiles.py
blob: b55c5acccfc170c5bcecb19b22ca2739670c024f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from __future__ import absolute_import

from kafka.metrics import AnonMeasurable, NamedMeasurable
from kafka.metrics.compound_stat import AbstractCompoundStat
from kafka.metrics.stats import Histogram
from kafka.metrics.stats.sampled_stat import AbstractSampledStat


class BucketSizing(object):
    """Selectors for the histogram bin scheme used by Percentiles.

    CONSTANT makes Percentiles build a Histogram.ConstantBinScheme,
    LINEAR makes it build a Histogram.LinearBinScheme.
    """
    CONSTANT, LINEAR = 0, 1


class Percentiles(AbstractSampledStat, AbstractCompoundStat):
    """A compound stat that reports one or more percentiles.

    Each sample owns a Histogram; recorded values go into the current
    sample's histogram and percentiles are read by walking the buckets
    of all retained samples. Every configured percentile is exposed as
    its own named measurable through stats().
    """

    def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
                 percentiles=None):
        """Set up the bucket layout and the percentiles to report.

        Arguments:
            size_in_bytes: Size used to derive the bucket count; the
                histogram gets int(size_in_bytes / 4) buckets
                (presumably 4 bytes per bucket -- TODO confirm).
            bucketing: BucketSizing.CONSTANT or BucketSizing.LINEAR.
            max_val: Upper bound passed to the bin scheme.
            min_val: Lower bound passed to the constant bin scheme.
                Must be 0.0 when bucketing is BucketSizing.LINEAR.
            percentiles: Optional sequence of objects exposing `name`
                and `percentile` attributes, where `percentile` is on
                a 0-100 scale. Defaults to no percentiles.

        Raises:
            ValueError: If bucketing is LINEAR with a non-zero min_val,
                or if bucketing is not a known BucketSizing value.
        """
        super(Percentiles, self).__init__(0.0)
        self._percentiles = percentiles or []
        self._buckets = int(size_in_bytes / 4)
        if bucketing == BucketSizing.CONSTANT:
            self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
                                                           min_val, max_val)
        elif bucketing == BucketSizing.LINEAR:
            if min_val != 0.0:
                raise ValueError('Linear bucket sizing requires min_val'
                                 ' to be 0.0.')
            # Must live on _bin_scheme: new_sample() and value() read
            # that attribute regardless of the bucketing mode.
            self._bin_scheme = Histogram.LinearBinScheme(self._buckets,
                                                         max_val)
        else:
            # Fail fast instead of leaving the instance without a scheme.
            raise ValueError('Unknown bucket type: %s' % bucketing)

    def stats(self):
        """Return one NamedMeasurable per configured percentile.

        Returns:
            list: NamedMeasurable objects, in the order the percentiles
                were supplied; each one evaluates value() at that
                percentile divided by 100.
        """
        measurables = []

        def make_measure_fn(pct):
            # A factory is used so every lambda captures its own pct
            # rather than the loop variable's final value.
            return lambda config, now: self.value(config, now,
                                                  pct / 100.0)

        for percentile in self._percentiles:
            measure_fn = make_measure_fn(percentile.percentile)
            stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
            measurables.append(stat)
        return measurables

    def value(self, config, now, quantile):
        """Estimate the value at the given quantile across all samples.

        Arguments:
            config: Metric config forwarded to purge_obsolete_samples().
            now: Current time forwarded to purge_obsolete_samples().
            quantile: Target quantile as a fraction (stats() passes
                percentile / 100.0).

        Returns:
            float: NaN when no events are recorded; otherwise the result
                of the bin scheme's from_bin() for the first bucket at
                which the cumulative share of events exceeds quantile;
                positive infinity if no bucket exceeds it.
        """
        self.purge_obsolete_samples(config, now)
        count = sum(sample.event_count for sample in self._samples)
        if count == 0.0:
            return float('NaN')
        sum_val = 0.0
        quant = float(quantile)
        # Walk buckets in index order, merging the counts of every
        # sample for a bucket before moving on to the next bucket.
        for b in range(self._buckets):
            for sample in self._samples:
                assert type(sample) is self.HistogramSample
                hist = sample.histogram.counts
                sum_val += hist[b]
                if sum_val / count > quant:
                    return self._bin_scheme.from_bin(b)
        return float('inf')

    def combine(self, samples, config, now):
        """Return the value at quantile 0.5.

        The samples argument is not used; value() reads self._samples.
        """
        return self.value(config, now, 0.5)

    def new_sample(self, time_ms):
        """Create an empty HistogramSample using this stat's bin scheme."""
        return Percentiles.HistogramSample(self._bin_scheme, time_ms)

    def update(self, sample, config, value, time_ms):
        """Record value into the histogram of the given sample.

        config and time_ms are accepted but not used here.
        """
        assert type(sample) is self.HistogramSample
        sample.histogram.record(value)

    class HistogramSample(AbstractSampledStat.Sample):
        """A sample that carries a Histogram built from a bin scheme."""

        def __init__(self, scheme, now):
            """Initialise the base sample with 0.0 and attach a histogram.

            Arguments:
                scheme: Bin scheme passed to the Histogram constructor.
                now: Time value passed to the base Sample constructor.
            """
            super(Percentiles.HistogramSample, self).__init__(0.0, now)
            self.histogram = Histogram(scheme)