diff options
32 files changed, 1994 insertions, 121 deletions
| diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1f0619b..1d4b0f0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,11 +3,13 @@ from __future__ import absolute_import  import collections  import copy  import logging +import time  import six  import kafka.errors as Errors  from kafka.future import Future +from kafka.metrics.stats import Avg, Count, Max, Rate  from kafka.protocol.fetch import FetchRequest  from kafka.protocol.message import PartialMessage  from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy @@ -40,7 +42,8 @@ class Fetcher(six.Iterator):          'api_version': (0, 8, 0),      } -    def __init__(self, client, subscriptions, **configs): +    def __init__(self, client, subscriptions, metrics, metric_group_prefix, +                 **configs):          """Initialize a Kafka Message Fetcher.          Keyword Arguments: @@ -68,8 +71,6 @@ class Fetcher(six.Iterator):                  the messages occurred. This check adds some overhead, so it may                  be disabled in cases seeking extreme performance. Default: True          """ -                 #metrics=None, -                 #metric_group_prefix='consumer',          self.config = copy.copy(self.DEFAULT_CONFIG)          for key in self.config:              if key in configs: @@ -83,8 +84,7 @@ class Fetcher(six.Iterator):          self._record_too_large_partitions = dict() # {topic_partition: offset}          self._iterator = None          self._fetch_futures = collections.deque() - -        #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) +        self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)      def init_fetches(self):          """Send FetchRequests asynchronously for all assigned partitions. 
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):              if self._client.ready(node_id):                  log.debug("Sending FetchRequest to node %s", node_id)                  future = self._client.send(node_id, request) -                future.add_callback(self._handle_fetch_response, request) +                future.add_callback(self._handle_fetch_response, request, time.time())                  future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)                  futures.append(future)          self._fetch_futures.extend(futures) @@ -575,10 +575,11 @@ class Fetcher(six.Iterator):                  partition_data.items())          return requests -    def _handle_fetch_response(self, request, response): +    def _handle_fetch_response(self, request, send_time, response):          """The callback for fetch completion""" -        #total_bytes = 0 -        #total_count = 0 +        total_bytes = 0 +        total_count = 0 +        recv_time = time.time()          fetch_offsets = {}          for topic, partitions in request.topics: @@ -609,6 +610,7 @@ class Fetcher(six.Iterator):                                    position)                          continue +                    num_bytes = 0                      partial = None                      if messages and isinstance(messages[-1][-1], PartialMessage):                          partial = messages.pop() @@ -618,18 +620,18 @@ class Fetcher(six.Iterator):                                    " offset %d to buffered record list", tp,                                    position)                          self._records.append((fetch_offset, tp, messages)) -                        #last_offset, _, _ = messages[-1] -                        #self.sensors.records_fetch_lag.record(highwater - last_offset) +                        last_offset, _, _ = messages[-1] +                        self._sensors.records_fetch_lag.record(highwater - last_offset) +                        num_bytes = sum(msg[1] for msg 
in messages)                      elif partial:                          # we did not read a single message from a non-empty                          # buffer because that message's size is larger than                          # fetch size, in this case record this exception                          self._record_too_large_partitions[tp] = fetch_offset -                    # TODO: bytes metrics -                    #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size()); -                    #totalBytes += num_bytes; -                    #totalCount += parsed.size(); +                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages)) +                    total_bytes += num_bytes +                    total_count += len(messages)                  elif error_type in (Errors.NotLeaderForPartitionError,                                      Errors.UnknownTopicOrPartitionError):                      self._client.cluster.request_update() @@ -649,56 +651,82 @@ class Fetcher(six.Iterator):                  else:                      raise error_type('Unexpected error while fetching data') -        """TOOD - metrics -        self.sensors.bytesFetched.record(totalBytes) -        self.sensors.recordsFetched.record(totalCount) -        self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()) -        self.sensors.fetchLatency.record(resp.requestLatencyMs()) +        self._sensors.bytes_fetched.record(total_bytes) +        self._sensors.records_fetched.record(total_count) +        self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms']) +        self._sensors.fetch_latency.record((recv_time - send_time) * 1000)  class FetchManagerMetrics(object):      def __init__(self, metrics, prefix):          self.metrics = metrics -        self.group_name = prefix + "-fetch-manager-metrics" - -        self.bytes_fetched = metrics.sensor("bytes-fetched") -        
self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name, -            "The average number of bytes fetched per request"), metrics.Avg()) -        self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name, -            "The maximum number of bytes fetched per request"), metrics.Max()) -        self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name, -            "The average number of bytes consumed per second"), metrics.Rate()) - -        self.records_fetched = self.metrics.sensor("records-fetched") -        self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name, -            "The average number of records in each request"), metrics.Avg()) -        self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name, -            "The average number of records consumed per second"), metrics.Rate()) - -        self.fetch_latency = metrics.sensor("fetch-latency") -        self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name, -            "The average time taken for a fetch request."), metrics.Avg()) -        self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name, -            "The max time taken for any fetch request."), metrics.Max()) -        self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name, -            "The number of fetch requests per second."), metrics.Rate(metrics.Count())) - -        self.records_fetch_lag = metrics.sensor("records-lag") -        self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name, -            "The maximum lag in terms of number of records for any partition in self window"), metrics.Max()) - -        self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time") -        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name, -            "The average throttle time in ms"), 
metrics.Avg()) -        self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name, -            "The maximum throttle time in ms"), metrics.Max()) - -        def record_topic_fetch_metrics(topic, num_bytes, num_records): -            # record bytes fetched -            name = '.'.join(["topic", topic, "bytes-fetched"]) -            self.metrics[name].record(num_bytes); - -            # record records fetched -            name = '.'.join(["topic", topic, "records-fetched"]) -            self.metrics[name].record(num_records) -        """ +        self.group_name = '%s-fetch-manager-metrics' % prefix + +        self.bytes_fetched = metrics.sensor('bytes-fetched') +        self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, +            'The average number of bytes fetched per request'), Avg()) +        self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name, +            'The maximum number of bytes fetched per request'), Max()) +        self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name, +            'The average number of bytes consumed per second'), Rate()) + +        self.records_fetched = self.metrics.sensor('records-fetched') +        self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name, +            'The average number of records in each request'), Avg()) +        self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name, +            'The average number of records consumed per second'), Rate()) + +        self.fetch_latency = metrics.sensor('fetch-latency') +        self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name, +            'The average time taken for a fetch request.'), Avg()) +        self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name, +            'The max time taken for any fetch request.'), Max()) +        
self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name, +            'The number of fetch requests per second.'), Rate(sampled_stat=Count())) + +        self.records_fetch_lag = metrics.sensor('records-lag') +        self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name, +            'The maximum lag in terms of number of records for any partition in self window'), Max()) + +        self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time') +        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name, +            'The average throttle time in ms'), Avg()) +        self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name, +            'The maximum throttle time in ms'), Max()) + +    def record_topic_fetch_metrics(self, topic, num_bytes, num_records): +        metric_tags = {'topic': topic.replace('.', '_')} + +        # record bytes fetched +        name = '.'.join(['topic', topic, 'bytes-fetched']) +        bytes_fetched = self.metrics.get_sensor(name) +        if not bytes_fetched: +            bytes_fetched = self.metrics.sensor(name) +            bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', +                    self.group_name, +                    'The average number of bytes fetched per request for topic %s' % topic, +                    metric_tags), Avg()) +            bytes_fetched.add(self.metrics.metric_name('fetch-size-max', +                    self.group_name, +                    'The maximum number of bytes fetched per request for topic %s' % topic, +                    metric_tags), Max()) +            bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', +                    self.group_name, +                    'The average number of bytes consumed per second for topic %s' % topic, +                    metric_tags), Rate()) +        bytes_fetched.record(num_bytes) + +      
  # record records fetched +        name = '.'.join(['topic', topic, 'records-fetched']) +        records_fetched = self.metrics.get_sensor(name) +        if not records_fetched: +            records_fetched = self.metrics.sensor(name) +            records_fetched.add(self.metrics.metric_name('records-per-request-avg', +                    self.group_name, +                    'The average number of records in each request for topic %s' % topic, +                    metric_tags), Avg()) +            records_fetched.add(self.metrics.metric_name('records-consumed-rate', +                    self.group_name, +                    'The average number of records consumed per second for topic %s' % topic, +                    metric_tags), Rate()) +        records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0a78e7f..abb65ef 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState  from kafka.coordinator.consumer import ConsumerCoordinator  from kafka.coordinator.assignors.range import RangePartitionAssignor  from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.metrics import DictReporter, MetricConfig, Metrics  from kafka.protocol.offset import OffsetResetStrategy  from kafka.structs import TopicPartition  from kafka.version import __version__ @@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):              offset commits; 0.8.0 is what is left. If set to 'auto', will              attempt to infer the broker version by probing various APIs.              Default: auto +        metric_reporters (list): A list of classes to use as metrics reporters. +            Implementing the AbstractMetricsReporter interface allows plugging +            in classes that will be notified of new metric creation. 
Default: [] +        metrics_num_samples (int): The number of samples maintained to compute +            metrics. Default: 2 +        metrics_sample_window_ms (int): The maximum age in milliseconds of +            samples used to compute metrics. Default: 30000      Note:          Configuration parameters are described in more detail at @@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):          'ssl_keyfile': None,          'api_version': 'auto',          'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet -        #'metric_reporters': None, -        #'metrics_num_samples': 2, -        #'metrics_sample_window_ms': 30000, +        'metric_reporters': [], +        'metrics_num_samples': 2, +        'metrics_sample_window_ms': 30000,      }      def __init__(self, *topics, **configs): @@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator):                          new_config, self.config['auto_offset_reset'])              self.config['auto_offset_reset'] = new_config +        metrics_tags = {'client-id': self.config['client_id']} +        metric_config = MetricConfig(samples=self.config['metrics_num_samples'], +                                     time_window_ms=self.config['metrics_sample_window_ms'], +                                     tags=metrics_tags) +        reporters = [reporter() for reporter in self.config['metric_reporters']] +        reporters.append(DictReporter('kafka.consumer')) +        self._metrics = Metrics(metric_config, reporters) +        metric_group_prefix = 'consumer' +        # TODO _metrics likely needs to be passed to KafkaClient, etc. 
+          self._client = KafkaClient(**self.config)          # Check Broker Version if not set explicitly @@ -215,16 +233,15 @@ class KafkaConsumer(six.Iterator):          self._subscription = SubscriptionState(self.config['auto_offset_reset'])          self._fetcher = Fetcher( -            self._client, self._subscription, **self.config) +            self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)          self._coordinator = ConsumerCoordinator( -            self._client, self._subscription, +            self._client, self._subscription, self._metrics, metric_group_prefix,              assignors=self.config['partition_assignment_strategy'],              **self.config)          self._closed = False          self._iterator = None          self._consumer_timeout = float('inf') -        #self.metrics = None          if topics:              self._subscription.subscribe(topics=topics)              self._client.set_topics(topics) @@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator):          log.debug("Closing the KafkaConsumer.")          self._closed = True          self._coordinator.close() -        #self.metrics.close() +        self._metrics.close()          self._client.close()          try:              self.config['key_deserializer'].close() diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c75eb7c..a4c25a3 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -658,7 +658,7 @@ class GroupCoordinatorMetrics(object):          self.heartbeat_latency.add(metrics.metricName(              "heartbeat-rate", self.group_name,              "The average number of heartbeats per second", -            tags), metrics.Rate(metrics.Count())) +            tags), metrics.Rate(sampled_stat=metrics.Count()))          self.join_latency = metrics.sensor("join-latency")          self.join_latency.add(metrics.metricName( @@ -672,7 +672,7 @@ class GroupCoordinatorMetrics(object):          
self.join_latency.add(metrics.metricName(              "join-rate", self.group_name,              "The number of group joins per second", -            tags), metrics.Rate(metrics.Count())) +            tags), metrics.Rate(sampled_stat=metrics.Count()))          self.sync_latency = metrics.sensor("sync-latency")          self.sync_latency.add(metrics.metricName( @@ -686,7 +686,7 @@ class GroupCoordinatorMetrics(object):          self.sync_latency.add(metrics.metricName(              "sync-rate", self.group_name,              "The number of group syncs per second", -            tags), metrics.Rate(metrics.Count())) +            tags), metrics.Rate(sampled_stat=metrics.Count()))          """          lastHeartbeat = Measurable( diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cd3d48a..50d2806 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor  from .protocol import ConsumerProtocol  from .. import errors as Errors  from ..future import Future +from ..metrics import AnonMeasurable +from ..metrics.stats import Avg, Count, Max, Rate  from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest  from ..structs import OffsetAndMetadata, TopicPartition  from ..util import WeakMethod @@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):          'api_version': (0, 9),      } -    def __init__(self, client, subscription, **configs): +    def __init__(self, client, subscription, metrics, metric_group_prefix, +                 **configs):          """Initialize the coordination manager.          
Keyword Arguments: @@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator):                  interval = self.config['auto_commit_interval_ms'] / 1000.0                  self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) -        # metrics=None, -        # metric_group_prefix=None, -        # metric_tags=None, -        # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) +        self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, +                                                   self._subscription)      def __del__(self):          if hasattr(self, '_auto_commit_task') and self._auto_commit_task: @@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator):          future = Future()          _f = self._client.send(node_id, request) -        _f.add_callback(self._handle_offset_commit_response, offsets, future) +        _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())          _f.add_errback(self._failed_request, node_id, request, future)          return future -    def _handle_offset_commit_response(self, offsets, future, response): -        #self.sensors.commit_latency.record(response.requestLatencyMs()) +    def _handle_offset_commit_response(self, offsets, future, send_time, response): +        # TODO look at adding request_latency_ms to response (like java kafka) +        self._sensors.commit_latency.record((time.time() - send_time) * 1000)          unauthorized_topics = set()          for topic, partitions in response.topics: @@ -720,38 +722,25 @@ class AutoCommitTask(object):          self._reschedule(next_at) -# TODO -"""  class ConsumerCoordinatorMetrics(object): -    def __init__(self, metrics, prefix, tags): +    def __init__(self, metrics, metric_group_prefix, subscription):          self.metrics = metrics -        self.group_name = prefix + "-coordinator-metrics" - -        self.commit_latency = metrics.sensor("commit-latency") -      
  self.commit_latency.add(metrics.MetricName( -            "commit-latency-avg", self.group_name, -            "The average time taken for a commit request", -            tags), metrics.Avg()) -        self.commit_latency.add(metrics.MetricName( -            "commit-latency-max", self.group_name, -            "The max time taken for a commit request", -            tags), metrics.Max()) -        self.commit_latency.add(metrics.MetricName( -            "commit-rate", self.group_name, -            "The number of commit calls per second", -            tags), metrics.Rate(metrics.Count())) - -        ''' -        def _num_partitions(config, now): -            new Measurable() { -                public double measure(MetricConfig config, long now) { -                    return subscriptions.assignedPartitions().size(); -                } -            }; -        metrics.addMetric(new MetricName("assigned-partitions", -            this.metricGrpName, -            "The number of partitions currently assigned to this consumer", -            tags), -            numParts); -        ''' -""" +        self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix + +        self.commit_latency = metrics.sensor('commit-latency') +        self.commit_latency.add(metrics.metric_name( +            'commit-latency-avg', self.metric_group_name, +            'The average time taken for a commit request'), Avg()) +        self.commit_latency.add(metrics.metric_name( +            'commit-latency-max', self.metric_group_name, +            'The max time taken for a commit request'), Max()) +        self.commit_latency.add(metrics.metric_name( +            'commit-rate', self.metric_group_name, +            'The number of commit calls per second'), Rate(sampled_stat=Count())) + +        num_parts = AnonMeasurable(lambda config, now: +                                   len(subscription.assigned_partitions())) +        metrics.add_metric(metrics.metric_name( +            
'assigned-partitions', self.metric_group_name, +            'The number of partitions currently assigned to this consumer'), +            num_parts) diff --git a/kafka/errors.py b/kafka/errors.py index a36ee75..dd64b04 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -361,6 +361,10 @@ class KafkaConfigurationError(KafkaError):      pass +class QuotaViolationError(KafkaError): +    pass + +  class AsyncProducerQueueFull(KafkaError):      def __init__(self, failed_msgs, *args):          super(AsyncProducerQueueFull, self).__init__(*args) diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py new file mode 100644 index 0000000..dd22f53 --- /dev/null +++ b/kafka/metrics/__init__.py @@ -0,0 +1,13 @@ +from .compound_stat import NamedMeasurable +from .dict_reporter import DictReporter +from .kafka_metric import KafkaMetric +from .measurable import AnonMeasurable +from .metric_config import MetricConfig +from .metric_name import MetricName +from .metrics import Metrics +from .quota import Quota + +__all__ = [ +    'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig', +    'MetricName', 'Metrics', 'NamedMeasurable', 'Quota' +] diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py new file mode 100644 index 0000000..09bc24a --- /dev/null +++ b/kafka/metrics/compound_stat.py @@ -0,0 +1,32 @@ +import abc + +from kafka.metrics.stat import AbstractStat + + +class AbstractCompoundStat(AbstractStat): +    """ +    A compound stat is a stat where a single measurement and associated +    data structure feeds many metrics. This is the example for a +    histogram which has many associated percentiles. 
+    """ +    __metaclass__ = abc.ABCMeta + +    def stats(self): +        """ +        Return list of NamedMeasurable +        """ +        raise NotImplementedError + + +class NamedMeasurable(object): +    def __init__(self, metric_name, measurable_stat): +        self._name = metric_name +        self._stat = measurable_stat + +    @property +    def name(self): +        return self._name + +    @property +    def stat(self): +        return self._stat diff --git a/kafka/metrics/dict_reporter.py b/kafka/metrics/dict_reporter.py new file mode 100644 index 0000000..49af604 --- /dev/null +++ b/kafka/metrics/dict_reporter.py @@ -0,0 +1,81 @@ +import logging +import threading + +from kafka.metrics.metrics_reporter import AbstractMetricsReporter + +logger = logging.getLogger(__name__) + + +class DictReporter(AbstractMetricsReporter): +    """A basic dictionary based metrics reporter. + +    Store all metrics in a two level dictionary of category > name > metric. +    """ +    def __init__(self, prefix=''): +        self._lock = threading.Lock() +        self._prefix = prefix if prefix else ''  # never allow None +        self._store = {} + +    def snapshot(self): +        """ +        Return a nested dictionary snapshot of all metrics and their +        values at this time. 
Example: +        { +            'category': { +                'metric1_name': 42.0, +                'metric2_name': 'foo' +            } +        } +        """ +        return dict((category, dict((name, metric.value()) +                                    for name, metric in list(metrics.items()))) +                    for category, metrics in +                    list(self._store.items())) + +    def init(self, metrics): +        for metric in metrics: +            self.metric_change(metric) + +    def metric_change(self, metric): +        with self._lock: +            category = self.get_category(metric) +            if category not in self._store: +                self._store[category] = {} +            self._store[category][metric.metric_name.name] = metric + +    def metric_removal(self, metric): +        with self._lock: +            category = self.get_category(metric) +            metrics = self._store.get(category, {}) +            removed = metrics.pop(metric.metric_name.name, None) +            if not metrics: +                self._store.pop(category, None) +            return removed + +    def get_category(self, metric): +        """ +        Return a string category for the metric. + +        The category is made up of this reporter's prefix and the +        metric's group and tags. 
+ +        Examples: +            prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2} +            returns: 'foo.bar.a=1,b=2' + +            prefix = 'foo', group = 'bar', tags = None +            returns: 'foo.bar' + +            prefix = None, group = 'bar', tags = None +            returns: 'bar' +        """ +        tags = ','.join('%s=%s' % (k, v) for k, v in +                        sorted(metric.metric_name.tags.items())) +        return '.'.join(x for x in +                        [self._prefix, metric.metric_name.group, tags] if x) + +    def configure(self, configs): +        pass + +    def close(self): +        pass diff --git a/kafka/metrics/kafka_metric.py b/kafka/metrics/kafka_metric.py new file mode 100644 index 0000000..75d32a4 --- /dev/null +++ b/kafka/metrics/kafka_metric.py @@ -0,0 +1,34 @@ +import time + + +class KafkaMetric(object): +    # NOTE java constructor takes a lock instance +    def __init__(self, metric_name, measurable, config): +        if not metric_name: +            raise ValueError('metric_name must be non-empty') +        if not measurable: +            raise ValueError('measurable must be non-empty') +        self._metric_name = metric_name +        self._measurable = measurable +        self._config = config + +    @property +    def metric_name(self): +        return self._metric_name + +    @property +    def measurable(self): +        return self._measurable + +    @property +    def config(self): +        return self._config + +    @config.setter +    def config(self, config): +        self._config = config + +    def value(self, time_ms=None): +        if time_ms is None: +            time_ms = time.time() * 1000 +        return self.measurable.measure(self.config, time_ms) diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py new file mode 100644 index 0000000..ef096f3 --- /dev/null +++ b/kafka/metrics/measurable.py @@ -0,0 +1,27 @@ +import abc + + +class AbstractMeasurable(object): +    """A 
measurable quantity that can be registered as a metric""" +    @abc.abstractmethod +    def measure(self, config, now): +        """ +        Measure this quantity and return the result + +        Arguments: +            config (MetricConfig): The configuration for this metric +            now (int): The POSIX time in milliseconds the measurement +                is being taken + +        Returns: +            The measured value +        """ +        raise NotImplementedError + + +class AnonMeasurable(AbstractMeasurable): +    def __init__(self, measure_fn): +        self._measure_fn = measure_fn + +    def measure(self, config, now): +        return float(self._measure_fn(config, now)) diff --git a/kafka/metrics/measurable_stat.py b/kafka/metrics/measurable_stat.py new file mode 100644 index 0000000..dba887d --- /dev/null +++ b/kafka/metrics/measurable_stat.py @@ -0,0 +1,14 @@ +import abc + +from kafka.metrics.measurable import AbstractMeasurable +from kafka.metrics.stat import AbstractStat + + +class AbstractMeasurableStat(AbstractStat, AbstractMeasurable): +    """ +    An AbstractMeasurableStat is an AbstractStat that is also +    an AbstractMeasurable (i.e. can produce a single floating point value). +    This is the interface used for most of the simple statistics such +    as Avg, Max, Count, etc. +    """ +    __metaclass__ = abc.ABCMeta diff --git a/kafka/metrics/metric_config.py b/kafka/metrics/metric_config.py new file mode 100644 index 0000000..e30c477 --- /dev/null +++ b/kafka/metrics/metric_config.py @@ -0,0 +1,31 @@ +import sys + + +class MetricConfig(object): +    """Configuration values for metrics""" +    def __init__(self, quota=None, samples=2, event_window=sys.maxsize, +                 time_window_ms=30 * 1000, tags=None): +        """ +        Arguments: +            quota (Quota, optional): Upper or lower bound of a value. +            samples (int, optional): Max number of samples kept per metric. 
class MetricName(object):
    """
    This class encapsulates a metric's name, logical group and its
    related attributes (tags).

    group, tags parameters can be used to create unique metric names.
    e.g. domainName:type=group,key1=val1,key2=val2

    Two MetricName objects are equal (and hash equally) when they are of
    the same type and have the same name, group and tags; the description
    does not take part in equality.

    Usage looks something like this:

        # set up metrics:
        metric_tags = {'client-id': 'producer-1', 'topic': 'topic'}
        metric_config = MetricConfig(tags=metric_tags)

        # metrics is the global repository of metrics and sensors
        metrics = Metrics(metric_config)

        sensor = metrics.sensor('message-sizes')
        metric_name = metrics.metric_name('message-size-avg',
                                          'producer-metrics',
                                          'average message size')
        sensor.add(metric_name, Avg())

        metric_name = metrics.metric_name('message-size-max',
                                          'producer-metrics',
                                          'maximum message size')
        sensor.add(metric_name, Max())

        tags = {'client-id': 'my-client', 'topic': 'my-topic'}
        metric_name = metrics.metric_name('message-size-min',
                                          'producer-metrics',
                                          'message minimum size', tags)
        sensor.add(metric_name, Min())

        # as messages are sent we record the sizes
        sensor.record(message_size)
    """

    def __init__(self, name, group, description=None, tags=None):
        """
        Arguments:
            name (str): The name of the metric.
            group (str): The logical group name of the metrics to which this
                metric belongs.
            description (str, optional): A human-readable description to
                include in the metric.
            tags (dict, optional): Additional key/val attributes of the metric.

        Raises:
            ValueError: if name or group is empty.
            TypeError: if tags is given but is not a dict.
        """
        # ValueError / TypeError are both subclasses of Exception, so
        # callers that caught the previous bare Exception keep working.
        if not (name and group):
            raise ValueError('name and group must be non-empty.')
        if tags is not None and not isinstance(tags, dict):
            raise TypeError('tags must be a dict if present.')

        self._name = name
        self._group = group
        self._description = description
        # Shallow copy so later mutation of the caller's dict cannot change
        # this object's identity (hash / equality).
        self._tags = copy.copy(tags)
        # 0 doubles as the "not computed yet" marker for the cached hash.
        self._hash = 0

    @property
    def name(self):
        return self._name

    @property
    def group(self):
        return self._group

    @property
    def description(self):
        return self._description

    @property
    def tags(self):
        # Hand out a copy so callers cannot mutate the internal dict.
        return copy.copy(self._tags)

    def __hash__(self):
        if self._hash != 0:
            return self._hash
        prime = 31
        result = 1
        result = prime * result + hash(self._group)
        result = prime * result + hash(self._name)
        # Read the private dict directly: going through the ``tags``
        # property would copy it twice for no benefit.
        tags_hash = hash(frozenset(self._tags.items())) if self._tags else 0
        result = prime * result + tags_hash
        self._hash = result
        return result

    def __eq__(self, other):
        if self is other:
            return True
        if other is None:
            return False
        # Exact type match is intentional: a subclass never compares equal.
        return (type(self) is type(other) and
                self._group == other._group and
                self._name == other._name and
                self._tags == other._tags)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return 'MetricName(name=%s, group=%s, description=%s, tags=%s)' % (
            self.name, self.group, self.description, self.tags)
class Metrics(object):
    """
    A registry of sensors and metrics.

    A metric is a named, numerical measurement. A sensor is a handle to
    record numerical measurements as they occur. Each Sensor has zero or
    more associated metrics. For example a Sensor might represent message
    sizes and we might associate with this sensor a metric for the average,
    maximum, or other statistics computed off the sequence of message sizes
    that are recorded by the sensor.

    Usage looks something like this:
        # set up metrics:
        metrics = Metrics() # the global repository of metrics and sensors
        sensor = metrics.sensor('message-sizes')
        metric_name = MetricName('message-size-avg', 'producer-metrics')
        sensor.add(metric_name, Avg())
        metric_name = MetricName('message-size-max', 'producer-metrics')
        sensor.add(metric_name, Max())

        # as messages are sent we record the sizes
        sensor.record(message_size)
    """
    def __init__(self, default_config=None, reporters=None,
                 enable_expiration=False):
        """
        Create a metrics repository with a default config, given metric
        reporters and the ability to expire eligible sensors

        Arguments:
            default_config (MetricConfig, optional): The default config;
                a fresh MetricConfig() is used when omitted
            reporters (list of AbstractMetricsReporter, optional):
                The metrics reporters; each one is initialised with an
                empty metric list
            enable_expiration (bool, optional): true if the metrics instance
                can garbage collect inactive sensors, false otherwise
        """
        # Re-entrant lock guarding the sensor / metric / reporter registries.
        self._lock = threading.RLock()
        self._config = default_config or MetricConfig()
        self._sensors = {}           # sensor name -> Sensor
        self._metrics = {}           # MetricName -> KafkaMetric
        self._children_sensors = {}  # parent Sensor -> list of child Sensors
        self._reporters = reporters or []
        for reporter in self._reporters:
            reporter.init([])

        if enable_expiration:
            # Background loop: every 30 seconds run one expiration pass
            # over all registered sensors.
            def expire_loop():
                while True:
                    # delay 30 seconds
                    time.sleep(30)
                    self.ExpireSensorTask.run(self)
            metrics_scheduler = threading.Thread(target=expire_loop)
            # Creating a daemon thread to not block shutdown
            metrics_scheduler.daemon = True
            metrics_scheduler.start()

        # Self-describing metric: reports how many metrics are registered
        # (the lambda reads the live dict on each measurement).
        self.add_metric(self.metric_name('count', 'kafka-metrics-count',
                                         'total number of registered metrics'),
                        AnonMeasurable(lambda config, now: len(self._metrics)))

    @property
    def config(self):
        """The default MetricConfig used by this repository."""
        return self._config

    @property
    def metrics(self):
        """
        Get all the metrics currently maintained, indexed by MetricName.

        This returns the internal dict itself, not a copy.
        """
        return self._metrics

    def metric_name(self, name, group, description='', tags=None):
        """
        Create a MetricName with the given name, group, description and tags,
        plus default tags specified in the metric configuration.
        Tag in tags takes precedence if the same tag key is specified in
        the default metric configuration.

        Arguments:
            name (str): The name of the metric
            group (str): logical group name of the metrics to which this
                metric belongs
            description (str, optional): A human-readable description to
                include in the metric
            tags (dict, optional): additional key/value attributes of
                the metric

        Returns:
            MetricName: the new metric name carrying the combined tags
        """
        # Start from a copy of the config's default tags, then let the
        # caller-supplied tags override on key collision.
        combined_tags = dict(self.config.tags)
        combined_tags.update(tags or {})
        return MetricName(name, group, description, combined_tags)

    def get_sensor(self, name):
        """
        Get the sensor with the given name if it exists

        Arguments:
            name (str): The name of the sensor

        Returns:
            Sensor: The sensor or None if no such sensor exists

        Raises:
            ValueError: if name is empty
        """
        if not name:
            raise ValueError('name must be non-empty')
        return self._sensors.get(name, None)

    def sensor(self, name, config=None,
               inactive_sensor_expiration_time_seconds=sys.maxsize,
               parents=None):
        """
        Get or create a sensor with the given unique name and zero or
        more parent sensors. All parent sensors will receive every value
        recorded with this sensor.

        Arguments:
            name (str): The name of the sensor
            config (MetricConfig, optional): A default configuration to use
                for this sensor for metrics that don't have their own config
            inactive_sensor_expiration_time_seconds (int, optional):
                If no value is recorded on the Sensor for this duration of
                time, it is eligible for removal
            parents (list of Sensor): The parent sensors

        Returns:
            Sensor: The existing sensor with this name, or the newly
                created one
        """
        # First lookup is done without the lock; if it misses, the lookup
        # is repeated under the lock before creating the sensor.
        sensor = self.get_sensor(name)
        if sensor:
            return sensor

        with self._lock:
            sensor = self.get_sensor(name)
            if not sensor:
                sensor = Sensor(self, name, parents, config or self.config,
                                inactive_sensor_expiration_time_seconds)
                self._sensors[name] = sensor
                if parents:
                    # Record the new sensor as a child of each parent so
                    # remove_sensor(parent) can cascade to it.
                    for parent in parents:
                        children = self._children_sensors.get(parent)
                        if not children:
                            children = []
                            self._children_sensors[parent] = children
                        children.append(sensor)
                logger.debug('Added sensor with name %s', name)
            return sensor

    def remove_sensor(self, name):
        """
        Remove a sensor (if it exists), associated metrics and its children.

        Arguments:
            name (str): The name of the sensor to be removed
        """
        sensor = self._sensors.get(name)
        if sensor:
            child_sensors = None
            # Lock order: the sensor's own lock first, then the registry lock.
            with sensor._lock:
                with self._lock:
                    val = self._sensors.pop(name, None)
                    # Only proceed if the sensor popped is the one looked up
                    # above.
                    if val and val == sensor:
                        for metric in sensor.metrics:
                            self.remove_metric(metric.metric_name)
                        logger.debug('Removed sensor with name %s', name)
                        child_sensors = self._children_sensors.pop(sensor, None)
            # Children are removed recursively after both locks are released.
            if child_sensors:
                for child_sensor in child_sensors:
                    self.remove_sensor(child_sensor.name)

    def add_metric(self, metric_name, measurable, config=None):
        """
        Add a metric to monitor an object that implements measurable.
        This metric won't be associated with any sensor.
        This is a way to expose existing values as metrics.

        Arguments:
            metric_name (MetricName): The name of the metric
            measurable (AbstractMeasurable): The measurable that will be
                measured by this metric
            config (MetricConfig, optional): The configuration to use when
                measuring this measurable; defaults to this repository's
                config

        Raises:
            ValueError: if a metric with this name is already registered
                (raised by register_metric)
        """
        # NOTE there was a lock here, but i don't think it's needed
        metric = KafkaMetric(metric_name, measurable, config or self.config)
        self.register_metric(metric)

    def remove_metric(self, metric_name):
        """
        Remove a metric if it exists and return it. Return None otherwise.
        If a metric is removed, `metric_removal` will be invoked
        for each reporter.

        Arguments:
            metric_name (MetricName): The name of the metric

        Returns:
            KafkaMetric: the removed `KafkaMetric` or None if no such
                metric exists
        """
        with self._lock:
            metric = self._metrics.pop(metric_name, None)
            if metric:
                for reporter in self._reporters:
                    reporter.metric_removal(metric)
            return metric

    def add_reporter(self, reporter):
        """
        Add a MetricReporter.

        The reporter is first initialised with a list of all currently
        registered metrics and then appended to the reporter list, both
        under the registry lock.
        """
        with self._lock:
            reporter.init(list(self.metrics.values()))
            self._reporters.append(reporter)

    def register_metric(self, metric):
        """
        Register a KafkaMetric under its metric_name and notify every
        reporter through `metric_change`.

        Arguments:
            metric (KafkaMetric): The metric to register

        Raises:
            ValueError: if a metric with the same name already exists
        """
        with self._lock:
            if metric.metric_name in self.metrics:
                raise ValueError('A metric named "%s" already exists, cannot'
                                 ' register another one.' % metric.metric_name)
            self.metrics[metric.metric_name] = metric
            for reporter in self._reporters:
                reporter.metric_change(metric)

    class ExpireSensorTask(object):
        """
        This iterates over every Sensor and triggers a remove_sensor
        if it has expired. Package private for testing
        """
        @staticmethod
        def run(metrics):
            # Iterate over a snapshot: remove_sensor mutates the dict.
            items = list(metrics._sensors.items())
            for name, sensor in items:
                # remove_sensor also locks the sensor object. This is fine
                # as long as the sensor lock is reentrant.
                # NOTE(review): confirm Sensor._lock is an RLock.
                # There is however a minor race condition here. Assume we
                # have a parent sensor P and child sensor C. Calling record
                # on C would cause a record on P as well. So expiration time
                # for P == expiration time for C. If the record on P happens
                # via C just after P is removed, that will cause C to also
                # get removed. Since the expiration time is typically high
                # it is not expected to be a significant concern and thus
                # not necessary to optimize
                with sensor._lock:
                    if sensor.has_expired():
                        logger.debug('Removing expired sensor %s', name)
                        metrics.remove_sensor(name)

    def close(self):
        """Close this metrics repository by closing every reporter."""
        for reporter in self._reporters:
            reporter.close()
class Quota(object):
    """An upper or lower bound for metrics"""
    def __init__(self, bound, is_upper):
        """
        Arguments:
            bound (int or float): The limiting value.
            is_upper (bool): True if ``bound`` is a maximum, False if it
                is a minimum.
        """
        self._bound = bound
        self._upper = is_upper

    @staticmethod
    def upper_bound(upper_bound):
        """Create a Quota that accepts values <= upper_bound."""
        return Quota(upper_bound, True)

    @staticmethod
    def lower_bound(lower_bound):
        """Create a Quota that accepts values >= lower_bound."""
        return Quota(lower_bound, False)

    def is_upper_bound(self):
        """Return True if this quota is an upper bound."""
        return self._upper

    @property
    def bound(self):
        return self._bound

    def is_acceptable(self, value):
        """Return True if ``value`` is within the bound (inclusive)."""
        return ((self.is_upper_bound() and value <= self.bound) or
                (not self.is_upper_bound() and value >= self.bound))

    def __hash__(self):
        # __hash__ must return an int. Mixing the raw bound into the
        # arithmetic produced a float (and a TypeError from hash()) for
        # float bounds, so hash the components first. Numbers that compare
        # equal (1 and 1.0) hash equally, keeping __hash__ consistent
        # with __eq__.
        prime = 31
        result = prime + hash(self.bound)
        return prime * result + hash(bool(self.is_upper_bound()))

    def __eq__(self, other):
        if self is other:
            return True
        return (type(self) == type(other) and
                self.bound == other.bound and
                self.is_upper_bound() == other.is_upper_bound())

    def __ne__(self, other):
        return not self.__eq__(other)
configuration to use for this metric +            value (float): The value to record +            timeMs (int): The POSIX time in milliseconds this value occurred +        """ +        raise NotImplementedError diff --git a/kafka/metrics/stats/__init__.py b/kafka/metrics/stats/__init__.py new file mode 100644 index 0000000..15eafd9 --- /dev/null +++ b/kafka/metrics/stats/__init__.py @@ -0,0 +1,15 @@ +from .avg import Avg +from .count import Count +from .histogram import Histogram +from .max_stat import Max +from .min_stat import Min +from .percentile import Percentile +from .percentiles import Percentiles +from .rate import Rate +from .sensor import Sensor +from .total import Total + +__all__ = [ +    'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles', +    'Rate', 'Sensor', 'Total' +] diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py new file mode 100644 index 0000000..4d0be0a --- /dev/null +++ b/kafka/metrics/stats/avg.py @@ -0,0 +1,22 @@ +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class Avg(AbstractSampledStat): +    """ +    An AbstractSampledStat that maintains a simple average over its samples. 
class Histogram(object):
    """
    A fixed-size histogram of recorded values.

    The mapping between values and bin indexes is delegated to a bin
    scheme object exposing ``bins``, ``to_bin(value)`` and
    ``from_bin(index)`` (see ConstantBinScheme and LinearBinScheme).
    """
    def __init__(self, bin_scheme):
        self._hist = [0.0] * bin_scheme.bins
        self._count = 0.0
        self._bin_scheme = bin_scheme

    def record(self, value):
        """Count one occurrence of ``value`` in its bin."""
        self._hist[self._bin_scheme.to_bin(value)] += 1.0
        self._count += 1.0

    def value(self, quantile):
        """
        Return the lower edge of the first bin at which the cumulative
        fraction of recorded values exceeds ``quantile``.

        Returns NaN if nothing has been recorded and +inf if the quantile
        is not reached before the last (overflow) bin.
        """
        if self._count == 0.0:
            return float('NaN')
        _sum = 0.0
        quant = float(quantile)
        # The last bin is the overflow bin; reaching it means +inf.
        for i, value in enumerate(self._hist[:-1]):
            _sum += value
            if _sum / self._count > quant:
                return self._bin_scheme.from_bin(i)
        return float('inf')

    @property
    def counts(self):
        return self._hist

    def clear(self):
        """Reset every bin and the total count to zero."""
        # Slice-assign so the list object handed out by ``counts`` stays
        # the same one. (The previous ``range(self._hist)`` raised
        # TypeError because a list was passed to range().)
        self._hist[:] = [0.0] * len(self._hist)
        self._count = 0.0

    def __str__(self):
        values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for
                  i, value in enumerate(self._hist[:-1])]
        values.append('%s:%s' % (float('inf'), self._hist[-1]))
        return '{%s}' % ','.join(values)

    class ConstantBinScheme(object):
        """
        Equal-width bins over [min_val, max_val], plus an underflow bin
        (index 0) and an overflow bin (index bins - 1).
        """
        def __init__(self, bins, min_val, max_val):
            if bins < 2:
                raise ValueError('Must have at least 2 bins.')
            self._min = float(min_val)
            self._max = float(max_val)
            self._bins = int(bins)
            interior_bins = self._bins - 2
            if interior_bins:
                # Divide the float range so the width is not truncated by
                # integer division on Python 2.
                self._bucket_width = (self._max - self._min) / interior_bins
            else:
                # Exactly 2 bins means only underflow/overflow exist; an
                # infinite width avoids ZeroDivisionError and sends every
                # in-range value to the last bin.
                self._bucket_width = float('inf')

        @property
        def bins(self):
            return self._bins

        def from_bin(self, b):
            """Return the lower edge of bin ``b`` (-inf / +inf at the ends)."""
            if b == 0:
                return float('-inf')
            elif b == self._bins - 1:
                return float('inf')
            else:
                return self._min + (b - 1) * self._bucket_width

        def to_bin(self, x):
            """Return the index of the bin that ``x`` falls into."""
            if x < self._min:
                return 0
            elif x > self._max:
                return self._bins - 1
            else:
                return int(((x - self._min) / self._bucket_width) + 1)

    class LinearBinScheme(object):
        """
        Bins over [0, max_val] whose lower edges are b * (b + 1) / 2
        times a fixed scale, so bin width grows linearly with the index;
        the last bin is the overflow bin.
        """
        def __init__(self, num_bins, max_val):
            self._bins = num_bins
            self._max = max_val
            # 2.0 forces true division on Python 2 as well.
            self._scale = max_val / (num_bins * (num_bins - 1) / 2.0)

        @property
        def bins(self):
            return self._bins

        def from_bin(self, b):
            """Return the lower edge of bin ``b`` (+inf for the last bin)."""
            if b == self._bins - 1:
                return float('inf')
            else:
                unscaled = (b * (b + 1.0)) / 2.0
                return unscaled * self._scale

        def to_bin(self, x):
            """
            Return the index of the bin that ``x`` falls into.

            Raises:
                ValueError: if x is negative.
            """
            if x < 0.0:
                raise ValueError('Values less than 0.0 not accepted.')
            elif x > self._max:
                return self._bins - 1
            else:
                scaled = x / self._scale
                # Inverse of from_bin: solve b * (b + 1) / 2 == scaled.
                return int(-0.5 + math.sqrt(2.0 * scaled + 0.25))
self._metric_name = metric_name +        self._percentile = float(percentile) + +    @property +    def name(self): +        return self._metric_name + +    @property +    def percentile(self): +        return self._percentile diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py new file mode 100644 index 0000000..84e7160 --- /dev/null +++ b/kafka/metrics/stats/percentiles.py @@ -0,0 +1,72 @@ +from kafka.metrics import AnonMeasurable, NamedMeasurable +from kafka.metrics.compound_stat import AbstractCompoundStat +from kafka.metrics.stats import Histogram +from kafka.metrics.stats.sampled_stat import AbstractSampledStat + + +class BucketSizing(object): +    CONSTANT = 0 +    LINEAR = 1 + + +class Percentiles(AbstractSampledStat, AbstractCompoundStat): +    """A compound stat that reports one or more percentiles""" +    def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0, +                 percentiles=None): +        super(Percentiles, self).__init__(0.0) +        self._percentiles = percentiles or [] +        self._buckets = int(size_in_bytes / 4) +        if bucketing == BucketSizing.CONSTANT: +            self._bin_scheme = Histogram.ConstantBinScheme(self._buckets, +                                                           min_val, max_val) +        elif bucketing == BucketSizing.LINEAR: +            if min_val != 0.0: +                raise ValueError('Linear bucket sizing requires min_val' +                                 ' to be 0.0.') +            self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) +        else: +            ValueError('Unknown bucket type: %s' % bucketing) + +    def stats(self): +        measurables = [] + +        def make_measure_fn(pct): +            return lambda config, now: self.value(config, now, +                                                  pct / 100.0) + +        for percentile in self._percentiles: +            measure_fn = make_measure_fn(percentile.percentile) 
class TimeUnit(object):
    """
    Integer constants identifying a unit of time, ordered from the
    smallest unit (NANOSECONDS = 0) to the largest (DAYS = 6).
    """
    # Singular unit name -> constant value.
    _names = {
        'nanosecond': 0,
        'microsecond': 1,
        'millisecond': 2,
        'second': 3,
        'minute': 4,
        'hour':  5,
        'day': 6,
    }

    # Constant value -> singular unit name; the lookup get_name() needs.
    _unit_names = dict((value, name) for name, value in _names.items())

    NANOSECONDS = _names['nanosecond']
    MICROSECONDS = _names['microsecond']
    MILLISECONDS = _names['millisecond']
    SECONDS = _names['second']
    MINUTES = _names['minute']
    HOURS = _names['hour']
    DAYS = _names['day']

    @staticmethod
    def get_name(time_unit):
        """
        Return the singular name of a TimeUnit constant,
        e.g. ``TimeUnit.get_name(TimeUnit.SECONDS) == 'second'``.

        Raises:
            KeyError: if time_unit is not one of the TimeUnit constants.
        """
        # The previous implementation indexed the name -> value dict with
        # a value, so every valid constant raised KeyError.
        return TimeUnit._unit_names[time_unit]
class SampledTotal(AbstractSampledStat):
    """
    A sampled stat that adds each recorded value to the current sample
    and reports the sum of all sample values.
    """
    def __init__(self, initial_value=None):
        """
        Arguments:
            initial_value: Must be left as None; samples always start
                at 0.0.

        Raises:
            ValueError: if initial_value is given.
        """
        if initial_value is None:
            super(SampledTotal, self).__init__(0.0)
        else:
            raise ValueError('initial_value cannot be set on SampledTotal')

    def update(self, sample, config, value, time_ms):
        """Add ``value`` to the running total held by ``sample``."""
        sample.value += value

    def combine(self, samples, config, now):
        """Return the sum of every sample's value as a float."""
        running_total = 0
        for each_sample in samples:
            running_total += each_sample.value
        return float(running_total)
+    """ +    __metaclass__ = abc.ABCMeta + +    def __init__(self, initial_value): +        self._initial_value = initial_value +        self._samples = [] +        self._current = 0 + +    @abc.abstractmethod +    def update(self, sample, config, value, time_ms): +        raise NotImplementedError + +    @abc.abstractmethod +    def combine(self, samples, config, now): +        raise NotImplementedError + +    def record(self, config, value, time_ms): +        sample = self.current(time_ms) +        if sample.is_complete(time_ms, config): +            sample = self._advance(config, time_ms) +        self.update(sample, config, float(value), time_ms) +        sample.event_count += 1 + +    def new_sample(self, time_ms): +        return self.Sample(self._initial_value, time_ms) + +    def measure(self, config, now): +        self.purge_obsolete_samples(config, now) +        return float(self.combine(self._samples, config, now)) + +    def current(self, time_ms): +        if not self._samples: +            self._samples.append(self.new_sample(time_ms)) +        return self._samples[self._current] + +    def oldest(self, now): +        if not self._samples: +            self._samples.append(self.new_sample(now)) +        oldest = self._samples[0] +        for sample in self._samples[1:]: +            if sample.last_window_ms < oldest.last_window_ms: +                oldest = sample +        return oldest + +    def purge_obsolete_samples(self, config, now): +        """ +        Timeout any windows that have expired in the absence of any events +        """ +        expire_age = config.samples * config.time_window_ms +        for sample in self._samples: +            if now - sample.last_window_ms >= expire_age: +                sample.reset(now) + +    def _advance(self, config, time_ms): +        self._current = (self._current + 1) % config.samples +        if self._current >= len(self._samples): +            sample = self.new_sample(time_ms) +            
self._samples.append(sample) +            return sample +        else: +            sample = self.current(time_ms) +            sample.reset(time_ms) +            return sample + +    class Sample(object): + +        def __init__(self, initial_value, now): +            self.initial_value = initial_value +            self.event_count = 0 +            self.last_window_ms = now +            self.value = initial_value + +        def reset(self, now): +            self.event_count = 0 +            self.last_window_ms = now +            self.value = self.initial_value + +        def is_complete(self, time_ms, config): +            return (time_ms - self.last_window_ms >= config.time_window_ms or +                    self.event_count >= config.event_window) diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py new file mode 100644 index 0000000..6878096 --- /dev/null +++ b/kafka/metrics/stats/sensor.py @@ -0,0 +1,132 @@ +import threading +import time + +from kafka.errors import QuotaViolationError +from kafka.metrics import KafkaMetric + + +class Sensor(object): +    """ +    A sensor applies a continuous sequence of numerical values +    to a set of associated metrics. For example a sensor on +    message size would record a sequence of message sizes using +    the `record(double)` api and would maintain a set +    of metrics about request sizes such as the average or max. 
+    """ +    def __init__(self, registry, name, parents, config, +                 inactive_sensor_expiration_time_seconds): +        if not name: +            raise ValueError('name must be non-empty') +        self._lock = threading.RLock() +        self._registry = registry +        self._name = name +        self._parents = parents or [] +        self._metrics = [] +        self._stats = [] +        self._config = config +        self._inactive_sensor_expiration_time_ms = ( +            inactive_sensor_expiration_time_seconds * 1000) +        self._last_record_time = time.time() * 1000 +        self._check_forest(set()) + +    def _check_forest(self, sensors): +        """Validate that this sensor doesn't end up referencing itself.""" +        if self in sensors: +            raise ValueError('Circular dependency in sensors: %s is its own' +                             'parent.' % self.name) +        sensors.add(self) +        for parent in self._parents: +            parent._check_forest(sensors) + +    @property +    def name(self): +        """ +        The name this sensor is registered with. +        This name will be unique among all registered sensors. +        """ +        return self._name + +    @property +    def metrics(self): +        return tuple(self._metrics) + +    def record(self, value=1.0, time_ms=None): +        """ +        Record a value at a known time. 
+        Arguments: +            value (double): The value we are recording +            time_ms (int): The current POSIX time in milliseconds + +        Raises: +            QuotaViolationException: if recording this value moves a +                metric beyond its configured maximum or minimum bound +        """ +        now = time.time() * 1000 +        if time_ms is None: +            time_ms = now +        self._last_record_time = now +        with self._lock:  # XXX high volume, might be performance issue +            # increment all the stats +            for stat in self._stats: +                stat.record(self._config, value, time_ms) +            self._check_quotas(time_ms) +        for parent in self._parents: +            parent.record(value, time_ms) + +    def _check_quotas(self, time_ms): +        """ +        Check if we have violated our quota for any metric that +        has a configured quota +        """ +        for metric in self._metrics: +            if metric.config and metric.config.quota: +                value = metric.value(time_ms) +                if not metric.config.quota.is_acceptable(value): +                    raise QuotaViolationError('(%s) violated quota. Actual: ' +                                              '(%d), Threshold: (%d)' % +                                              (metric.metric_name, +                                               metric.config.quota.bound, +                                               value)) + +    def add_compound(self, compound_stat, config=None): +        """ +        Register a compound statistic with this sensor which +        yields multiple measurable quantities (like a histogram) + +        Arguments: +            stat (AbstractCompoundStat): The stat to register +            config (MetricConfig): The configuration for this stat. +                If None then the stat will use the default configuration +                for this sensor. 
+        """ +        if not compound_stat: +            raise ValueError('compound stat must be non-empty') +        self._stats.append(compound_stat) +        for named_measurable in compound_stat.stats(): +            metric = KafkaMetric(named_measurable.name, named_measurable.stat, +                                 config or self._config) +            self._registry.register_metric(metric) +            self._metrics.append(metric) + +    def add(self, metric_name, stat, config=None): +        """ +        Register a metric with this sensor + +        Arguments: +            metric_name (MetricName): The name of the metric +            stat (AbstractMeasurableStat): The statistic to keep +            config (MetricConfig): A special configuration for this metric. +                If None use the sensor default configuration. +        """ +        with self._lock: +            metric = KafkaMetric(metric_name, stat, config or self._config) +            self._registry.register_metric(metric) +            self._metrics.append(metric) +            self._stats.append(stat) + +    def has_expired(self): +        """ +        Return True if the Sensor is eligible for removal due to inactivity. 
+        """ +        return ((time.time() * 1000 - self._last_record_time) > +                self._inactive_sensor_expiration_time_ms) diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py new file mode 100644 index 0000000..76a82d8 --- /dev/null +++ b/kafka/metrics/stats/total.py @@ -0,0 +1,13 @@ +from kafka.metrics.measurable_stat import AbstractMeasurableStat + + +class Total(AbstractMeasurableStat): +    """An un-windowed cumulative total maintained over all time.""" +    def __init__(self, value=0.0): +        self._total = value + +    def record(self, config, value, now): +        self._total += value + +    def measure(self, config, now): +        return float(self._total) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 399609d..4b90f30 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -1,5 +1,6 @@  # pylint: skip-file  from __future__ import absolute_import +import time  import pytest @@ -14,6 +15,7 @@ from kafka.coordinator.protocol import (      ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)  import kafka.errors as Errors  from kafka.future import Future +from kafka.metrics import Metrics  from kafka.protocol.commit import (      OffsetCommitRequest, OffsetCommitResponse,      OffsetFetchRequest, OffsetFetchResponse) @@ -23,12 +25,14 @@ from kafka.util import WeakMethod  @pytest.fixture  def coordinator(conn): -    return ConsumerCoordinator(KafkaClient(), SubscriptionState()) +    return ConsumerCoordinator(KafkaClient(), SubscriptionState(), Metrics(), +                               'consumer')  def test_init(conn):      cli = KafkaClient() -    coordinator = ConsumerCoordinator(cli, SubscriptionState()) +    coordinator = ConsumerCoordinator(cli, SubscriptionState(), Metrics(), +                                      'consumer')      # metadata update on init       assert cli.cluster._need_update is True @@ -38,6 +42,7 @@ def test_init(conn):  
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])  def test_autocommit_enable_api_version(conn, api_version):      coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), +                                      Metrics(), 'consumer',                                        enable_auto_commit=True,                                        group_id='foobar',                                        api_version=api_version) @@ -354,6 +359,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,      mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')      mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')      coordinator = ConsumerCoordinator(KafkaClient(), SubscriptionState(), +                                      Metrics(), 'consumer',                                        api_version=api_version,                                        enable_auto_commit=enable,                                        group_id=group_id) @@ -441,7 +447,7 @@ def test_send_offset_commit_request_failure(patched_coord, offsets):      assert future.exception is error -def test_send_offset_commit_request_success(patched_coord, offsets): +def test_send_offset_commit_request_success(mocker, patched_coord, offsets):      _f = Future()      patched_coord._client.send.return_value = _f      future = patched_coord._send_offset_commit_request(offsets) @@ -449,7 +455,7 @@ def test_send_offset_commit_request_success(patched_coord, offsets):      response = OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])])      _f.success(response)      patched_coord._handle_offset_commit_response.assert_called_with( -        offsets, future, response)  +        offsets, future, mocker.ANY, response)  @pytest.mark.parametrize('response,error,dead,reassign', [ @@ -478,10 +484,11 @@ def test_send_offset_commit_request_success(patched_coord, offsets):      (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 
29)])]),       Errors.TopicAuthorizationFailedError, False, False),  ]) -def test_handle_offset_commit_response(patched_coord, offsets, +def test_handle_offset_commit_response(mocker, patched_coord, offsets,                                         response, error, dead, reassign):      future = Future() -    patched_coord._handle_offset_commit_response(offsets, future, response) +    patched_coord._handle_offset_commit_response(offsets, future, time.time(), +                                                 response)      assert isinstance(future.exception, error)      assert patched_coord.coordinator_id is (None if dead else 0)      assert patched_coord._subscription.needs_partition_assignment is reassign diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 644adfa..bf4a3a9 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -8,6 +8,7 @@ from kafka.consumer.fetcher import Fetcher  from kafka.consumer.subscription_state import SubscriptionState  import kafka.errors as Errors  from kafka.future import Future +from kafka.metrics import Metrics  from kafka.protocol.fetch import FetchRequest  from kafka.structs import TopicPartition, OffsetAndMetadata @@ -29,7 +30,7 @@ def fetcher(client, subscription_state):      subscription_state.assign_from_subscribed(assignment)      for tp in assignment:          subscription_state.seek(tp, 0) -    return Fetcher(client, subscription_state) +    return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')  def test_init_fetches(fetcher, mocker): diff --git a/test/test_metrics.py b/test/test_metrics.py new file mode 100644 index 0000000..e4757d6 --- /dev/null +++ b/test/test_metrics.py @@ -0,0 +1,499 @@ +import sys +import time + +import pytest + +from kafka.errors import QuotaViolationError +from kafka.metrics import DictReporter, MetricConfig, MetricName, Metrics, Quota +from kafka.metrics.measurable import AbstractMeasurable +from kafka.metrics.stats import (Avg, Count, Max, Min, Percentile, 
Percentiles, +                                 Rate, Total) +from kafka.metrics.stats.percentiles import BucketSizing +from kafka.metrics.stats.rate import TimeUnit + +EPS = 0.000001 + + +@pytest.fixture +def time_keeper(): +    return TimeKeeper() + + +@pytest.fixture +def config(): +    return MetricConfig() + + +@pytest.fixture +def reporter(): +    return DictReporter() + + +@pytest.fixture +def metrics(request, config, reporter): +    metrics = Metrics(config, [reporter], enable_expiration=True) +    request.addfinalizer(lambda: metrics.close()) +    return metrics + + +def test_MetricName(): +    # The Java test only cover the differences between the deprecated +    # constructors, so I'm skipping them but doing some other basic testing. + +    # In short, metrics should be equal IFF their name, group, and tags are +    # the same. Descriptions do not matter. +    name1 = MetricName('name', 'group', 'A metric.', {'a': 1, 'b': 2}) +    name2 = MetricName('name', 'group', 'A description.', {'a': 1, 'b': 2}) +    assert name1 == name2 + +    name1 = MetricName('name', 'group', tags={'a': 1, 'b': 2}) +    name2 = MetricName('name', 'group', tags={'a': 1, 'b': 2}) +    assert name1 == name2 + +    name1 = MetricName('foo', 'group') +    name2 = MetricName('name', 'group') +    assert name1 != name2 + +    name1 = MetricName('name', 'foo') +    name2 = MetricName('name', 'group') +    assert name1 != name2 + +    # name and group must be non-empty. Everything else is optional. +    with pytest.raises(Exception): +        MetricName('', 'group') +    with pytest.raises(Exception): +        MetricName('name', None) +    # tags must be a dict if supplied +    with pytest.raises(Exception): +        MetricName('name', 'group', tags=set()) + +    # Because of the implementation of __eq__ and __hash__, the values of +    # a MetricName cannot be mutable. 
+    tags = {'a': 1} +    name = MetricName('name', 'group', 'description', tags=tags) +    with pytest.raises(AttributeError): +        name.name = 'new name' +    with pytest.raises(AttributeError): +        name.group = 'new name' +    with pytest.raises(AttributeError): +        name.tags = {} +    # tags is a copy, so the instance isn't altered +    name.tags['b'] = 2 +    assert name.tags == tags + + +def test_simple_stats(mocker, time_keeper, config, metrics): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    measurable = ConstantMeasurable() + +    metrics.add_metric(metrics.metric_name('direct.measurable', 'grp1', +                                            'The fraction of time an appender waits for space allocation.'), +                        measurable) +    sensor = metrics.sensor('test.sensor') +    sensor.add(metrics.metric_name('test.avg', 'grp1'), Avg()) +    sensor.add(metrics.metric_name('test.max', 'grp1'), Max()) +    sensor.add(metrics.metric_name('test.min', 'grp1'), Min()) +    sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS)) +    sensor.add(metrics.metric_name('test.occurences', 'grp1'),Rate(TimeUnit.SECONDS, Count())) +    sensor.add(metrics.metric_name('test.count', 'grp1'), Count()) +    percentiles = [Percentile(metrics.metric_name('test.median', 'grp1'), 50.0), +                Percentile(metrics.metric_name('test.perc99_9', 'grp1'), 99.9)] +    sensor.add_compound(Percentiles(100, BucketSizing.CONSTANT, 100, -100, +                        percentiles=percentiles)) + +    sensor2 = metrics.sensor('test.sensor2') +    sensor2.add(metrics.metric_name('s2.total', 'grp1'), Total()) +    sensor2.record(5.0) + +    sum_val = 0 +    count = 10 +    for i in range(count): +        sensor.record(i) +        sum_val += i + +    # prior to any time passing +    elapsed_secs = (config.time_window_ms * (config.samples - 1)) / 1000.0 +    assert abs(count / elapsed_secs - +            
metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \ +            < EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs) + +    # pretend 2 seconds passed... +    sleep_time_seconds = 2.0 +    time_keeper.sleep(sleep_time_seconds) +    elapsed_secs += sleep_time_seconds + +    assert abs(5.0 - metrics.metrics.get(metrics.metric_name('s2.total', 'grp1')).value()) \ +            < EPS, 's2 reflects the constant value' +    assert abs(4.5 - metrics.metrics.get(metrics.metric_name('test.avg', 'grp1')).value()) \ +            < EPS, 'Avg(0...9) = 4.5' +    assert abs((count - 1) - metrics.metrics.get(metrics.metric_name('test.max', 'grp1')).value()) \ +            < EPS, 'Max(0...9) = 9' +    assert abs(0.0 - metrics.metrics.get(metrics.metric_name('test.min', 'grp1')).value()) \ +            < EPS, 'Min(0...9) = 0' +    assert abs((sum_val / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')).value()) \ +            < EPS, 'Rate(0...9) = 1.40625' +    assert abs((count / elapsed_secs) - metrics.metrics.get(metrics.metric_name('test.occurences', 'grp1')).value()) \ +            < EPS, 'Occurrences(0...%d) = %f' % (count, count / elapsed_secs) +    assert abs(count - metrics.metrics.get(metrics.metric_name('test.count', 'grp1')).value()) \ +            < EPS, 'Count(0...9) = 10' + + +def test_hierarchical_sensors(metrics): +    parent1 = metrics.sensor('test.parent1') +    parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count()) +    parent2 = metrics.sensor('test.parent2') +    parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count()) +    child1 = metrics.sensor('test.child1', parents=[parent1, parent2]) +    child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count()) +    child2 = metrics.sensor('test.child2', parents=[parent1]) +    child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count()) +    grandchild = metrics.sensor('test.grandchild', 
parents=[child1]) +    grandchild.add(metrics.metric_name('test.grandchild.count', 'grp1'), Count()) + +    # increment each sensor one time +    parent1.record() +    parent2.record() +    child1.record() +    child2.record() +    grandchild.record() + +    p1 = parent1.metrics[0].value() +    p2 = parent2.metrics[0].value() +    c1 = child1.metrics[0].value() +    c2 = child2.metrics[0].value() +    gc = grandchild.metrics[0].value() + +    # each metric should have a count equal to one + its children's count +    assert 1.0 == gc +    assert 1.0 + gc == c1 +    assert 1.0 == c2 +    assert 1.0 + c1 == p2 +    assert 1.0 + c1 + c2 == p1 +    assert [child1, child2] == metrics._children_sensors.get(parent1) +    assert [child1] == metrics._children_sensors.get(parent2) +    assert metrics._children_sensors.get(grandchild) is None + + +def test_bad_sensor_hierarchy(metrics): +    parent = metrics.sensor('parent') +    child1 = metrics.sensor('child1', parents=[parent]) +    child2 = metrics.sensor('child2', parents=[parent]) + +    with pytest.raises(ValueError): +        metrics.sensor('gc', parents=[child1, child2]) + + +def test_remove_sensor(metrics): +    size = len(metrics.metrics) +    parent1 = metrics.sensor('test.parent1') +    parent1.add(metrics.metric_name('test.parent1.count', 'grp1'), Count()) +    parent2 = metrics.sensor('test.parent2') +    parent2.add(metrics.metric_name('test.parent2.count', 'grp1'), Count()) +    child1 = metrics.sensor('test.child1', parents=[parent1, parent2]) +    child1.add(metrics.metric_name('test.child1.count', 'grp1'), Count()) +    child2 = metrics.sensor('test.child2', parents=[parent2]) +    child2.add(metrics.metric_name('test.child2.count', 'grp1'), Count()) +    grandchild1 = metrics.sensor('test.gchild2', parents=[child2]) +    grandchild1.add(metrics.metric_name('test.gchild2.count', 'grp1'), Count()) + +    sensor = metrics.get_sensor('test.parent1') +    assert sensor is not None +    
metrics.remove_sensor('test.parent1') +    assert metrics.get_sensor('test.parent1') is None +    assert metrics.metrics.get(metrics.metric_name('test.parent1.count', 'grp1')) is None +    assert metrics.get_sensor('test.child1') is None +    assert metrics._children_sensors.get(sensor) is None +    assert metrics.metrics.get(metrics.metric_name('test.child1.count', 'grp1')) is None + +    sensor = metrics.get_sensor('test.gchild2') +    assert sensor is not None +    metrics.remove_sensor('test.gchild2') +    assert metrics.get_sensor('test.gchild2') is None +    assert metrics._children_sensors.get(sensor) is None +    assert metrics.metrics.get(metrics.metric_name('test.gchild2.count', 'grp1')) is None + +    sensor = metrics.get_sensor('test.child2') +    assert sensor is not None +    metrics.remove_sensor('test.child2') +    assert metrics.get_sensor('test.child2') is None +    assert metrics._children_sensors.get(sensor) is None +    assert metrics.metrics.get(metrics.metric_name('test.child2.count', 'grp1')) is None + +    sensor = metrics.get_sensor('test.parent2') +    assert sensor is not None +    metrics.remove_sensor('test.parent2') +    assert metrics.get_sensor('test.parent2') is None +    assert metrics._children_sensors.get(sensor) is None +    assert metrics.metrics.get(metrics.metric_name('test.parent2.count', 'grp1')) is None + +    assert size == len(metrics.metrics) + + +def test_remove_inactive_metrics(mocker, time_keeper, metrics): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    s1 = metrics.sensor('test.s1', None, 1) +    s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) + +    s2 = metrics.sensor('test.s2', None, 3) +    s2.add(metrics.metric_name('test.s2.count', 'grp1'), Count()) + +    purger = Metrics.ExpireSensorTask +    purger.run(metrics) +    assert metrics.get_sensor('test.s1') is not None, \ +            'Sensor test.s1 must be present' +    assert 
metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \ +            'MetricName test.s1.count must be present' +    assert metrics.get_sensor('test.s2') is not None, \ +            'Sensor test.s2 must be present' +    assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ +            'MetricName test.s2.count must be present' + +    time_keeper.sleep(1.001) +    purger.run(metrics) +    assert metrics.get_sensor('test.s1') is None, \ +            'Sensor test.s1 should have been purged' +    assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \ +            'MetricName test.s1.count should have been purged' +    assert metrics.get_sensor('test.s2') is not None, \ +            'Sensor test.s2 must be present' +    assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ +            'MetricName test.s2.count must be present' + +    # record a value in sensor s2. This should reset the clock for that sensor. 
+    # It should not get purged at the 3 second mark after creation +    s2.record() + +    time_keeper.sleep(2) +    purger.run(metrics) +    assert metrics.get_sensor('test.s2') is not None, \ +            'Sensor test.s2 must be present' +    assert metrics.metrics.get(metrics.metric_name('test.s2.count', 'grp1')) is not None, \ +            'MetricName test.s2.count must be present' + +    # After another 1 second sleep, the metric should be purged +    time_keeper.sleep(1) +    purger.run(metrics) +    assert metrics.get_sensor('test.s1') is None, \ +            'Sensor test.s2 should have been purged' +    assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is None, \ +            'MetricName test.s2.count should have been purged' + +    # After purging, it should be possible to recreate a metric +    s1 = metrics.sensor('test.s1', None, 1) +    s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) +    assert metrics.get_sensor('test.s1') is not None, \ +        'Sensor test.s1 must be present' +    assert metrics.metrics.get(metrics.metric_name('test.s1.count', 'grp1')) is not None, \ +            'MetricName test.s1.count must be present' + + +def test_remove_metric(metrics): +    size = len(metrics.metrics) +    metrics.add_metric(metrics.metric_name('test1', 'grp1'), Count()) +    metrics.add_metric(metrics.metric_name('test2', 'grp1'), Count()) + +    assert metrics.remove_metric(metrics.metric_name('test1', 'grp1')) is not None +    assert metrics.metrics.get(metrics.metric_name('test1', 'grp1')) is None +    assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is not None + +    assert metrics.remove_metric(metrics.metric_name('test2', 'grp1')) is not None +    assert metrics.metrics.get(metrics.metric_name('test2', 'grp1')) is None + +    assert size == len(metrics.metrics) + + +def test_event_windowing(mocker, time_keeper): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    count = Count() +  
  config = MetricConfig(event_window=1, samples=2) +    count.record(config, 1.0, time_keeper.ms()) +    count.record(config, 1.0, time_keeper.ms()) +    assert 2.0 == count.measure(config, time_keeper.ms()) +    count.record(config, 1.0, time_keeper.ms())  # first event times out +    assert 2.0 == count.measure(config, time_keeper.ms()) + + +def test_time_windowing(mocker, time_keeper): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    count = Count() +    config = MetricConfig(time_window_ms=1, samples=2) +    count.record(config, 1.0, time_keeper.ms()) +    time_keeper.sleep(.001) +    count.record(config, 1.0, time_keeper.ms()) +    assert 2.0 == count.measure(config, time_keeper.ms()) +    time_keeper.sleep(.001) +    count.record(config, 1.0, time_keeper.ms())  # oldest event times out +    assert 2.0 == count.measure(config, time_keeper.ms()) + + +def test_old_data_has_no_effect(mocker, time_keeper): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    max_stat = Max() +    min_stat = Min() +    avg_stat = Avg() +    count_stat = Count() +    window_ms = 100 +    samples = 2 +    config = MetricConfig(time_window_ms=window_ms, samples=samples) +    max_stat.record(config, 50, time_keeper.ms()) +    min_stat.record(config, 50, time_keeper.ms()) +    avg_stat.record(config, 50, time_keeper.ms()) +    count_stat.record(config, 50, time_keeper.ms()) + +    time_keeper.sleep(samples * window_ms / 1000.0) +    assert float('-inf') == max_stat.measure(config, time_keeper.ms()) +    assert float(sys.maxsize) == min_stat.measure(config, time_keeper.ms()) +    assert 0.0 == avg_stat.measure(config, time_keeper.ms()) +    assert 0 == count_stat.measure(config, time_keeper.ms()) + + +def test_duplicate_MetricName(metrics): +    metrics.sensor('test').add(metrics.metric_name('test', 'grp1'), Avg()) +    with pytest.raises(ValueError): +        metrics.sensor('test2').add(metrics.metric_name('test', 'grp1'), Total()) + + +def 
test_Quotas(metrics): +    sensor = metrics.sensor('test') +    sensor.add(metrics.metric_name('test1.total', 'grp1'), Total(), +               MetricConfig(quota=Quota.upper_bound(5.0))) +    sensor.add(metrics.metric_name('test2.total', 'grp1'), Total(), +               MetricConfig(quota=Quota.lower_bound(0.0))) +    sensor.record(5.0) +    with pytest.raises(QuotaViolationError): +        sensor.record(1.0) + +    assert abs(6.0 - metrics.metrics.get(metrics.metric_name('test1.total', 'grp1')).value()) \ +            < EPS + +    sensor.record(-6.0) +    with pytest.raises(QuotaViolationError): +        sensor.record(-1.0) + + +def test_Quotas_equality(): +    quota1 = Quota.upper_bound(10.5) +    quota2 = Quota.lower_bound(10.5) +    assert quota1 != quota2, 'Quota with different upper values should not be equal' + +    quota3 = Quota.lower_bound(10.5) +    assert quota2 == quota3, 'Quota with same upper and bound values should be equal' + + +def test_Percentiles(metrics): +    buckets = 100 +    _percentiles = [ +        Percentile(metrics.metric_name('test.p25', 'grp1'), 25), +        Percentile(metrics.metric_name('test.p50', 'grp1'), 50), +        Percentile(metrics.metric_name('test.p75', 'grp1'), 75), +    ] +    percs = Percentiles(4 * buckets, BucketSizing.CONSTANT, 100.0, 0.0, +                        percentiles=_percentiles) +    config = MetricConfig(event_window=50, samples=2) +    sensor = metrics.sensor('test', config) +    sensor.add_compound(percs) +    p25 = metrics.metrics.get(metrics.metric_name('test.p25', 'grp1')) +    p50 = metrics.metrics.get(metrics.metric_name('test.p50', 'grp1')) +    p75 = metrics.metrics.get(metrics.metric_name('test.p75', 'grp1')) + +    # record two windows worth of sequential values +    for i in range(buckets): +        sensor.record(i) + +    assert abs(p25.value() - 25) < 1.0 +    assert abs(p50.value() - 50) < 1.0 +    assert abs(p75.value() - 75) < 1.0 + +    for i in range(buckets): +        
sensor.record(0.0) + +    assert p25.value() < 1.0 +    assert p50.value() < 1.0 +    assert p75.value() < 1.0 + +def test_rate_windowing(mocker, time_keeper, metrics): +    mocker.patch('time.time', side_effect=time_keeper.time) + +    # Use the default time window. Set 3 samples +    config = MetricConfig(samples=3) +    sensor = metrics.sensor('test.sensor', config) +    sensor.add(metrics.metric_name('test.rate', 'grp1'), Rate(TimeUnit.SECONDS)) + +    sum_val = 0 +    count = config.samples - 1 +    # Advance 1 window after every record +    for i in range(count): +        sensor.record(100) +        sum_val += 100 +        time_keeper.sleep(config.time_window_ms / 1000.0) + +    # Sleep for half the window. +    time_keeper.sleep(config.time_window_ms / 2.0 / 1000.0) + +    # prior to any time passing +    elapsed_secs = (config.time_window_ms * (config.samples - 1) + config.time_window_ms / 2.0) / 1000.0 + +    kafka_metric = metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')) +    assert abs((sum_val / elapsed_secs) - kafka_metric.value()) < EPS, \ +            'Rate(0...2) = 2.666' +    assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.time() * 1000) / 1000.0)) \ +            < EPS, 'Elapsed Time = 75 seconds' + + +def test_reporter(metrics): +    reporter = DictReporter() +    foo_reporter = DictReporter(prefix='foo') +    metrics.add_reporter(reporter) +    metrics.add_reporter(foo_reporter) +    sensor = metrics.sensor('kafka.requests') +    sensor.add(metrics.metric_name('pack.bean1.avg', 'grp1'), Avg()) +    sensor.add(metrics.metric_name('pack.bean2.total', 'grp2'), Total()) +    sensor2 = metrics.sensor('kafka.blah') +    sensor2.add(metrics.metric_name('pack.bean1.some', 'grp1'), Total()) +    sensor2.add(metrics.metric_name('pack.bean2.some', 'grp1', +                                    tags={'a': 42, 'b': 'bar'}), Total()) + +    # kafka-metrics-count > count is the total number of metrics and automatic +    
expected = { +        'kafka-metrics-count': {'count': 5.0}, +        'grp2': {'pack.bean2.total': 0.0}, +        'grp1': {'pack.bean1.avg': 0.0, 'pack.bean1.some': 0.0}, +        'grp1.a=42,b=bar': {'pack.bean2.some': 0.0}, +    } +    assert expected == reporter.snapshot() + +    for key in list(expected.keys()): +        metrics = expected.pop(key) +        expected['foo.%s' % key] = metrics +    assert expected == foo_reporter.snapshot() + + +class ConstantMeasurable(AbstractMeasurable): +    _value = 0.0 + +    def measure(self, config, now): +        return self._value + + +class TimeKeeper(object): +    """ +    A clock that you can manually advance by calling sleep +    """ +    def __init__(self, auto_tick_ms=0): +        self._millis = time.time() * 1000 +        self._auto_tick_ms = auto_tick_ms + +    def time(self): +        return self.ms() / 1000.0 + +    def ms(self): +        self.sleep(self._auto_tick_ms) +        return self._millis + +    def sleep(self, seconds): +        self._millis += (seconds * 1000) | 
