summaryrefslogtreecommitdiff
path: root/kafka/metrics/stats/sampled_stat.py
blob: c41b14bbc73acc3b453ad4c6a38b2b8f03e61dc0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from __future__ import absolute_import

import abc

from kafka.metrics.measurable_stat import AbstractMeasurableStat


class AbstractSampledStat(AbstractMeasurableStat):
    """
    Base class for statistics computed from a small ring of samples.

    Values are recorded into the "current" sample. A sample is considered
    complete once its time window has elapsed or its event count has
    reached the configured event window, whichever happens first. When
    the current sample completes, recording moves on to the next slot:
    a fresh sample is appended until ``config.samples`` slots exist, and
    after that the slot being moved onto is reset and reused.

    A measurement is produced by combining every sample currently held.

    Subclasses supply ``update`` (how one value is folded into a sample)
    and ``combine`` (how all samples are reduced to one number).
    """
    # Python 2 style metaclass declaration, kept as in the original file.
    # NOTE(review): Python 3 does not treat this attribute as a metaclass.
    __metaclass__ = abc.ABCMeta

    def __init__(self, initial_value):
        """Start with no samples; ``initial_value`` seeds each new sample."""
        self._current = 0
        self._samples = []
        self._initial_value = initial_value

    @abc.abstractmethod
    def update(self, sample, config, value, time_ms):
        """
        Fold ``value`` into ``sample``.

        Called by ``record`` with the sample currently being written to
        and with ``value`` already converted to ``float``.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def combine(self, samples, config, now):
        """
        Reduce ``samples`` to a single measurement.

        Called by ``measure`` with the full sample list; the result is
        passed through ``float`` before being returned to the caller.
        """
        raise NotImplementedError

    def record(self, config, value, time_ms):
        """Record ``value`` at ``time_ms``, rolling to the next sample
        first if the current one is already complete."""
        active = self.current(time_ms)
        if active.is_complete(time_ms, config):
            active = self._advance(config, time_ms)
        self.update(active, config, float(value), time_ms)
        active.event_count += 1

    def new_sample(self, time_ms):
        """Build a fresh ``Sample`` seeded with the initial value."""
        seed = self._initial_value
        return self.Sample(seed, time_ms)

    def measure(self, config, now):
        """Reset expired samples, then return the combined value as a float."""
        self.purge_obsolete_samples(config, now)
        combined = self.combine(self._samples, config, now)
        return float(combined)

    def current(self, time_ms):
        """Return the sample at the current index, creating the first
        sample on demand when none exist yet."""
        if not self._samples:
            first = self.new_sample(time_ms)
            self._samples.append(first)
        return self._samples[self._current]

    def oldest(self, now):
        """Return the sample with the smallest ``last_window_ms``
        (the earliest one in the list on ties), creating a first sample
        on demand when none exist yet."""
        if not self._samples:
            first = self.new_sample(now)
            self._samples.append(first)
        # min() keeps the first of equal keys, like a strict '<' scan.
        return min(self._samples, key=lambda s: s.last_window_ms)

    def purge_obsolete_samples(self, config, now):
        """
        Reset every sample whose window started at least
        ``config.samples * config.time_window_ms`` before ``now``, so that
        windows expire even when no events arrive.
        """
        max_age = config.samples * config.time_window_ms
        for candidate in self._samples:
            age = now - candidate.last_window_ms
            if age >= max_age:
                candidate.reset(now)

    def _advance(self, config, time_ms):
        """Move the current index to the next slot (wrapping at
        ``config.samples``) and return a clean sample for that slot."""
        next_index = (self._current + 1) % config.samples
        self._current = next_index

        if next_index < len(self._samples):
            # Slot already populated: wipe it and reuse it.
            recycled = self.current(time_ms)
            recycled.reset(time_ms)
            return recycled

        # Ring not yet full: grow it by one sample.
        fresh = self.new_sample(time_ms)
        self._samples.append(fresh)
        return fresh

    class Sample(object):
        """One window's worth of recorded state."""

        def __init__(self, initial_value, now):
            """Create a sample whose window starts at ``now``."""
            self.initial_value = initial_value
            self.value = initial_value
            self.last_window_ms = now
            self.event_count = 0

        def reset(self, now):
            """Restore the initial value, zero the event count, and
            restart the window at ``now``."""
            self.value = self.initial_value
            self.last_window_ms = now
            self.event_count = 0

        def is_complete(self, time_ms, config):
            """Return whether the window has ended, either by elapsed
            time or by number of events."""
            elapsed = time_ms - self.last_window_ms
            if elapsed >= config.time_window_ms:
                return True
            return self.event_count >= config.event_window