1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
from __future__ import absolute_import
from kafka.metrics.measurable_stat import AbstractMeasurableStat
from kafka.metrics.stats.sampled_stat import AbstractSampledStat
class TimeUnit(object):
    """
    Integer codes for the time units a rate can be expressed in.

    Every unit has a singular, lowercase name (for example 'second') that
    maps to an integer code. The codes are exposed as class constants such
    as ``TimeUnit.SECONDS``.
    """

    # Unit name -> integer code.
    _names = {
        'nanosecond': 0,
        'microsecond': 1,
        'millisecond': 2,
        'second': 3,
        'minute': 4,
        'hour': 5,
        'day': 6,
    }

    # Integer code -> unit name (the inverse of _names).
    _names_by_code = dict((code, name) for name, code in _names.items())

    NANOSECONDS = _names['nanosecond']
    MICROSECONDS = _names['microsecond']
    MILLISECONDS = _names['millisecond']
    SECONDS = _names['second']
    MINUTES = _names['minute']
    HOURS = _names['hour']
    DAYS = _names['day']

    @staticmethod
    def get_name(time_unit):
        """
        Return the name of a unit code, e.g. 'second' for TimeUnit.SECONDS.

        Arguments:
            time_unit: one of the integer TimeUnit constants.

        Returns:
            The singular, lowercase unit name.

        Raises:
            KeyError: if time_unit is not a known unit code.
        """
        # _names is keyed by unit name, so a unit code has to be resolved
        # through the inverse mapping rather than through _names itself.
        return TimeUnit._names_by_code[time_unit]
class Rate(AbstractMeasurableStat):
    """
    Measures how quickly a recorded quantity accumulates per unit of time.

    Recorded values are forwarded to an underlying sampled statistic (a
    SampledTotal unless another one is supplied). A measurement is that
    statistic's current value divided by the elapsed time across the sample
    windows, expressed in the configured TimeUnit. Passing a different
    AbstractSampledStat changes what is being rated, for example the number
    of occurrences instead of the sum of the values.
    """

    def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
        """
        Arguments:
            time_unit: TimeUnit constant the rate is expressed per.
            sampled_stat: statistic that accumulates the recorded values;
                a new SampledTotal is created when this is falsy.
        """
        self._unit = time_unit
        self._stat = sampled_stat if sampled_stat else SampledTotal()

    def unit_name(self):
        """Return the result of TimeUnit.get_name() for the configured unit."""
        return TimeUnit.get_name(self._unit)

    def record(self, config, value, time_ms):
        """Forward the recorded value to the underlying sampled statistic."""
        self._stat.record(config, value, time_ms)

    def measure(self, config, now):
        """
        Return the underlying statistic's value divided by the elapsed
        window time, with that time converted to the configured unit.
        """
        numerator = float(self._stat.measure(config, now))
        denominator = self.convert(self.window_size(config, now))
        return numerator / denominator

    def window_size(self, config, now):
        """
        Return the elapsed time, in milliseconds, that the rate should be
        computed over at time `now`.
        """
        # Obsolete samples must be purged before the window size is computed.
        self._stat.purge_obsolete_samples(config, now)

        # The elapsed time is measured from the oldest non-obsolete window;
        # that gives the total window size used for the rate computation.
        #
        # With too little data this would overstate the rate: if only
        # 1 second has elapsed in a 30 second window, the measured rate
        # would be very high. The elapsed time is therefore always assumed
        # to be at least N-1 complete windows plus whatever fraction of the
        # final window is complete.
        #
        # Simply counting the time elapsed in the current window and adding
        # N-1 windows would not account for sleeps: AbstractSampledStat only
        # creates samples when record is called, so a period without any
        # record call would be missing from the window size and produce
        # incorrect results.
        elapsed_ms = now - self._stat.oldest(now).last_window_ms

        # Number of full windows of data currently retained.
        retained_windows = int(elapsed_ms / config.time_window_ms)

        # Pad the elapsed time by however many windows are missing from the
        # required minimum of (samples - 1) full windows.
        missing_windows = (config.samples - 1) - retained_windows
        if missing_windows > 0:
            elapsed_ms = elapsed_ms + missing_windows * config.time_window_ms

        return elapsed_ms

    def convert(self, time_ms):
        """
        Convert a duration in milliseconds to the configured time unit.

        Raises:
            ValueError: if the configured unit is not a TimeUnit constant.
        """
        unit = self._unit

        # Units finer than (or equal to) a millisecond scale the value up.
        if unit == TimeUnit.NANOSECONDS:
            return time_ms * 1000.0 * 1000.0
        if unit == TimeUnit.MICROSECONDS:
            return time_ms * 1000.0
        if unit == TimeUnit.MILLISECONDS:
            return time_ms

        # Coarser units divide by the number of milliseconds they contain.
        if unit == TimeUnit.SECONDS:
            return time_ms / 1000.0
        if unit == TimeUnit.MINUTES:
            return time_ms / (60.0 * 1000.0)
        if unit == TimeUnit.HOURS:
            return time_ms / (60.0 * 60.0 * 1000.0)
        if unit == TimeUnit.DAYS:
            return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)

        raise ValueError('Unknown unit: %s' % (unit,))
class SampledTotal(AbstractSampledStat):
    """
    Sampled statistic that adds up the values recorded into its samples.
    """

    def __init__(self, initial_value=None):
        """
        Arguments:
            initial_value: must be left as None; 0.0 is always passed to
                the base class initializer.

        Raises:
            ValueError: if initial_value is not None.
        """
        if initial_value is not None:
            raise ValueError('initial_value cannot be set on SampledTotal')
        super(SampledTotal, self).__init__(0.0)

    def update(self, sample, config, value, time_ms):
        """Add the recorded value to the given sample's running value."""
        sample.value += value

    def combine(self, samples, config, now):
        """Return the sum of the values of all given samples as a float."""
        sample_values = [entry.value for entry in samples]
        return float(sum(sample_values))
|