diff options
25 files changed, 1716 insertions, 0 deletions
diff --git a/kafka/errors.py b/kafka/errors.py index a36ee75..dd64b04 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -361,6 +361,10 @@ class KafkaConfigurationError(KafkaError): pass +class QuotaViolationError(KafkaError): + pass + + class AsyncProducerQueueFull(KafkaError): def __init__(self, failed_msgs, *args): super(AsyncProducerQueueFull, self).__init__(*args) diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py new file mode 100644 index 0000000..b930dea --- /dev/null +++ b/kafka/metrics/__init__.py @@ -0,0 +1,12 @@ +from .compound_stat import NamedMeasurable +from .kafka_metric import KafkaMetric +from .measurable import AnonMeasurable +from .metric_config import MetricConfig +from .metric_name import MetricName +from .metrics import Metrics +from .quota import Quota + +__all__ = [ + 'AnonMeasurable', 'KafkaMetric', 'MetricConfig', + 'MetricName', 'Metrics', 'NamedMeasurable', 'Quota' +] diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py new file mode 100644 index 0000000..09bc24a --- /dev/null +++ b/kafka/metrics/compound_stat.py @@ -0,0 +1,32 @@ +import abc + +from kafka.metrics.stat import AbstractStat + + +class AbstractCompoundStat(AbstractStat): + """ + A compound stat is a stat where a single measurement and associated + data structure feeds many metrics. This is the example for a + histogram which has many associated percentiles. + """ + __metaclass__ = abc.ABCMeta + + def stats(self): + """ + Return list of NamedMeasurable + """ + raise NotImplementedError + + +class NamedMeasurable(object): + def __init__(self, metric_name, measurable_stat): + self._name = metric_name + self._stat = measurable_stat + + @property + def name(self): + return self._name + + @property + def stat(self): + return self._stat diff --git a/kafka/metrics/kafka_metric.py b/kafka/metrics/kafka_metric.py new file mode 100644 index 0000000..8bd1b75 --- /dev/null +++ b/kafka/metrics/kafka_metric.py @@ -0,0 +1,36 @@ +import time + + +class KafkaMetric(object): + def __init__(self, lock, metric_name, measurable, config): + if not metric_name: + raise ValueError('metric_name must be non-empty') + if not measurable: + raise ValueError('measurable must be non-empty') + self._metric_name = metric_name + self._lock = lock + self._measurable = measurable + self._config = config + + @property + def metric_name(self): + return self._metric_name + + @property + def measurable(self): + return self._measurable + + @property + def config(self): + return self._config + + @config.setter + def config(self, config): + with self._lock: + self._config = config + + def value(self, time_ms=None): + if time_ms is None: + # with (self._lock): This doesn't seem necessary? + time_ms = time.time() * 1000 + return self.measurable.measure(self.config, time_ms) diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py new file mode 100644 index 0000000..ef096f3 --- /dev/null +++ b/kafka/metrics/measurable.py @@ -0,0 +1,27 @@ +import abc + + +class AbstractMeasurable(object): + """A measurable quantity that can be registered as a metric""" + @abc.abstractmethod + def measure(self, config, now): + """ + Measure this quantity and return the result + + Arguments: + config (MetricConfig): The configuration for this metric + now (int): The POSIX time in milliseconds the measurement + is being taken + + Returns: + The measured value + """ + raise NotImplementedError + + +class AnonMeasurable(AbstractMeasurable): + def __init__(self, measure_fn): + self._measure_fn = measure_fn + + def measure(self, config, now): + return float(self._measure_fn(config, now)) diff --git a/kafka/metrics/measurable_stat.py b/kafka/metrics/measurable_stat.py new file mode 100644 index 0000000..dba887d --- /dev/null +++ b/kafka/metrics/measurable_stat.py @@ -0,0 +1,14 @@ +import abc + +from kafka.metrics.measurable import AbstractMeasurable +from kafka.metrics.stat import AbstractStat + + +class AbstractMeasurableStat(AbstractStat, AbstractMeasurable): + """ + An AbstractMeasurableStat is an AbstractStat that is also + an AbstractMeasurable (i.e. can produce a single floating point value). + This is the interface used for most of the simple statistics such + as Avg, Max, Count, etc. + """ + __metaclass__ = abc.ABCMeta diff --git a/kafka/metrics/metric_config.py b/kafka/metrics/metric_config.py new file mode 100644 index 0000000..e30c477 --- /dev/null +++ b/kafka/metrics/metric_config.py @@ -0,0 +1,31 @@ +import sys + + +class MetricConfig(object): + """Configuration values for metrics""" + def __init__(self, quota=None, samples=2, event_window=sys.maxsize, + time_window_ms=30 * 1000, tags=None): + """ + Arguments: + quota (Quota, optional): Upper or lower bound of a value. + samples (int, optional): Max number of samples kept per metric. + event_window (int, optional): Max number of values per sample. + time_window_ms (int, optional): Max age of an individual sample. + tags (dict of {str: str}, optional): Tags for each metric. + """ + self.quota = quota + self._samples = samples + self.event_window = event_window + self.time_window_ms = time_window_ms + # tags should be OrderedDict (not supported in py26) + self.tags = tags if tags else {} + + @property + def samples(self): + return self._samples + + @samples.setter + def samples(self, value): + if value < 1: + raise ValueError('The number of samples must be at least 1.') + self._samples = value diff --git a/kafka/metrics/metric_name.py b/kafka/metrics/metric_name.py new file mode 100644 index 0000000..02068f0 --- /dev/null +++ b/kafka/metrics/metric_name.py @@ -0,0 +1,104 @@ +import copy + + +class MetricName(object): + """ + This class encapsulates a metric's name, logical group and its + related attributes (tags). + + group, tags parameters can be used to create unique metric names. + e.g. domainName:type=group,key1=val1,key2=val2 + + Usage looks something like this: + + # set up metrics: + metric_tags = {'client-id': 'producer-1', 'topic': 'topic'} + metric_config = MetricConfig(tags=metric_tags) + + # metrics is the global repository of metrics and sensors + metrics = Metrics(metric_config) + + sensor = metrics.sensor('message-sizes') + metric_name = metrics.metric_name('message-size-avg', + 'producer-metrics', + 'average message size') + sensor.add(metric_name, Avg()) + + metric_name = metrics.metric_name('message-size-max', + sensor.add(metric_name, Max()) + + tags = {'client-id': 'my-client', 'topic': 'my-topic'} + metric_name = metrics.metric_name('message-size-min', + 'producer-metrics', + 'message minimum size', tags) + sensor.add(metric_name, Min()) + + # as messages are sent we record the sizes + sensor.record(message_size) + """ + + def __init__(self, name, group, description=None, tags=None): + """ + Arguments: + name (str): The name of the metric. + group (str): The logical group name of the metrics to which this + metric belongs. + description (str, optional): A human-readable description to + include in the metric. + tags (dict, optional): Additional key/val attributes of the metric. + """ + if not (name and group): + raise Exception('name and group must be non-empty.') + if tags is not None and not isinstance(tags, dict): + raise Exception('tags must be a dict if present.') + + self._name = name + self._group = group + self._description = description + self._tags = copy.copy(tags) + self._hash = 0 + + @property + def name(self): + return self._name + + @property + def group(self): + return self._group + + @property + def description(self): + return self._description + + @property + def tags(self): + return copy.copy(self._tags) + + def __hash__(self): + if self._hash != 0: + return self._hash + prime = 31 + result = 1 + result = prime * result + hash(self.group) + result = prime * result + hash(self.name) + tags_hash = hash(frozenset(self.tags.items())) if self.tags else 0 + result = prime * result + tags_hash + self._hash = result + return result + + def __eq__(self, other): + if self is other: + return True + if other is None: + return False + return (type(self) == type(other) and + self.group == other.group and + self.name == other.name and + self.tags == other.tags) + + def __ne__(self, other): + return not self.__eq__(other) + + def __str__(self): + return 'MetricName(name=%s, group=%s, description=%s, tags=%s)' % ( + self.name, self.group, self.description, self.tags) diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py new file mode 100644 index 0000000..0920794 --- /dev/null +++ b/kafka/metrics/metrics.py @@ -0,0 +1,254 @@ +import logging +import sys +import time +import threading + +from kafka.metrics import AnonMeasurable, KafkaMetric, MetricConfig, MetricName +from kafka.metrics.stats import Sensor + +logger = logging.getLogger(__name__) + + +class Metrics(object): + """ + A registry of sensors and metrics. + + A metric is a named, numerical measurement. A sensor is a handle to + record numerical measurements as they occur. Each Sensor has zero or + more associated metrics. For example a Sensor might represent message + sizes and we might associate with this sensor a metric for the average, + maximum, or other statistics computed off the sequence of message sizes + that are recorded by the sensor. + + Usage looks something like this: + # set up metrics: + metrics = Metrics() # the global repository of metrics and sensors + sensor = metrics.sensor('message-sizes') + metric_name = MetricName('message-size-avg', 'producer-metrics') + sensor.add(metric_name, Avg()) + metric_name = MetricName('message-size-max', 'producer-metrics') + sensor.add(metric_name, Max()) + + # as messages are sent we record the sizes + sensor.record(message_size); + """ + def __init__(self, default_config=None, reporters=None, + enable_expiration=False): + """ + Create a metrics repository with a default config, given metric + reporters and the ability to expire eligible sensors + + Arguments: + default_config (MetricConfig, optional): The default config + reporters (list of AbstractMetricsReporter, optional): + The metrics reporters + enable_expiration (bool, optional): true if the metrics instance + can garbage collect inactive sensors, false otherwise + """ + self._lock = threading.RLock() + self._config = default_config or MetricConfig() + self._sensors = {} + self._metrics = {} + self._children_sensors = {} + self._reporters = reporters or [] + for reporter in self._reporters: + reporter.init([]) + + if enable_expiration: + def expire_loop(): + while True: + # delay 30 seconds + time.sleep(30) + self.ExpireSensorTask.run(self) + metrics_scheduler = threading.Thread(target=expire_loop) + # Creating a daemon thread to not block shutdown + metrics_scheduler.daemon = True + metrics_scheduler.start() + + self.add_metric(self.metric_name('count', 'kafka-metrics-count', + 'total number of registered metrics'), + AnonMeasurable(lambda config, now: len(self._metrics))) + + @property + def config(self): + return self._config + + @property + def metrics(self): + """ + Get all the metrics currently maintained and indexed by metricName + """ + return self._metrics + + def metric_name(self, name, group, description='', tags=None): + """ + Create a MetricName with the given name, group, description and tags, + plus default tags specified in the metric configuration. + Tag in tags takes precedence if the same tag key is specified in + the default metric configuration. + + Arguments: + name (str): The name of the metric + group (str): logical group name of the metrics to which this + metric belongs + description (str, optional): A human-readable description to + include in the metric + tags (dict, optionals): additional key/value attributes of + the metric + """ + combined_tags = dict(self.config.tags) + combined_tags.update(tags or {}) + return MetricName(name, group, description, combined_tags) + + def get_sensor(self, name): + """ + Get the sensor with the given name if it exists + + Arguments: + name (str): The name of the sensor + + Returns: + Sensor: The sensor or None if no such sensor exists + """ + if not name: + raise ValueError('name must be non-empty') + return self._sensors.get(name, None) + + def sensor(self, name, config=None, + inactive_sensor_expiration_time_seconds=sys.maxsize, + parents=None): + """ + Get or create a sensor with the given unique name and zero or + more parent sensors. All parent sensors will receive every value + recorded with this sensor. + + Arguments: + name (str): The name of the sensor + config (MetricConfig, optional): A default configuration to use + for this sensor for metrics that don't have their own config + inactive_sensor_expiration_time_seconds (int, optional): + If no value if recorded on the Sensor for this duration of + time, it is eligible for removal + parents (list of Sensor): The parent sensors + + Returns: + Sensor: The sensor that is created + """ + with self._lock: + sensor = self.get_sensor(name) + if not sensor: + sensor = Sensor(self, name, parents, config or self.config, + inactive_sensor_expiration_time_seconds) + self._sensors[name] = sensor + if parents: + for parent in parents: + children = self._children_sensors.get(parent) + if not children: + children = [] + self._children_sensors[parent] = children + children.append(sensor) + logger.debug('Added sensor with name %s', name) + return sensor + + def remove_sensor(self, name): + """ + Remove a sensor (if it exists), associated metrics and its children. + + Arguments: + name (str): The name of the sensor to be removed + """ + sensor = self._sensors.get(name) + if sensor: + child_sensors = None + with sensor._lock: + with self._lock: + val = self._sensors.pop(name, None) + if val and val == sensor: + for metric in sensor.metrics: + self.remove_metric(metric.metric_name) + logger.debug('Removed sensor with name %s', name) + child_sensors = self._children_sensors.pop(sensor, None) + if child_sensors: + for child_sensor in child_sensors: + self.remove_sensor(child_sensor.name) + + def add_metric(self, metric_name, measurable, config=None): + """ + Add a metric to monitor an object that implements measurable. + This metric won't be associated with any sensor. + This is a way to expose existing values as metrics. + + Arguments: + metricName (MetricName): The name of the metric + measurable (AbstractMeasurable): The measurable that will be + measured by this metric + config (MetricConfig, optional): The configuration to use when + measuring this measurable + """ + with self._lock: + metric = KafkaMetric(threading.Lock(), metric_name, measurable, + config or self.config) + self.register_metric(metric) + + def remove_metric(self, metric_name): + """ + Remove a metric if it exists and return it. Return None otherwise. + If a metric is removed, `metric_removal` will be invoked + for each reporter. + + Arguments: + metric_name (MetricName): The name of the metric + + Returns: + KafkaMetric: the removed `KafkaMetric` or None if no such + metric exists + """ + with self._lock: + metric = self._metrics.pop(metric_name, None) + if metric: + for reporter in self._reporters: + reporter.metric_removal(metric) + return metric + + def add_reporter(self, reporter): + """Add a MetricReporter""" + with self._lock: + reporter.init(list(self.metrics.values())) + self._reporters.append(reporter) + + def register_metric(self, metric): + with self._lock: + if metric.metric_name in self.metrics: + raise ValueError('A metric named "%s" already exists, cannot' + ' register another one.' % metric.metric_name) + self.metrics[metric.metric_name] = metric + for reporter in self._reporters: + reporter.metric_change(metric) + + class ExpireSensorTask(object): + """ + This iterates over every Sensor and triggers a remove_sensor + if it has expired. Package private for testing + """ + @staticmethod + def run(metrics): + items = list(metrics._sensors.items()) + for name, sensor in items: + # remove_sensor also locks the sensor object. This is fine + # because synchronized is reentrant. There is however a minor + # race condition here. Assume we have a parent sensor P and + # child sensor C. Calling record on C would cause a record on + # P as well. So expiration time for P == expiration time for C. + # If the record on P happens via C just after P is removed, + # that will cause C to also get removed. Since the expiration + # time is typically high it is not expected to be a significant + # concern and thus not necessary to optimize + with sensor._lock: + if sensor.has_expired(): + logger.debug('Removing expired sensor %s', name) + metrics.remove_sensor(name) + + def close(self): + """Close this metrics repository.""" + for reporter in self._reporters: + reporter.close() diff --git a/kafka/metrics/metrics_reporter.py b/kafka/metrics/metrics_reporter.py new file mode 100644 index 0000000..b48ad0b --- /dev/null +++ b/kafka/metrics/metrics_reporter.py @@ -0,0 +1,55 @@ +import abc + + +class AbstractMetricsReporter(object): + """ + An abstract class to allow things to listen as new metrics + are created so they can be reported. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def init(self, metrics): + """ + This is called when the reporter is first registered + to initially register all existing metrics + + Arguments: + metrics (list of KafkaMetric): All currently existing metrics + """ + raise NotImplementedError + + @abc.abstractmethod + def metric_change(self, metric): + """ + This is called whenever a metric is updated or added + + Arguments: + metric (KafkaMetric) + """ + raise NotImplementedError + + @abc.abstractmethod + def metric_removal(self, metric): + """ + This is called whenever a metric is removed + + Arguments: + metric (KafkaMetric) + """ + raise NotImplementedError + + @abc.abstractmethod + def configure(self, configs): + """ + Configure this class with the given key-value pairs + + Arguments: + configs (dict of {str, ?}) + """ + raise NotImplementedError + + @abc.abstractmethod + def close(self): + """Called when the metrics repository is closed.""" + raise NotImplementedError diff --git a/kafka/metrics/quota.py b/kafka/metrics/quota.py new file mode 100644 index 0000000..0410e37 --- /dev/null +++ b/kafka/metrics/quota.py @@ -0,0 +1,39 @@ +class Quota(object): + """An upper or lower bound for metrics""" + def __init__(self, bound, is_upper): + self._bound = bound + self._upper = is_upper + + @staticmethod + def upper_bound(upper_bound): + return Quota(upper_bound, True) + + @staticmethod + def lower_bound(lower_bound): + return Quota(lower_bound, False) + + def is_upper_bound(self): + return self._upper + + @property + def bound(self): + return self._bound + + def is_acceptable(self, value): + return ((self.is_upper_bound() and value <= self.bound) or + (not self.is_upper_bound() and value >= self.bound)) + + def __hash__(self): + prime = 31 + result = prime + self.bound + return prime * result + self.is_upper_bound() + + def __eq__(self, other): + if self is other: + return True + return (type(self) == type(other) and + self.bound == other.bound and + self.is_upper_bound() == other.is_upper_bound()) + + def __ne__(self, other): + return not self.__eq__(other) diff --git a/kafka/metrics/stat.py b/kafka/metrics/stat.py new file mode 100644 index 0000000..c10f3ce --- /dev/null +++ b/kafka/metrics/stat.py @@ -0,0 +1,21 @@ +import abc + + +class AbstractStat(object): + """ + An AbstractStat is a quantity such as average, max, etc that is computed + off the stream of updates to a sensor + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def record(self, config, value, time_ms): + """ + Record the given value + + Arguments: + config (MetricConfig): The configuration to use for this metric + value (float): The value to record + timeMs (int): The POSIX time in milliseconds this value occurred + """ + raise NotImplementedError diff --git a/kafka/metrics/stats/__init__.py b/kafka/metrics/stats/__init__.py new file mode 100644 index 0000000..15eafd9 --- /dev/null +++ b/kafka/metrics/stats/__init__.py @@ -0,0 +1,15 @@ +from .avg import Avg +from .count import Count +from .histogram import Histogram +from .max_stat import Max +from .min_stat import Min +from .percentile import Percentile +from .percentiles import Percentiles +from .rate import Rate +from .sensor import Sensor +from .total import Total + +__all__ = [ + 'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles', + 'Rate', 'Sensor', 'Total' +] diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py new file mode 100644 index 0000000..4d0be0a --- /dev/null +++ b/kafka/metrics/stats/avg.py @@ -0,0 +1,22 @@ +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class Avg(AbstractSampledStat): + """ + An AbstractSampledStat that maintains a simple average over its samples. + """ + def __init__(self): + super(Avg, self).__init__(0.0) + + def update(self, sample, config, value, now): + sample.value += value + + def combine(self, samples, config, now): + total_sum = 0 + total_count = 0 + for sample in samples: + total_sum += sample.value + total_count += sample.event_count + if not total_count: + return 0 + return float(total_sum) / total_count diff --git a/kafka/metrics/stats/count.py b/kafka/metrics/stats/count.py new file mode 100644 index 0000000..183e4f2 --- /dev/null +++ b/kafka/metrics/stats/count.py @@ -0,0 +1,15 @@ +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class Count(AbstractSampledStat): + """ + An AbstractSampledStat that maintains a simple count of what it has seen. + """ + def __init__(self): + super(Count, self).__init__(0.0) + + def update(self, sample, config, value, now): + sample.value += 1.0 + + def combine(self, samples, config, now): + return float(sum(sample.value for sample in samples)) diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py new file mode 100644 index 0000000..42aacdb --- /dev/null +++ b/kafka/metrics/stats/histogram.py @@ -0,0 +1,93 @@ +import math + + +class Histogram(object): + def __init__(self, bin_scheme): + self._hist = [0.0] * bin_scheme.bins + self._count = 0.0 + self._bin_scheme = bin_scheme + + def record(self, value): + self._hist[self._bin_scheme.to_bin(value)] += 1.0 + self._count += 1.0 + + def value(self, quantile): + if self._count == 0.0: + return float('NaN') + _sum = 0.0 + quant = float(quantile) + for i, value in enumerate(self._hist[:-1]): + _sum += value + if _sum / self._count > quant: + return self._bin_scheme.from_bin(i) + return float('inf') + + @property + def counts(self): + return self._hist + + def clear(self): + for i in range(self._hist): + self._hist[i] = 0.0 + self._count = 0 + + def __str__(self): + values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for + i, value in enumerate(self._hist[:-1])] + values.append('%s:%s' % (float('inf'), self._hist[-1])) + return '{%s}' % ','.join(values) + + class ConstantBinScheme(object): + def __init__(self, bins, min_val, max_val): + if bins < 2: + raise ValueError('Must have at least 2 bins.') + self._min = float(min_val) + self._max = float(max_val) + self._bins = int(bins) + self._bucket_width = (max_val - min_val) / (bins - 2) + + @property + def bins(self): + return self._bins + + def from_bin(self, b): + if b == 0: + return float('-inf') + elif b == self._bins - 1: + return float('inf') + else: + return self._min + (b - 1) * self._bucket_width + + def to_bin(self, x): + if x < self._min: + return 0 + elif x > self._max: + return self._bins - 1 + else: + return int(((x - self._min) / self._bucket_width) + 1) + + class LinearBinScheme(object): + def __init__(self, num_bins, max_val): + self._bins = num_bins + self._max = max_val + self._scale = max_val / (num_bins * (num_bins - 1) / 2) + + @property + def bins(self): + return self._bins + + def from_bin(self, b): + if b == self._bins - 1: + return float('inf') + else: + unscaled = (b * (b + 1.0)) / 2.0 + return unscaled * self._scale + + def to_bin(self, x): + if x < 0.0: + raise ValueError('Values less than 0.0 not accepted.') + elif x > self._max: + return self._bins - 1 + else: + scaled = x / self._scale + return int(-0.5 + math.sqrt(2.0 * scaled + 0.25)) diff --git a/kafka/metrics/stats/max_stat.py b/kafka/metrics/stats/max_stat.py new file mode 100644 index 0000000..8df54d3 --- /dev/null +++ b/kafka/metrics/stats/max_stat.py @@ -0,0 +1,15 @@ +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class Max(AbstractSampledStat): + """An AbstractSampledStat that gives the max over its samples.""" + def __init__(self): + super(Max, self).__init__(float('-inf')) + + def update(self, sample, config, value, now): + sample.value = max(sample.value, value) + + def combine(self, samples, config, now): + if not samples: + return float('-inf') + return float(max(sample.value for sample in samples)) diff --git a/kafka/metrics/stats/min_stat.py b/kafka/metrics/stats/min_stat.py new file mode 100644 index 0000000..a57c2dd --- /dev/null +++ b/kafka/metrics/stats/min_stat.py @@ -0,0 +1,17 @@ +import sys + +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class Min(AbstractSampledStat): + """An AbstractSampledStat that gives the min over its samples.""" + def __init__(self): + super(Min, self).__init__(float(sys.maxsize)) + + def update(self, sample, config, value, now): + sample.value = min(sample.value, value) + + def combine(self, samples, config, now): + if not samples: + return float(sys.maxsize) + return float(min(sample.value for sample in samples)) diff --git a/kafka/metrics/stats/percentile.py b/kafka/metrics/stats/percentile.py new file mode 100644 index 0000000..723b9e6 --- /dev/null +++ b/kafka/metrics/stats/percentile.py @@ -0,0 +1,12 @@ +class Percentile(object): + def __init__(self, metric_name, percentile): + self._metric_name = metric_name + self._percentile = float(percentile) + + @property + def name(self): + return self._metric_name + + @property + def percentile(self): + return self._percentile diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py new file mode 100644 index 0000000..84e7160 --- /dev/null +++ b/kafka/metrics/stats/percentiles.py @@ -0,0 +1,72 @@ +from kafka.metrics import AnonMeasurable, NamedMeasurable +from kafka.metrics.compound_stat import AbstractCompoundStat +from kafka.metrics.stats import Histogram +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class BucketSizing(object): + CONSTANT = 0 + LINEAR = 1 + + +class Percentiles(AbstractSampledStat, AbstractCompoundStat): + """A compound stat that reports one or more percentiles""" + def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, + percentiles=None): + super(Percentiles, self).__init__(0.0) + self._percentiles = percentiles or [] + self._buckets = int(size_in_bytes / 4) + if bucketing == BucketSizing.CONSTANT: + self._bin_scheme = Histogram.ConstantBinScheme(self._buckets, + min_val, max_val) + elif bucketing == BucketSizing.LINEAR: + if min_val != 0.0: + raise ValueError('Linear bucket sizing requires min_val' + ' to be 0.0.') + self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) + else: + ValueError('Unknown bucket type: %s' % bucketing) + + def stats(self): + measurables = [] + + def make_measure_fn(pct): + return lambda config, now: self.value(config, now, + pct / 100.0) + + for percentile in self._percentiles: + measure_fn = make_measure_fn(percentile.percentile) + stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn)) + measurables.append(stat) + return measurables + + def value(self, config, now, quantile): + self.purge_obsolete_samples(config, now) + count = sum(sample.event_count for sample in self._samples) + if count == 0.0: + return float('NaN') + sum_val = 0.0 + quant = float(quantile) + for b in range(self._buckets): + for sample in self._samples: + assert type(sample) is self.HistogramSample + hist = sample.histogram.counts + sum_val += hist[b] + if sum_val / count > quant: + return self._bin_scheme.from_bin(b) + return float('inf') + + def combine(self, samples, config, now): + return self.value(config, now, 0.5) + + def new_sample(self, time_ms): + return Percentiles.HistogramSample(self._bin_scheme, time_ms) + + def update(self, sample, config, value, time_ms): + assert type(sample) is self.HistogramSample + sample.histogram.record(value) + + class HistogramSample(AbstractSampledStat.Sample): + def __init__(self, scheme, now): + super(Percentiles.HistogramSample, self).__init__(0.0, now) + self.histogram = Histogram(scheme) diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py new file mode 100644 index 0000000..3ce2e74 --- /dev/null +++ b/kafka/metrics/stats/rate.py @@ -0,0 +1,115 @@ +from kafka.metrics.measurable_stat import AbstractMeasurableStat +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class TimeUnit(object): + _names = { + 'nanosecond': 0, + 'microsecond': 1, + 'millisecond': 2, + 'second': 3, + 'minute': 4, + 'hour': 5, + 'day': 6, + } + + NANOSECONDS = _names['nanosecond'] + MICROSECONDS = _names['microsecond'] + MILLISECONDS = _names['millisecond'] + SECONDS = _names['second'] + MINUTES = _names['minute'] + HOURS = _names['hour'] + DAYS = _names['day'] + + @staticmethod + def get_name(time_unit): + return TimeUnit._names[time_unit] + + +class Rate(AbstractMeasurableStat): + """ + The rate of the given quantity. By default this is the total observed + over a set of samples from a sampled statistic divided by the elapsed + time over the sample windows. Alternative AbstractSampledStat + implementations can be provided, however, to record the rate of + occurrences (e.g. the count of values measured over the time interval) + or other such values. + """ + def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None): + self._stat = sampled_stat or SampledTotal() + self._unit = time_unit + + def unit_name(self): + return TimeUnit.get_name(self._unit) + + def record(self, config, value, time_ms): + self._stat.record(config, value, time_ms) + + def measure(self, config, now): + value = self._stat.measure(config, now) + return float(value) / self.convert(self.window_size(config, now)) + + def window_size(self, config, now): + # purge old samples before we compute the window size + self._stat.purge_obsolete_samples(config, now) + + """ + Here we check the total amount of time elapsed since the oldest + non-obsolete window. This give the total window_size of the batch + which is the time used for Rate computation. However, there is + an issue if we do not have sufficient data for e.g. if only + 1 second has elapsed in a 30 second window, the measured rate + will be very high. Hence we assume that the elapsed time is + always N-1 complete windows plus whatever fraction of the final + window is complete. + + Note that we could simply count the amount of time elapsed in + the current window and add n-1 windows to get the total time, + but this approach does not account for sleeps. AbstractSampledStat + only creates samples whenever record is called, if no record is + called for a period of time that time is not accounted for in + window_size and produces incorrect results. + """ + total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms + # Check how many full windows of data we have currently retained + num_full_windows = int(total_elapsed_time_ms / config.time_window_ms) + min_full_windows = config.samples - 1 + + # If the available windows are less than the minimum required, + # add the difference to the totalElapsedTime + if num_full_windows < min_full_windows: + total_elapsed_time_ms += ((min_full_windows - num_full_windows) * + config.time_window_ms) + + return total_elapsed_time_ms + + def convert(self, time_ms): + if self._unit == TimeUnit.NANOSECONDS: + return time_ms * 1000.0 * 1000.0 + elif self._unit == TimeUnit.MICROSECONDS: + return time_ms * 1000.0 + elif self._unit == TimeUnit.MILLISECONDS: + return time_ms + elif self._unit == TimeUnit.SECONDS: + return time_ms / 1000.0 + elif self._unit == TimeUnit.MINUTES: + return time_ms / (60.0 * 1000.0) + elif self._unit == TimeUnit.HOURS: + return time_ms / (60.0 * 60.0 * 1000.0) + elif self._unit == TimeUnit.DAYS: + return time_ms / (24.0 * 60.0 * 60.0 * 1000.0) + else: + raise ValueError('Unknown unit: %s' % self._unit) + + +class SampledTotal(AbstractSampledStat): + def __init__(self, initial_value=None): + if initial_value is not None: + raise ValueError('initial_value cannot be set on SampledTotal') + super(SampledTotal, self).__init__(0.0) + + def update(self, sample, config, value, time_ms): + sample.value += value + + def combine(self, samples, config, now): + return float(sum(sample.value for sample in samples)) diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py new file mode 100644 index 0000000..ca0db69 --- /dev/null +++ b/kafka/metrics/stats/sampled_stat.py @@ -0,0 +1,99 @@ +import abc + +from kafka.metrics.measurable_stat import AbstractMeasurableStat + + +class AbstractSampledStat(AbstractMeasurableStat): + """ + An AbstractSampledStat records a single scalar value measured over + one or more samples. Each sample is recorded over a configurable + window. The window can be defined by number of events or elapsed + time (or both, if both are given the window is complete when + *either* the event count or elapsed time criterion is met). + + All the samples are combined to produce the measurement. When a + window is complete the oldest sample is cleared and recycled to + begin recording the next sample. + + Subclasses of this class define different statistics measured + using this basic pattern. + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, initial_value): + self._initial_value = initial_value + self._samples = [] + self._current = 0 + + @abc.abstractmethod + def update(self, sample, config, value, time_ms): + raise NotImplementedError + + @abc.abstractmethod + def combine(self, samples, config, now): + raise NotImplementedError + + def record(self, config, value, time_ms): + sample = self.current(time_ms) + if sample.is_complete(time_ms, config): + sample = self._advance(config, time_ms) + self.update(sample, config, float(value), time_ms) + sample.event_count += 1 + + def new_sample(self, time_ms): + return self.Sample(self._initial_value, time_ms) + + def measure(self, config, now): + self.purge_obsolete_samples(config, now) + return float(self.combine(self._samples, config, now)) + + def current(self, time_ms): + if not self._samples: + self._samples.append(self.new_sample(time_ms)) + return self._samples[self._current] + + def oldest(self, now): + if not self._samples: + self._samples.append(self.new_sample(now)) + oldest = self._samples[0] + for sample in self._samples[1:]: + if sample.last_window_ms < oldest.last_window_ms: + oldest = sample + return oldest + + def purge_obsolete_samples(self, config, now): + """ + Timeout any windows that have expired in the absence of any events + """ + expire_age = config.samples * config.time_window_ms + for sample in self._samples: + if now - sample.last_window_ms >= expire_age: + sample.reset(now) + + def _advance(self, config, time_ms): + self._current = (self._current + 1) % config.samples + if self._current >= len(self._samples): + sample = self.new_sample(time_ms) + self._samples.append(sample) + return sample + else: + sample = self.current(time_ms) + sample.reset(time_ms) + return sample + + class Sample(object): + + def __init__(self, initial_value, now): + self.initial_value = initial_value + self.event_count = 0 + self.last_window_ms = now + self.value = initial_value + + def reset(self, now): + self.event_count = 0 + self.last_window_ms = now + self.value = self.initial_value + + def is_complete(self, time_ms, config): + return (time_ms - self.last_window_ms >= config.time_window_ms or + self.event_count >= config.event_window) diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py new file mode 100644 index 0000000..7d179cb --- /dev/null +++ b/kafka/metrics/stats/sensor.py @@ -0,0 +1,133 @@ +import threading +import time + +from kafka.errors import QuotaViolationError +from kafka.metrics import KafkaMetric + + +class Sensor(object): + """ + A sensor applies a continuous sequence of numerical values + to a set of associated metrics. For example a sensor on + message size would record a sequence of message sizes using + the `record(double)` api and would maintain a set + of metrics about request sizes such as the average or max. + """ + def __init__(self, registry, name, parents, config, + inactive_sensor_expiration_time_seconds): + if not name: + raise ValueError('name must be non-empty') + self._lock = threading.RLock() + self._registry = registry + self._name = name + self._parents = parents or [] + self._metrics = [] + self._stats = [] + self._config = config + self._inactive_sensor_expiration_time_ms = ( + inactive_sensor_expiration_time_seconds * 1000) + self._last_record_time = time.time() * 1000 + self._check_forest(set()) + + def _check_forest(self, sensors): + """Validate that this sensor doesn't end up referencing itself.""" + if self in sensors: + raise ValueError('Circular dependency in sensors: %s is its own' + 'parent.' % self.name) + sensors.add(self) + for parent in self._parents: + parent._check_forest(sensors) + + @property + def name(self): + """ + The name this sensor is registered with. + This name will be unique among all registered sensors. + """ + return self._name + + @property + def metrics(self): + return tuple(self._metrics) + + def record(self, value=1.0, time_ms=None): + """ + Record a value at a known time. + Arguments: + value (double): The value we are recording + time_ms (int): The current POSIX time in milliseconds + + Raises: + QuotaViolationException: if recording this value moves a + metric beyond its configured maximum or minimum bound + """ + now = time.time() * 1000 + if time_ms is None: + time_ms = now + self._last_record_time = now + with self._lock: # XXX high volume, might be performance issue + # increment all the stats + for stat in self._stats: + stat.record(self._config, value, time_ms) + self._check_quotas(time_ms) + for parent in self._parents: + parent.record(value, time_ms) + + def _check_quotas(self, time_ms): + """ + Check if we have violated our quota for any metric that + has a configured quota + """ + for metric in self._metrics: + if metric.config and metric.config.quota: + value = metric.value(time_ms) + if not metric.config.quota.is_acceptable(value): + raise QuotaViolationError('(%s) violated quota. Actual: ' + '(%d), Threshold: (%d)' % + (metric.metric_name, + metric.config.quota.bound, + value)) + + def add_compound(self, compound_stat, config=None): + """ + Register a compound statistic with this sensor which + yields multiple measurable quantities (like a histogram) + + Arguments: + stat (AbstractCompoundStat): The stat to register + config (MetricConfig): The configuration for this stat. + If None then the stat will use the default configuration + for this sensor. + """ + if not compound_stat: + raise ValueError('compound stat must be non-empty') + self._stats.append(compound_stat) + for named_measurable in compound_stat.stats(): + metric = KafkaMetric(self._lock, named_measurable.name, + named_measurable.stat, config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + + def add(self, metric_name, stat, config=None): + """ + Register a metric with this sensor + + Arguments: + metric_name (MetricName): The name of the metric + stat (AbstractMeasurableStat): The statistic to keep + config (MetricConfig): A special configuration for this metric. + If None use the sensor default configuration. + """ + with self._lock: + metric = KafkaMetric(threading.Lock(), metric_name, stat, + config or self._config) + self._registry.register_metric(metric) + self._metrics.append(metric) + self._stats.append(stat) + + def has_expired(self): + """ + Return True if the Sensor is eligible for removal due to inactivity. + """ + return ((time.time() * 1000 - self._last_record_time) > + self._inactive_sensor_expiration_time_ms) diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py new file mode 100644 index 0000000..76a82d8 --- /dev/null +++ b/kafka/metrics/stats/total.py @@ -0,0 +1,13 @@ +from kafka.metrics.measurable_stat import AbstractMeasurableStat + + +class Total(AbstractMeasurableStat): + """An un-windowed cumulative total maintained over all time.""" + def __init__(self, value=0.0): + self._total = value + + def record(self, config, value, now): + self._total += value + + def measure(self, config, now): + return float(self._total) diff --git a/test/test_metrics.py b/test/test_metrics.py new file mode 100644 index 0000000..a78fe47 --- /dev/null +++ b/test/test_metrics.py @@ -0,0 +1,466 @@ +import sys +import time + +import pytest + +from kafka.errors import QuotaViolationError +from kafka.metrics import MetricConfig, MetricName, Metrics, Quota +from kafka.metrics.measurable import AbstractMeasurable +from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, Percentiles, + Rate, Total) +from kafka.metrics.stats.percentiles import BucketSizing +from kafka.metrics.stats.rate import TimeUnit + +EPS = 0.000001 + + +@pytest.fixture +def time_keeper(): + return TimeKeeper() + + +@pytest.fixture +def config(): + return MetricConfig() + + +@pytest.fixture +def metrics(request, config): + metrics = Metrics(config, None, enable_expiration=True) + request.addfinalizer(lambda: metrics.close()) + return metrics + + +def test_MetricName(): + # The Java test only cover the differences between the deprecated + # constructors, so I'm skipping them but doing some other basic testing. + + # In short, metrics should be equal IFF their name, group, and tags are + # the same. Descriptions do not matter. + name1 = MetricName('name', 'group', 'A metric.', {'a': 1, 'b': 2}) + name2 = MetricName('name', 'group', 'A description.', {'a': 1, 'b': 2}) + assert name1 == name2 + + name1 = MetricName('name', 'group', tags={'a': 1, 'b': 2}) + name2 = MetricName('name', 'group', tags={'a': 1, 'b': 2}) + assert name1 == name2 + + name1 = MetricName('foo', 'group') + name2 = MetricName('name', 'group') + assert name1 != name2 + + name1 = MetricName('name', 'foo') + name2 = MetricName('name', 'group') + assert name1 != name2 + + # name and group must be non-empty. Everything else is optional. + with pytest.raises(Exception): + MetricName('', 'group') + with pytest.raises(Exception): + MetricName('name', None) + # tags must be a dict if supplied + with pytest.raises(Exception): + MetricName('name', 'group', tags=set()) + + # Because of the implementation of __eq__ and __hash__, the values of + # a MetricName cannot be mutable. + tags = {'a': 1} + name = MetricName('name', 'group', 'description', tags=tags) + with pytest.raises(AttributeError): + name.name = 'new name' + with pytest.raises(AttributeError): + name.group = 'new name' + with pytest.raises(AttributeError): + name.tags = {} + # tags is a copy, so the instance isn't altered + name.tags['b'] = 2 + assert name.tags == tags + + +def test_simple_stats(mocker, time_keeper, config, metrics): + mocker.patch('time.time', side_effect=time_keeper.time) + + measurable = ConstantMeasurable() + + metrics.add_metric(metrics.metric_name('direct.measurable', 'grp1', + 'The fraction of time an appender waits for space allocation.'), + measurable) + sensor = metrics.sensor('test.sensor') + sensor.add(metrics.metric_name('test.avg', 'grp1'), Avg()) + sensor.add(metrics.metric_name('test.max', 'grp1'), Max()) + sensor.add(metrics.metric_name('test.min', 'grp1'), Min()) + sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS)) + sensor.add(metrics.metric_name('test.occurences', 'grp1'),Rate(TimeUnit.SECONDS, Count())) + sensor.add(metrics.metric_name('test.count', 'grp1'), Count()) + percentiles = [Percentile(metrics.metric_name('test.median', 'grp1'), 50.0), + Percentile(metrics.metric_name('test.perc99_9', 'grp1'), 99.9)] + sensor.add_compound(Percentiles(100, BucketSizing.CONSTANT, 100, -100, + percentiles=percentiles)) + + sensor2 = metrics.sensor('test.sensor2') + sensor2.add(metrics.metric_name('s2.total', 'grp1'), Total()) + sensor2.record(5.0) + + sum_val = 0 + count = 10 + for i in range(count): + sensor.record(i) + sum_val += i + + # prior to any time passing + elapsed_secs = (config.time_window_ms * (config.samples - 1)) / 1000.0 + assert abs(count / elapsed_secs - + metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \ + < EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs) + + # pretend 2 seconds passed... + sleep_time_seconds = 2.0 + time_keeper.sleep(sleep_time_seconds) + elapsed_secs += sleep_time_seconds + + assert abs(5.0 - metrics.metrics.get(metrics.metric_name('s2.total', 'grp1')).value()) \ + < EPS, 's2 reflects the constant value' + assert abs(4.5 - metrics.metrics.get(metrics.metric_name('test.avg', 'grp1')).value()) \ + < EPS, 'Avg(0...9) = 4.5' + assert abs((count - 1) - metrics.metrics.get(metrics.metric_name('test.max', 'grp1')).value()) \ + < EPS, 'Max(0...9) = 9' + assert abs(0.0 - metrics.metrics.get(metrics.metric_name('test.min', 'grp1')).value()) \ + < EPS, 'Min(0...9) = 0' + assert abs((sum_val / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')).value()) \ + < EPS, 'Rate(0...9) = 1.40625' + assert abs((count / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \ + < EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs) + assert abs(count - metrics.metrics.get(metrics.metric_name('test.count', 'grp1')).value()) \ + < EPS, 'Count(0...9) = 10' + + +def test_hierarchical_sensors(metrics): + parent1 = metrics.sensor('test.parent1') + parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count()) + parent2 = metrics.sensor('test.parent2') + parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count()) + child1 = metrics.sensor('test.child1', parents=[parent1, parent2]) + child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count()) + child2 = metrics.sensor('test.child2', parents=[parent1]) + child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count()) + grandchild = metrics.sensor('test.grandchild', parents=[child1]) + grandchild.add(metrics.metric_name('test.grandchild.count', 'grp1'), Count()) + + # increment each sensor one time + parent1.record() + parent2.record() + child1.record() + child2.record() + grandchild.record() + + p1 = parent1.metrics[0].value() + p2 = parent2.metrics[0].value() + c1 = child1.metrics[0].value() + c2 = child2.metrics[0].value() + gc = grandchild.metrics[0].value() + + # each metric should have a count equal to one + its children's count + assert 1.0 == gc + assert 1.0 + gc == c1 + assert 1.0 == c2 + assert 1.0 + c1 == p2 + assert 1.0 + c1 + c2 == p1 + assert [child1, child2] == metrics._children_sensors.get(parent1) + assert [child1] == metrics._children_sensors.get(parent2) + assert metrics._children_sensors.get(grandchild) is None + + +def test_bad_sensor_hierarchy(metrics): + parent = metrics.sensor('parent') + child1 = metrics.sensor('child1', parents=[parent]) + child2 = metrics.sensor('child2', parents=[parent]) + + with pytest.raises(ValueError): + metrics.sensor('gc', parents=[child1, child2]) + + +def test_remove_sensor(metrics): + size = len(metrics.metrics) + parent1 = metrics.sensor('test.parent1') + parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count()) + parent2 = metrics.sensor('test.parent2') + parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count()) + child1 = metrics.sensor('test.child1', parents=[parent1, parent2]) + child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count()) + child2 = metrics.sensor('test.child2', parents=[parent2]) + child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count()) + grandchild1 = metrics.sensor('test.gchild2', parents=[child2]) + grandchild1.add(metrics.metric_name('test.gchild2.count', 'grp1'), Count()) + + sensor = metrics.get_sensor('test.parent1') + assert sensor is not None + metrics.remove_sensor('test.parent1') + assert metrics.get_sensor('test.parent1') is None + assert metrics.metrics.get(metrics.metric_name('test.parent1.count', 'grp1')) is None + assert metrics.get_sensor('test.child1') is None + assert metrics._children_sensors.get(sensor) is None + assert metrics.metrics.get(metrics.metric_name('test.child1.count', 'grp1')) is None + + sensor = metrics.get_sensor('test.gchild2') + assert sensor is not None + metrics.remove_sensor('test.gchild2') + assert metrics.get_sensor('test.gchild2') is None + assert metrics._children_sensors.get(sensor) is None + assert metrics.metrics.get(metrics.metric_name('test.gchild2.count', 'grp1')) is None + + sensor = metrics.get_sensor('test.child2') + assert sensor is not None + metrics.remove_sensor('test.child2') + assert metrics.get_sensor('test.child2') is None + assert metrics._children_sensors.get(sensor) is None + assert metrics.metrics.get(metrics.metric_name('test.child2.count', 'grp1')) is None + + sensor = metrics.get_sensor('test.parent2') + assert sensor is not None + metrics.remove_sensor('test.parent2') + assert metrics.get_sensor('test.parent2') is None + assert metrics._children_sensors.get(sensor) is None + assert metrics.metrics.get(metrics.metric_name('test.parent2.count', 'grp1')) is None + + assert size == len(metrics.metrics) + + +def test_remove_inactive_metrics(mocker, time_keeper, metrics): + mocker.patch('time.time', side_effect=time_keeper.time) + + s1 = metrics.sensor('test.s1', None, 1) + s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) + + s2 = metrics.sensor('test.s2', None, 3) + s2.add(metrics.metric_name('test.s2.count', 'grp1'), Count()) + + purger = Metrics.ExpireSensorTask + purger.run(metrics) + assert metrics.get_sensor('test.s1') is not None, \ + 'Sensor test.s1 must be present' + assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \ + 'MetricName test.s1.count must be present' + assert metrics.get_sensor('test.s2') is not None, \ + 'Sensor test.s2 must be present' + assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ + 'MetricName test.s2.count must be present' + + time_keeper.sleep(1.001) + purger.run(metrics) + assert metrics.get_sensor('test.s1') is None, \ + 'Sensor test.s1 should have been purged' + assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \ + 'MetricName test.s1.count should have been purged' + assert metrics.get_sensor('test.s2') is not None, \ + 'Sensor test.s2 must be present' + assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ + 'MetricName test.s2.count must be present' + + # record a value in sensor s2. This should reset the clock for that sensor. + # It should not get purged at the 3 second mark after creation + s2.record() + + time_keeper.sleep(2) + purger.run(metrics) + assert metrics.get_sensor('test.s2') is not None, \ + 'Sensor test.s2 must be present' + assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ + 'MetricName test.s2.count must be present' + + # After another 1 second sleep, the metric should be purged + time_keeper.sleep(1) + purger.run(metrics) + assert metrics.get_sensor('test.s1') is None, \ + 'Sensor test.s2 should have been purged' + assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \ + 'MetricName test.s2.count should have been purged' + + # After purging, it should be possible to recreate a metric + s1 = metrics.sensor('test.s1', None, 1) + s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) + assert metrics.get_sensor('test.s1') is not None, \ + 'Sensor test.s1 must be present' + assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \ + 'MetricName test.s1.count must be present' + + +def test_remove_metric(metrics): + size = len(metrics.metrics) + metrics.add_metric(metrics.metric_name('test1', 'grp1'), Count()) + metrics.add_metric(metrics.metric_name('test2', 'grp1'), Count()) + + assert metrics.remove_metric(metrics.metric_name('test1', 'grp1')) is not None + assert metrics.metrics.get(metrics.metric_name('test1', 'grp1')) is None + assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is not None + + assert metrics.remove_metric(metrics.metric_name('test2', 'grp1')) is not None + assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is None + + assert size == len(metrics.metrics) + + +def test_event_windowing(mocker, time_keeper): + mocker.patch('time.time', side_effect=time_keeper.time) + + count = Count() + config = MetricConfig(event_window=1, samples=2) + count.record(config, 1.0, time_keeper.ms()) + count.record(config, 1.0, time_keeper.ms()) + assert 2.0 == count.measure(config, time_keeper.ms()) + count.record(config, 1.0, time_keeper.ms()) # first event times out + assert 2.0 == count.measure(config, time_keeper.ms()) + + +def test_time_windowing(mocker, time_keeper): + mocker.patch('time.time', side_effect=time_keeper.time) + + count = Count() + config = MetricConfig(time_window_ms=1, samples=2) + count.record(config, 1.0, time_keeper.ms()) + time_keeper.sleep(.001) + count.record(config, 1.0, time_keeper.ms()) + assert 2.0 == count.measure(config, time_keeper.ms()) + time_keeper.sleep(.001) + count.record(config, 1.0, time_keeper.ms()) # oldest event times out + assert 2.0 == count.measure(config, time_keeper.ms()) + + +def test_old_data_has_no_effect(mocker, time_keeper): + mocker.patch('time.time', side_effect=time_keeper.time) + + max_stat = Max() + min_stat = Min() + avg_stat = Avg() + count_stat = Count() + window_ms = 100 + samples = 2 + config = MetricConfig(time_window_ms=window_ms, samples=samples) + max_stat.record(config, 50, time_keeper.ms()) + min_stat.record(config, 50, time_keeper.ms()) + avg_stat.record(config, 50, time_keeper.ms()) + count_stat.record(config, 50, time_keeper.ms()) + + time_keeper.sleep(samples * window_ms / 1000.0) + assert float('-inf') == max_stat.measure(config, time_keeper.ms()) + assert float(sys.maxsize) == min_stat.measure(config, time_keeper.ms()) + assert 0.0 == avg_stat.measure(config, time_keeper.ms()) + assert 0 == count_stat.measure(config, time_keeper.ms()) + + +def test_duplicate_MetricName(metrics): + metrics.sensor('test').add(metrics.metric_name('test', 'grp1'), Avg()) + with pytest.raises(ValueError): + metrics.sensor('test2').add(metrics.metric_name('test', 'grp1'), Total()) + + +def test_Quotas(metrics): + sensor = metrics.sensor('test') + sensor.add(metrics.metric_name('test1.total', 'grp1'), Total(), + MetricConfig(quota=Quota.upper_bound(5.0))) + sensor.add(metrics.metric_name('test2.total', 'grp1'), Total(), + MetricConfig(quota=Quota.lower_bound(0.0))) + sensor.record(5.0) + with pytest.raises(QuotaViolationError): + sensor.record(1.0) + + assert abs(6.0 - metrics.metrics.get(metrics.metric_name('test1.total', 'grp1')).value()) \ + < EPS + + sensor.record(-6.0) + with pytest.raises(QuotaViolationError): + sensor.record(-1.0) + + +def test_Quotas_equality(): + quota1 = Quota.upper_bound(10.5) + quota2 = Quota.lower_bound(10.5) + assert quota1 != quota2, 'Quota with different upper values should not be equal' + + quota3 = Quota.lower_bound(10.5) + assert quota2 == quota3, 'Quota with same upper and bound values should be equal' + + +def test_Percentiles(metrics): + buckets = 100 + _percentiles = [ + Percentile(metrics.metric_name('test.p25', 'grp1'), 25), + Percentile(metrics.metric_name('test.p50', 'grp1'), 50), + Percentile(metrics.metric_name('test.p75', 'grp1'), 75), + ] + percs = Percentiles(4 * buckets, BucketSizing.CONSTANT, 100.0, 0.0, + percentiles=_percentiles) + config = MetricConfig(event_window=50, samples=2) + sensor = metrics.sensor('test', config) + sensor.add_compound(percs) + p25 = metrics.metrics.get(metrics.metric_name('test.p25', 'grp1')) + p50 = metrics.metrics.get(metrics.metric_name('test.p50', 'grp1')) + p75 = metrics.metrics.get(metrics.metric_name('test.p75', 'grp1')) + + # record two windows worth of sequential values + for i in range(buckets): + sensor.record(i) + + assert abs(p25.value() - 25) < 1.0 + assert abs(p50.value() - 50) < 1.0 + assert abs(p75.value() - 75) < 1.0 + + for i in range(buckets): + sensor.record(0.0) + + assert p25.value() < 1.0 + assert p50.value() < 1.0 + assert p75.value() < 1.0 + +def test_rate_windowing(mocker, time_keeper, metrics): + mocker.patch('time.time', side_effect=time_keeper.time) + + # Use the default time window. Set 3 samples + config = MetricConfig(samples=3) + sensor = metrics.sensor('test.sensor', config) + sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS)) + + sum_val = 0 + count = config.samples - 1 + # Advance 1 window after every record + for i in range(count): + sensor.record(100) + sum_val += 100 + time_keeper.sleep(config.time_window_ms / 1000.0) + + # Sleep for half the window. + time_keeper.sleep(config.time_window_ms / 2.0 / 1000.0) + + # prior to any time passing + elapsed_secs = (config.time_window_ms * (config.samples - 1) + config.time_window_ms / 2.0) / 1000.0 + + kafka_metric = metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')) + assert abs((sum_val / elapsed_secs) - kafka_metric.value()) < EPS, \ + 'Rate(0...2) = 2.666' + assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.time() * 1000) / 1000.0)) \ + < EPS, 'Elapsed Time = 75 seconds' + + +class ConstantMeasurable(AbstractMeasurable): + _value = 0.0 + + def measure(self, config, now): + return self._value + + +class TimeKeeper(object): + """ + A clock that you can manually advance by calling sleep + """ + def __init__(self, auto_tick_ms=0): + self._millis = time.time() * 1000 + self._auto_tick_ms = auto_tick_ms + + def time(self): + return self.ms() / 1000.0 + + def ms(self): + self.sleep(self._auto_tick_ms) + return self._millis + + def sleep(self, seconds): + self._millis += (seconds * 1000) |