summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorZack Dever <zackdever@gmail.com>2016-04-14 11:02:01 -0700
committerZack Dever <zackdever@gmail.com>2016-04-14 11:02:01 -0700
commitcf679ae387519f658f17b2da2d05ff84834cb1f5 (patch)
tree5618627ab6919b6fd4cd476e801c0f9bf449d716 /kafka
parent0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (diff)
parente2b340c4408801515f5e924aec066af983aa5c57 (diff)
downloadkafka-python-cf679ae387519f658f17b2da2d05ff84834cb1f5.tar.gz
Merge pull request #637 from zackdever/metrics
Metrics java port
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/fetcher.py154
-rw-r--r--kafka/consumer/group.py31
-rw-r--r--kafka/coordinator/base.py6
-rw-r--r--kafka/coordinator/consumer.py71
-rw-r--r--kafka/errors.py4
-rw-r--r--kafka/metrics/__init__.py13
-rw-r--r--kafka/metrics/compound_stat.py32
-rw-r--r--kafka/metrics/dict_reporter.py81
-rw-r--r--kafka/metrics/kafka_metric.py34
-rw-r--r--kafka/metrics/measurable.py27
-rw-r--r--kafka/metrics/measurable_stat.py14
-rw-r--r--kafka/metrics/metric_config.py31
-rw-r--r--kafka/metrics/metric_name.py104
-rw-r--r--kafka/metrics/metrics.py257
-rw-r--r--kafka/metrics/metrics_reporter.py55
-rw-r--r--kafka/metrics/quota.py39
-rw-r--r--kafka/metrics/stat.py21
-rw-r--r--kafka/metrics/stats/__init__.py15
-rw-r--r--kafka/metrics/stats/avg.py22
-rw-r--r--kafka/metrics/stats/count.py15
-rw-r--r--kafka/metrics/stats/histogram.py93
-rw-r--r--kafka/metrics/stats/max_stat.py15
-rw-r--r--kafka/metrics/stats/min_stat.py17
-rw-r--r--kafka/metrics/stats/percentile.py12
-rw-r--r--kafka/metrics/stats/percentiles.py72
-rw-r--r--kafka/metrics/stats/rate.py115
-rw-r--r--kafka/metrics/stats/sampled_stat.py99
-rw-r--r--kafka/metrics/stats/sensor.py132
-rw-r--r--kafka/metrics/stats/total.py13
29 files changed, 1480 insertions, 114 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1f0619b..1d4b0f0 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -3,11 +3,13 @@ from __future__ import absolute_import
import collections
import copy
import logging
+import time
import six
import kafka.errors as Errors
from kafka.future import Future
+from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
@@ -40,7 +42,8 @@ class Fetcher(six.Iterator):
'api_version': (0, 8, 0),
}
- def __init__(self, client, subscriptions, **configs):
+ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
+ **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -68,8 +71,6 @@ class Fetcher(six.Iterator):
the messages occurred. This check adds some overhead, so it may
be disabled in cases seeking extreme performance. Default: True
"""
- #metrics=None,
- #metric_group_prefix='consumer',
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -83,8 +84,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
-
- #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+ self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
@@ -109,7 +109,7 @@ class Fetcher(six.Iterator):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request)
- future.add_callback(self._handle_fetch_response, request)
+ future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
self._fetch_futures.extend(futures)
@@ -575,10 +575,11 @@ class Fetcher(six.Iterator):
partition_data.items())
return requests
- def _handle_fetch_response(self, request, response):
+ def _handle_fetch_response(self, request, send_time, response):
"""The callback for fetch completion"""
- #total_bytes = 0
- #total_count = 0
+ total_bytes = 0
+ total_count = 0
+ recv_time = time.time()
fetch_offsets = {}
for topic, partitions in request.topics:
@@ -609,6 +610,7 @@ class Fetcher(six.Iterator):
position)
continue
+ num_bytes = 0
partial = None
if messages and isinstance(messages[-1][-1], PartialMessage):
partial = messages.pop()
@@ -618,18 +620,18 @@ class Fetcher(six.Iterator):
" offset %d to buffered record list", tp,
position)
self._records.append((fetch_offset, tp, messages))
- #last_offset, _, _ = messages[-1]
- #self.sensors.records_fetch_lag.record(highwater - last_offset)
+ last_offset, _, _ = messages[-1]
+ self._sensors.records_fetch_lag.record(highwater - last_offset)
+ num_bytes = sum(msg[1] for msg in messages)
elif partial:
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
self._record_too_large_partitions[tp] = fetch_offset
- # TODO: bytes metrics
- #self.sensors.record_topic_fetch_metrics(tp.topic, num_bytes, parsed.size());
- #totalBytes += num_bytes;
- #totalCount += parsed.size();
+ self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+ total_bytes += num_bytes
+ total_count += len(messages)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
self._client.cluster.request_update()
@@ -649,56 +651,82 @@ class Fetcher(six.Iterator):
else:
raise error_type('Unexpected error while fetching data')
- """TOOD - metrics
- self.sensors.bytesFetched.record(totalBytes)
- self.sensors.recordsFetched.record(totalCount)
- self.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime())
- self.sensors.fetchLatency.record(resp.requestLatencyMs())
+ self._sensors.bytes_fetched.record(total_bytes)
+ self._sensors.records_fetched.record(total_count)
+ self._sensors.fetch_throttle_time_sensor.record(response['throttle_time_ms'])
+ self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
- self.group_name = prefix + "-fetch-manager-metrics"
-
- self.bytes_fetched = metrics.sensor("bytes-fetched")
- self.bytes_fetched.add(metrics.metricName("fetch-size-avg", self.group_name,
- "The average number of bytes fetched per request"), metrics.Avg())
- self.bytes_fetched.add(metrics.metricName("fetch-size-max", self.group_name,
- "The maximum number of bytes fetched per request"), metrics.Max())
- self.bytes_fetched.add(metrics.metricName("bytes-consumed-rate", self.group_name,
- "The average number of bytes consumed per second"), metrics.Rate())
-
- self.records_fetched = self.metrics.sensor("records-fetched")
- self.records_fetched.add(metrics.metricName("records-per-request-avg", self.group_name,
- "The average number of records in each request"), metrics.Avg())
- self.records_fetched.add(metrics.metricName("records-consumed-rate", self.group_name,
- "The average number of records consumed per second"), metrics.Rate())
-
- self.fetch_latency = metrics.sensor("fetch-latency")
- self.fetch_latency.add(metrics.metricName("fetch-latency-avg", self.group_name,
- "The average time taken for a fetch request."), metrics.Avg())
- self.fetch_latency.add(metrics.metricName("fetch-latency-max", self.group_name,
- "The max time taken for any fetch request."), metrics.Max())
- self.fetch_latency.add(metrics.metricName("fetch-rate", self.group_name,
- "The number of fetch requests per second."), metrics.Rate(metrics.Count()))
-
- self.records_fetch_lag = metrics.sensor("records-lag")
- self.records_fetch_lag.add(metrics.metricName("records-lag-max", self.group_name,
- "The maximum lag in terms of number of records for any partition in self window"), metrics.Max())
-
- self.fetch_throttle_time_sensor = metrics.sensor("fetch-throttle-time")
- self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-avg", self.group_name,
- "The average throttle time in ms"), metrics.Avg())
- self.fetch_throttle_time_sensor.add(metrics.metricName("fetch-throttle-time-max", self.group_name,
- "The maximum throttle time in ms"), metrics.Max())
-
- def record_topic_fetch_metrics(topic, num_bytes, num_records):
- # record bytes fetched
- name = '.'.join(["topic", topic, "bytes-fetched"])
- self.metrics[name].record(num_bytes);
-
- # record records fetched
- name = '.'.join(["topic", topic, "records-fetched"])
- self.metrics[name].record(num_records)
- """
+ self.group_name = '%s-fetch-manager-metrics' % prefix
+
+ self.bytes_fetched = metrics.sensor('bytes-fetched')
+ self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
+ 'The average number of bytes fetched per request'), Avg())
+ self.bytes_fetched.add(metrics.metric_name('fetch-size-max', self.group_name,
+ 'The maximum number of bytes fetched per request'), Max())
+ self.bytes_fetched.add(metrics.metric_name('bytes-consumed-rate', self.group_name,
+ 'The average number of bytes consumed per second'), Rate())
+
+ self.records_fetched = self.metrics.sensor('records-fetched')
+ self.records_fetched.add(metrics.metric_name('records-per-request-avg', self.group_name,
+ 'The average number of records in each request'), Avg())
+ self.records_fetched.add(metrics.metric_name('records-consumed-rate', self.group_name,
+ 'The average number of records consumed per second'), Rate())
+
+ self.fetch_latency = metrics.sensor('fetch-latency')
+ self.fetch_latency.add(metrics.metric_name('fetch-latency-avg', self.group_name,
+ 'The average time taken for a fetch request.'), Avg())
+ self.fetch_latency.add(metrics.metric_name('fetch-latency-max', self.group_name,
+ 'The max time taken for any fetch request.'), Max())
+ self.fetch_latency.add(metrics.metric_name('fetch-rate', self.group_name,
+ 'The number of fetch requests per second.'), Rate(sampled_stat=Count()))
+
+ self.records_fetch_lag = metrics.sensor('records-lag')
+ self.records_fetch_lag.add(metrics.metric_name('records-lag-max', self.group_name,
+ 'The maximum lag in terms of number of records for any partition in self window'), Max())
+
+ self.fetch_throttle_time_sensor = metrics.sensor('fetch-throttle-time')
+ self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-avg', self.group_name,
+ 'The average throttle time in ms'), Avg())
+ self.fetch_throttle_time_sensor.add(metrics.metric_name('fetch-throttle-time-max', self.group_name,
+ 'The maximum throttle time in ms'), Max())
+
+ def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
+ metric_tags = {'topic': topic.replace('.', '_')}
+
+ # record bytes fetched
+ name = '.'.join(['topic', topic, 'bytes-fetched'])
+ bytes_fetched = self.metrics.get_sensor(name)
+ if not bytes_fetched:
+ bytes_fetched = self.metrics.sensor(name)
+ bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
+ self.group_name,
+ 'The average number of bytes fetched per request for topic %s' % topic,
+ metric_tags), Avg())
+ bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
+ self.group_name,
+ 'The maximum number of bytes fetched per request for topic %s' % topic,
+ metric_tags), Max())
+ bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
+ self.group_name,
+ 'The average number of bytes consumed per second for topic %s' % topic,
+ metric_tags), Rate())
+ bytes_fetched.record(num_bytes)
+
+ # record records fetched
+ name = '.'.join(['topic', topic, 'records-fetched'])
+ records_fetched = self.metrics.get_sensor(name)
+ if not records_fetched:
+ records_fetched = self.metrics.sensor(name)
+ records_fetched.add(self.metrics.metric_name('records-per-request-avg',
+ self.group_name,
+ 'The average number of records in each request for topic %s' % topic,
+ metric_tags), Avg())
+ records_fetched.add(self.metrics.metric_name('records-consumed-rate',
+ self.group_name,
+ 'The average number of records consumed per second for topic %s' % topic,
+ metric_tags), Rate())
+ records_fetched.record(num_records)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0a78e7f..abb65ef 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.metrics import DictReporter, MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):
offset commits; 0.8.0 is what is left. If set to 'auto', will
attempt to infer the broker version by probing various APIs.
Default: auto
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The number of samples maintained to
+ compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
- #'metric_reporters': None,
- #'metrics_num_samples': 2,
- #'metrics_sample_window_ms': 30000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, *topics, **configs):
@@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ reporters.append(DictReporter('kafka.consumer'))
+ self._metrics = Metrics(metric_config, reporters)
+ metric_group_prefix = 'consumer'
+ # TODO _metrics likely needs to be passed to KafkaClient, etc.
+
self._client = KafkaClient(**self.config)
# Check Broker Version if not set explicitly
@@ -215,16 +233,15 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, **self.config)
+ self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription,
+ self._client, self._subscription, self._metrics, metric_group_prefix,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
self._iterator = None
self._consumer_timeout = float('inf')
- #self.metrics = None
if topics:
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)
@@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator):
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close()
- #self.metrics.close()
+ self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c75eb7c..a4c25a3 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -658,7 +658,7 @@ class GroupCoordinatorMetrics(object):
self.heartbeat_latency.add(metrics.metricName(
"heartbeat-rate", self.group_name,
"The average number of heartbeats per second",
- tags), metrics.Rate(metrics.Count()))
+ tags), metrics.Rate(sampled_stat=metrics.Count()))
self.join_latency = metrics.sensor("join-latency")
self.join_latency.add(metrics.metricName(
@@ -672,7 +672,7 @@ class GroupCoordinatorMetrics(object):
self.join_latency.add(metrics.metricName(
"join-rate", self.group_name,
"The number of group joins per second",
- tags), metrics.Rate(metrics.Count()))
+ tags), metrics.Rate(sampled_stat=metrics.Count()))
self.sync_latency = metrics.sensor("sync-latency")
self.sync_latency.add(metrics.metricName(
@@ -686,7 +686,7 @@ class GroupCoordinatorMetrics(object):
self.sync_latency.add(metrics.metricName(
"sync-rate", self.group_name,
"The number of group syncs per second",
- tags), metrics.Rate(metrics.Count()))
+ tags), metrics.Rate(sampled_stat=metrics.Count()))
"""
lastHeartbeat = Measurable(
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cd3d48a..50d2806 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,6 +14,8 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
+from ..metrics import AnonMeasurable
+from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -36,7 +38,8 @@ class ConsumerCoordinator(BaseCoordinator):
'api_version': (0, 9),
}
- def __init__(self, client, subscription, **configs):
+ def __init__(self, client, subscription, metrics, metric_group_prefix,
+ **configs):
"""Initialize the coordination manager.
Keyword Arguments:
@@ -97,10 +100,8 @@ class ConsumerCoordinator(BaseCoordinator):
interval = self.config['auto_commit_interval_ms'] / 1000.0
self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
- # metrics=None,
- # metric_group_prefix=None,
- # metric_tags=None,
- # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ self._sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix,
+ self._subscription)
def __del__(self):
if hasattr(self, '_auto_commit_task') and self._auto_commit_task:
@@ -470,12 +471,13 @@ class ConsumerCoordinator(BaseCoordinator):
future = Future()
_f = self._client.send(node_id, request)
- _f.add_callback(self._handle_offset_commit_response, offsets, future)
+ _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
_f.add_errback(self._failed_request, node_id, request, future)
return future
- def _handle_offset_commit_response(self, offsets, future, response):
- #self.sensors.commit_latency.record(response.requestLatencyMs())
+ def _handle_offset_commit_response(self, offsets, future, send_time, response):
+ # TODO look at adding request_latency_ms to response (like java kafka)
+ self._sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
for topic, partitions in response.topics:
@@ -720,38 +722,25 @@ class AutoCommitTask(object):
self._reschedule(next_at)
-# TODO
-"""
class ConsumerCoordinatorMetrics(object):
- def __init__(self, metrics, prefix, tags):
+ def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
- self.group_name = prefix + "-coordinator-metrics"
-
- self.commit_latency = metrics.sensor("commit-latency")
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-avg", self.group_name,
- "The average time taken for a commit request",
- tags), metrics.Avg())
- self.commit_latency.add(metrics.MetricName(
- "commit-latency-max", self.group_name,
- "The max time taken for a commit request",
- tags), metrics.Max())
- self.commit_latency.add(metrics.MetricName(
- "commit-rate", self.group_name,
- "The number of commit calls per second",
- tags), metrics.Rate(metrics.Count()))
-
- '''
- def _num_partitions(config, now):
- new Measurable() {
- public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
- }
- };
- metrics.addMetric(new MetricName("assigned-partitions",
- this.metricGrpName,
- "The number of partitions currently assigned to this consumer",
- tags),
- numParts);
- '''
-"""
+ self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+
+ self.commit_latency = metrics.sensor('commit-latency')
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-avg', self.metric_group_name,
+ 'The average time taken for a commit request'), Avg())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-latency-max', self.metric_group_name,
+ 'The max time taken for a commit request'), Max())
+ self.commit_latency.add(metrics.metric_name(
+ 'commit-rate', self.metric_group_name,
+ 'The number of commit calls per second'), Rate(sampled_stat=Count()))
+
+ num_parts = AnonMeasurable(lambda config, now:
+ len(subscription.assigned_partitions()))
+ metrics.add_metric(metrics.metric_name(
+ 'assigned-partitions', self.metric_group_name,
+ 'The number of partitions currently assigned to this consumer'),
+ num_parts)
diff --git a/kafka/errors.py b/kafka/errors.py
index a36ee75..dd64b04 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -361,6 +361,10 @@ class KafkaConfigurationError(KafkaError):
pass
+class QuotaViolationError(KafkaError):
+ pass
+
+
class AsyncProducerQueueFull(KafkaError):
def __init__(self, failed_msgs, *args):
super(AsyncProducerQueueFull, self).__init__(*args)
diff --git a/kafka/metrics/__init__.py b/kafka/metrics/__init__.py
new file mode 100644
index 0000000..dd22f53
--- /dev/null
+++ b/kafka/metrics/__init__.py
@@ -0,0 +1,13 @@
+from .compound_stat import NamedMeasurable
+from .dict_reporter import DictReporter
+from .kafka_metric import KafkaMetric
+from .measurable import AnonMeasurable
+from .metric_config import MetricConfig
+from .metric_name import MetricName
+from .metrics import Metrics
+from .quota import Quota
+
+__all__ = [
+ 'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig',
+ 'MetricName', 'Metrics', 'NamedMeasurable', 'Quota'
+]
diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py
new file mode 100644
index 0000000..09bc24a
--- /dev/null
+++ b/kafka/metrics/compound_stat.py
@@ -0,0 +1,32 @@
+import abc
+
+from kafka.metrics.stat import AbstractStat
+
+
+class AbstractCompoundStat(AbstractStat):
+ """
+ A compound stat is a stat where a single measurement and associated
+ data structure feeds many metrics. This is the example for a
+ histogram which has many associated percentiles.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def stats(self):
+ """
+ Return list of NamedMeasurable
+ """
+ raise NotImplementedError
+
+
+class NamedMeasurable(object):
+ def __init__(self, metric_name, measurable_stat):
+ self._name = metric_name
+ self._stat = measurable_stat
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def stat(self):
+ return self._stat
diff --git a/kafka/metrics/dict_reporter.py b/kafka/metrics/dict_reporter.py
new file mode 100644
index 0000000..49af604
--- /dev/null
+++ b/kafka/metrics/dict_reporter.py
@@ -0,0 +1,81 @@
+import logging
+import threading
+
+from kafka.metrics.metrics_reporter import AbstractMetricsReporter
+
+logger = logging.getLogger(__name__)
+
+
+class DictReporter(AbstractMetricsReporter):
+ """A basic dictionary based metrics reporter.
+
+ Store all metrics in a two level dictionary of category > name > metric.
+ """
+ def __init__(self, prefix=''):
+ self._lock = threading.Lock()
+ self._prefix = prefix if prefix else '' # never allow None
+ self._store = {}
+
+ def snapshot(self):
+ """
+ Return a nested dictionary snapshot of all metrics and their
+ values at this time. Example:
+ {
+ 'category': {
+ 'metric1_name': 42.0,
+ 'metric2_name': 'foo'
+ }
+ }
+ """
+ return dict((category, dict((name, metric.value())
+ for name, metric in list(metrics.items())))
+ for category, metrics in
+ list(self._store.items()))
+
+ def init(self, metrics):
+ for metric in metrics:
+ self.metric_change(metric)
+
+ def metric_change(self, metric):
+ with self._lock:
+ category = self.get_category(metric)
+ if category not in self._store:
+ self._store[category] = {}
+ self._store[category][metric.metric_name.name] = metric
+
+ def metric_removal(self, metric):
+ with self._lock:
+ category = self.get_category(metric)
+ metrics = self._store.get(category, {})
+ removed = metrics.pop(metric.metric_name.name, None)
+ if not metrics:
+ self._store.pop(category, None)
+ return removed
+
+ def get_category(self, metric):
+ """
+ Return a string category for the metric.
+
+ The category is made up of this reporter's prefix and the
+ metric's group and tags.
+
+ Examples:
+ prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2}
+ returns: 'foo.bar.a=1,b=2'
+
+ prefix = 'foo', group = 'bar', tags = None
+ returns: 'foo.bar'
+
+ prefix = None, group = 'bar', tags = None
+ returns: 'bar'
+ """
+ tags = ','.join('%s=%s' % (k, v) for k, v in
+ sorted(metric.metric_name.tags.items()))
+ return '.'.join(x for x in
+ [self._prefix, metric.metric_name.group, tags] if x)
+
+ def configure(self, configs):
+ pass
+
+ def close(self):
+ pass
diff --git a/kafka/metrics/kafka_metric.py b/kafka/metrics/kafka_metric.py
new file mode 100644
index 0000000..75d32a4
--- /dev/null
+++ b/kafka/metrics/kafka_metric.py
@@ -0,0 +1,34 @@
+import time
+
+
+class KafkaMetric(object):
+ # NOTE java constructor takes a lock instance
+ def __init__(self, metric_name, measurable, config):
+ if not metric_name:
+ raise ValueError('metric_name must be non-empty')
+ if not measurable:
+ raise ValueError('measurable must be non-empty')
+ self._metric_name = metric_name
+ self._measurable = measurable
+ self._config = config
+
+ @property
+ def metric_name(self):
+ return self._metric_name
+
+ @property
+ def measurable(self):
+ return self._measurable
+
+ @property
+ def config(self):
+ return self._config
+
+ @config.setter
+ def config(self, config):
+ self._config = config
+
+ def value(self, time_ms=None):
+ if time_ms is None:
+ time_ms = time.time() * 1000
+ return self.measurable.measure(self.config, time_ms)
diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py
new file mode 100644
index 0000000..ef096f3
--- /dev/null
+++ b/kafka/metrics/measurable.py
@@ -0,0 +1,27 @@
+import abc
+
+
+class AbstractMeasurable(object):
+ """A measurable quantity that can be registered as a metric"""
+ @abc.abstractmethod
+ def measure(self, config, now):
+ """
+ Measure this quantity and return the result
+
+ Arguments:
+ config (MetricConfig): The configuration for this metric
+ now (int): The POSIX time in milliseconds the measurement
+ is being taken
+
+ Returns:
+ The measured value
+ """
+ raise NotImplementedError
+
+
+class AnonMeasurable(AbstractMeasurable):
+ def __init__(self, measure_fn):
+ self._measure_fn = measure_fn
+
+ def measure(self, config, now):
+ return float(self._measure_fn(config, now))
diff --git a/kafka/metrics/measurable_stat.py b/kafka/metrics/measurable_stat.py
new file mode 100644
index 0000000..dba887d
--- /dev/null
+++ b/kafka/metrics/measurable_stat.py
@@ -0,0 +1,14 @@
+import abc
+
+from kafka.metrics.measurable import AbstractMeasurable
+from kafka.metrics.stat import AbstractStat
+
+
+class AbstractMeasurableStat(AbstractStat, AbstractMeasurable):
+ """
+ An AbstractMeasurableStat is an AbstractStat that is also
+ an AbstractMeasurable (i.e. can produce a single floating point value).
+ This is the interface used for most of the simple statistics such
+ as Avg, Max, Count, etc.
+ """
+ __metaclass__ = abc.ABCMeta
diff --git a/kafka/metrics/metric_config.py b/kafka/metrics/metric_config.py
new file mode 100644
index 0000000..e30c477
--- /dev/null
+++ b/kafka/metrics/metric_config.py
@@ -0,0 +1,31 @@
+import sys
+
+
+class MetricConfig(object):
+ """Configuration values for metrics"""
+ def __init__(self, quota=None, samples=2, event_window=sys.maxsize,
+ time_window_ms=30 * 1000, tags=None):
+ """
+ Arguments:
+ quota (Quota, optional): Upper or lower bound of a value.
+ samples (int, optional): Max number of samples kept per metric.
+ event_window (int, optional): Max number of values per sample.
+ time_window_ms (int, optional): Max age of an individual sample.
+ tags (dict of {str: str}, optional): Tags for each metric.
+ """
+ self.quota = quota
+ self._samples = samples
+ self.event_window = event_window
+ self.time_window_ms = time_window_ms
+ # tags should be OrderedDict (not supported in py26)
+ self.tags = tags if tags else {}
+
+ @property
+ def samples(self):
+ return self._samples
+
+ @samples.setter
+ def samples(self, value):
+ if value < 1:
+ raise ValueError('The number of samples must be at least 1.')
+ self._samples = value
diff --git a/kafka/metrics/metric_name.py b/kafka/metrics/metric_name.py
new file mode 100644
index 0000000..02068f0
--- /dev/null
+++ b/kafka/metrics/metric_name.py
@@ -0,0 +1,104 @@
+import copy
+
+
+class MetricName(object):
+ """
+ This class encapsulates a metric's name, logical group and its
+ related attributes (tags).
+
+ group, tags parameters can be used to create unique metric names.
+ e.g. domainName:type=group,key1=val1,key2=val2
+
+ Usage looks something like this:
+
+ # set up metrics:
+ metric_tags = {'client-id': 'producer-1', 'topic': 'topic'}
+ metric_config = MetricConfig(tags=metric_tags)
+
+ # metrics is the global repository of metrics and sensors
+ metrics = Metrics(metric_config)
+
+ sensor = metrics.sensor('message-sizes')
+ metric_name = metrics.metric_name('message-size-avg',
+ 'producer-metrics',
+ 'average message size')
+ sensor.add(metric_name, Avg())
+
+ metric_name = metrics.metric_name('message-size-max',
+ sensor.add(metric_name, Max())
+
+ tags = {'client-id': 'my-client', 'topic': 'my-topic'}
+ metric_name = metrics.metric_name('message-size-min',
+ 'producer-metrics',
+ 'message minimum size', tags)
+ sensor.add(metric_name, Min())
+
+ # as messages are sent we record the sizes
+ sensor.record(message_size)
+ """
+
+ def __init__(self, name, group, description=None, tags=None):
+ """
+ Arguments:
+ name (str): The name of the metric.
+ group (str): The logical group name of the metrics to which this
+ metric belongs.
+ description (str, optional): A human-readable description to
+ include in the metric.
+ tags (dict, optional): Additional key/val attributes of the metric.
+ """
+ if not (name and group):
+ raise Exception('name and group must be non-empty.')
+ if tags is not None and not isinstance(tags, dict):
+ raise Exception('tags must be a dict if present.')
+
+ self._name = name
+ self._group = group
+ self._description = description
+ self._tags = copy.copy(tags)
+ self._hash = 0
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def group(self):
+ return self._group
+
+ @property
+ def description(self):
+ return self._description
+
+ @property
+ def tags(self):
+ return copy.copy(self._tags)
+
+ def __hash__(self):
+ if self._hash != 0:
+ return self._hash
+ prime = 31
+ result = 1
+ result = prime * result + hash(self.group)
+ result = prime * result + hash(self.name)
+ tags_hash = hash(frozenset(self.tags.items())) if self.tags else 0
+ result = prime * result + tags_hash
+ self._hash = result
+ return result
+
+ def __eq__(self, other):
+ if self is other:
+ return True
+ if other is None:
+ return False
+ return (type(self) == type(other) and
+ self.group == other.group and
+ self.name == other.name and
+ self.tags == other.tags)
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __str__(self):
+ return 'MetricName(name=%s, group=%s, description=%s, tags=%s)' % (
+ self.name, self.group, self.description, self.tags)
diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py
new file mode 100644
index 0000000..d02f48d
--- /dev/null
+++ b/kafka/metrics/metrics.py
@@ -0,0 +1,257 @@
+import logging
+import sys
+import time
+import threading
+
+from kafka.metrics import AnonMeasurable, KafkaMetric, MetricConfig, MetricName
+from kafka.metrics.stats import Sensor
+
+logger = logging.getLogger(__name__)
+
+
+class Metrics(object):
+ """
+ A registry of sensors and metrics.
+
+ A metric is a named, numerical measurement. A sensor is a handle to
+ record numerical measurements as they occur. Each Sensor has zero or
+ more associated metrics. For example a Sensor might represent message
+ sizes and we might associate with this sensor a metric for the average,
+ maximum, or other statistics computed off the sequence of message sizes
+ that are recorded by the sensor.
+
+ Usage looks something like this:
+ # set up metrics:
+ metrics = Metrics() # the global repository of metrics and sensors
+ sensor = metrics.sensor('message-sizes')
+ metric_name = MetricName('message-size-avg', 'producer-metrics')
+ sensor.add(metric_name, Avg())
+ metric_name = MetricName('message-size-max', 'producer-metrics')
+ sensor.add(metric_name, Max())
+
+ # as messages are sent we record the sizes
+ sensor.record(message_size);
+ """
+ def __init__(self, default_config=None, reporters=None,
+ enable_expiration=False):
+ """
+ Create a metrics repository with a default config, given metric
+ reporters and the ability to expire eligible sensors
+
+ Arguments:
+ default_config (MetricConfig, optional): The default config
+ reporters (list of AbstractMetricsReporter, optional):
+ The metrics reporters
+ enable_expiration (bool, optional): true if the metrics instance
+ can garbage collect inactive sensors, false otherwise
+ """
+ self._lock = threading.RLock()
+ self._config = default_config or MetricConfig()
+ self._sensors = {}
+ self._metrics = {}
+ self._children_sensors = {}
+ self._reporters = reporters or []
+ for reporter in self._reporters:
+ reporter.init([])
+
+ if enable_expiration:
+ def expire_loop():
+ while True:
+ # delay 30 seconds
+ time.sleep(30)
+ self.ExpireSensorTask.run(self)
+ metrics_scheduler = threading.Thread(target=expire_loop)
+ # Creating a daemon thread to not block shutdown
+ metrics_scheduler.daemon = True
+ metrics_scheduler.start()
+
+ self.add_metric(self.metric_name('count', 'kafka-metrics-count',
+ 'total number of registered metrics'),
+ AnonMeasurable(lambda config, now: len(self._metrics)))
+
+ @property
+ def config(self):
+ return self._config
+
+ @property
+ def metrics(self):
+ """
+ Get all the metrics currently maintained and indexed by metricName
+ """
+ return self._metrics
+
+ def metric_name(self, name, group, description='', tags=None):
+ """
+ Create a MetricName with the given name, group, description and tags,
+ plus default tags specified in the metric configuration.
+ Tag in tags takes precedence if the same tag key is specified in
+ the default metric configuration.
+
+ Arguments:
+ name (str): The name of the metric
+ group (str): logical group name of the metrics to which this
+ metric belongs
+ description (str, optional): A human-readable description to
+ include in the metric
+ tags (dict, optionals): additional key/value attributes of
+ the metric
+ """
+ combined_tags = dict(self.config.tags)
+ combined_tags.update(tags or {})
+ return MetricName(name, group, description, combined_tags)
+
+ def get_sensor(self, name):
+ """
+ Get the sensor with the given name if it exists
+
+ Arguments:
+ name (str): The name of the sensor
+
+ Returns:
+ Sensor: The sensor or None if no such sensor exists
+ """
+ if not name:
+ raise ValueError('name must be non-empty')
+ return self._sensors.get(name, None)
+
+ def sensor(self, name, config=None,
+ inactive_sensor_expiration_time_seconds=sys.maxsize,
+ parents=None):
+ """
+ Get or create a sensor with the given unique name and zero or
+ more parent sensors. All parent sensors will receive every value
+ recorded with this sensor.
+
+ Arguments:
+ name (str): The name of the sensor
+ config (MetricConfig, optional): A default configuration to use
+ for this sensor for metrics that don't have their own config
+ inactive_sensor_expiration_time_seconds (int, optional):
+ If no value if recorded on the Sensor for this duration of
+ time, it is eligible for removal
+ parents (list of Sensor): The parent sensors
+
+ Returns:
+ Sensor: The sensor that is created
+ """
+ sensor = self.get_sensor(name)
+ if sensor:
+ return sensor
+
+ with self._lock:
+ sensor = self.get_sensor(name)
+ if not sensor:
+ sensor = Sensor(self, name, parents, config or self.config,
+ inactive_sensor_expiration_time_seconds)
+ self._sensors[name] = sensor
+ if parents:
+ for parent in parents:
+ children = self._children_sensors.get(parent)
+ if not children:
+ children = []
+ self._children_sensors[parent] = children
+ children.append(sensor)
+ logger.debug('Added sensor with name %s', name)
+ return sensor
+
+ def remove_sensor(self, name):
+ """
+ Remove a sensor (if it exists), associated metrics and its children.
+
+ Arguments:
+ name (str): The name of the sensor to be removed
+ """
+ sensor = self._sensors.get(name)
+ if sensor:
+ child_sensors = None
+ with sensor._lock:
+ with self._lock:
+ val = self._sensors.pop(name, None)
+ if val and val == sensor:
+ for metric in sensor.metrics:
+ self.remove_metric(metric.metric_name)
+ logger.debug('Removed sensor with name %s', name)
+ child_sensors = self._children_sensors.pop(sensor, None)
+ if child_sensors:
+ for child_sensor in child_sensors:
+ self.remove_sensor(child_sensor.name)
+
+ def add_metric(self, metric_name, measurable, config=None):
+ """
+ Add a metric to monitor an object that implements measurable.
+ This metric won't be associated with any sensor.
+ This is a way to expose existing values as metrics.
+
+ Arguments:
+ metricName (MetricName): The name of the metric
+ measurable (AbstractMeasurable): The measurable that will be
+ measured by this metric
+ config (MetricConfig, optional): The configuration to use when
+ measuring this measurable
+ """
+ # NOTE there was a lock here, but i don't think it's needed
+ metric = KafkaMetric(metric_name, measurable, config or self.config)
+ self.register_metric(metric)
+
+ def remove_metric(self, metric_name):
+ """
+ Remove a metric if it exists and return it. Return None otherwise.
+ If a metric is removed, `metric_removal` will be invoked
+ for each reporter.
+
+ Arguments:
+ metric_name (MetricName): The name of the metric
+
+ Returns:
+ KafkaMetric: the removed `KafkaMetric` or None if no such
+ metric exists
+ """
+ with self._lock:
+ metric = self._metrics.pop(metric_name, None)
+ if metric:
+ for reporter in self._reporters:
+ reporter.metric_removal(metric)
+ return metric
+
+ def add_reporter(self, reporter):
+ """Add a MetricReporter"""
+ with self._lock:
+ reporter.init(list(self.metrics.values()))
+ self._reporters.append(reporter)
+
+ def register_metric(self, metric):
+ with self._lock:
+ if metric.metric_name in self.metrics:
+ raise ValueError('A metric named "%s" already exists, cannot'
+ ' register another one.' % metric.metric_name)
+ self.metrics[metric.metric_name] = metric
+ for reporter in self._reporters:
+ reporter.metric_change(metric)
+
+ class ExpireSensorTask(object):
+ """
+ This iterates over every Sensor and triggers a remove_sensor
+ if it has expired. Package private for testing
+ """
+ @staticmethod
+ def run(metrics):
+ items = list(metrics._sensors.items())
+ for name, sensor in items:
+ # remove_sensor also locks the sensor object. This is fine
+ # because synchronized is reentrant. There is however a minor
+ # race condition here. Assume we have a parent sensor P and
+ # child sensor C. Calling record on C would cause a record on
+ # P as well. So expiration time for P == expiration time for C.
+ # If the record on P happens via C just after P is removed,
+ # that will cause C to also get removed. Since the expiration
+ # time is typically high it is not expected to be a significant
+ # concern and thus not necessary to optimize
+ with sensor._lock:
+ if sensor.has_expired():
+ logger.debug('Removing expired sensor %s', name)
+ metrics.remove_sensor(name)
+
+ def close(self):
+ """Close this metrics repository."""
+ for reporter in self._reporters:
+ reporter.close()
diff --git a/kafka/metrics/metrics_reporter.py b/kafka/metrics/metrics_reporter.py
new file mode 100644
index 0000000..b48ad0b
--- /dev/null
+++ b/kafka/metrics/metrics_reporter.py
@@ -0,0 +1,55 @@
+import abc
+
+
+class AbstractMetricsReporter(object):
+ """
+ An abstract class to allow things to listen as new metrics
+ are created so they can be reported.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def init(self, metrics):
+ """
+ This is called when the reporter is first registered
+ to initially register all existing metrics
+
+ Arguments:
+ metrics (list of KafkaMetric): All currently existing metrics
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def metric_change(self, metric):
+ """
+ This is called whenever a metric is updated or added
+
+ Arguments:
+ metric (KafkaMetric)
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def metric_removal(self, metric):
+ """
+ This is called whenever a metric is removed
+
+ Arguments:
+ metric (KafkaMetric)
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def configure(self, configs):
+ """
+ Configure this class with the given key-value pairs
+
+ Arguments:
+ configs (dict of {str, ?})
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def close(self):
+ """Called when the metrics repository is closed."""
+ raise NotImplementedError
diff --git a/kafka/metrics/quota.py b/kafka/metrics/quota.py
new file mode 100644
index 0000000..0410e37
--- /dev/null
+++ b/kafka/metrics/quota.py
@@ -0,0 +1,39 @@
+class Quota(object):
+ """An upper or lower bound for metrics"""
+ def __init__(self, bound, is_upper):
+ self._bound = bound
+ self._upper = is_upper
+
+ @staticmethod
+ def upper_bound(upper_bound):
+ return Quota(upper_bound, True)
+
+ @staticmethod
+ def lower_bound(lower_bound):
+ return Quota(lower_bound, False)
+
+ def is_upper_bound(self):
+ return self._upper
+
+ @property
+ def bound(self):
+ return self._bound
+
+ def is_acceptable(self, value):
+ return ((self.is_upper_bound() and value <= self.bound) or
+ (not self.is_upper_bound() and value >= self.bound))
+
+ def __hash__(self):
+ prime = 31
+ result = prime + self.bound
+ return prime * result + self.is_upper_bound()
+
+ def __eq__(self, other):
+ if self is other:
+ return True
+ return (type(self) == type(other) and
+ self.bound == other.bound and
+ self.is_upper_bound() == other.is_upper_bound())
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
diff --git a/kafka/metrics/stat.py b/kafka/metrics/stat.py
new file mode 100644
index 0000000..c10f3ce
--- /dev/null
+++ b/kafka/metrics/stat.py
@@ -0,0 +1,21 @@
+import abc
+
+
+class AbstractStat(object):
+ """
+ An AbstractStat is a quantity such as average, max, etc that is computed
+ off the stream of updates to a sensor
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def record(self, config, value, time_ms):
+ """
+ Record the given value
+
+ Arguments:
+ config (MetricConfig): The configuration to use for this metric
+ value (float): The value to record
+ timeMs (int): The POSIX time in milliseconds this value occurred
+ """
+ raise NotImplementedError
diff --git a/kafka/metrics/stats/__init__.py b/kafka/metrics/stats/__init__.py
new file mode 100644
index 0000000..15eafd9
--- /dev/null
+++ b/kafka/metrics/stats/__init__.py
@@ -0,0 +1,15 @@
+from .avg import Avg
+from .count import Count
+from .histogram import Histogram
+from .max_stat import Max
+from .min_stat import Min
+from .percentile import Percentile
+from .percentiles import Percentiles
+from .rate import Rate
+from .sensor import Sensor
+from .total import Total
+
+__all__ = [
+ 'Avg', 'Count', 'Histogram', 'Max', 'Min', 'Percentile', 'Percentiles',
+ 'Rate', 'Sensor', 'Total'
+]
diff --git a/kafka/metrics/stats/avg.py b/kafka/metrics/stats/avg.py
new file mode 100644
index 0000000..4d0be0a
--- /dev/null
+++ b/kafka/metrics/stats/avg.py
@@ -0,0 +1,22 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Avg(AbstractSampledStat):
+ """
+ An AbstractSampledStat that maintains a simple average over its samples.
+ """
+ def __init__(self):
+ super(Avg, self).__init__(0.0)
+
+ def update(self, sample, config, value, now):
+ sample.value += value
+
+ def combine(self, samples, config, now):
+ total_sum = 0
+ total_count = 0
+ for sample in samples:
+ total_sum += sample.value
+ total_count += sample.event_count
+ if not total_count:
+ return 0
+ return float(total_sum) / total_count
diff --git a/kafka/metrics/stats/count.py b/kafka/metrics/stats/count.py
new file mode 100644
index 0000000..183e4f2
--- /dev/null
+++ b/kafka/metrics/stats/count.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Count(AbstractSampledStat):
+ """
+ An AbstractSampledStat that maintains a simple count of what it has seen.
+ """
+ def __init__(self):
+ super(Count, self).__init__(0.0)
+
+ def update(self, sample, config, value, now):
+ sample.value += 1.0
+
+ def combine(self, samples, config, now):
+ return float(sum(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/histogram.py b/kafka/metrics/stats/histogram.py
new file mode 100644
index 0000000..42aacdb
--- /dev/null
+++ b/kafka/metrics/stats/histogram.py
@@ -0,0 +1,93 @@
+import math
+
+
+class Histogram(object):
+ def __init__(self, bin_scheme):
+ self._hist = [0.0] * bin_scheme.bins
+ self._count = 0.0
+ self._bin_scheme = bin_scheme
+
+ def record(self, value):
+ self._hist[self._bin_scheme.to_bin(value)] += 1.0
+ self._count += 1.0
+
+ def value(self, quantile):
+ if self._count == 0.0:
+ return float('NaN')
+ _sum = 0.0
+ quant = float(quantile)
+ for i, value in enumerate(self._hist[:-1]):
+ _sum += value
+ if _sum / self._count > quant:
+ return self._bin_scheme.from_bin(i)
+ return float('inf')
+
+ @property
+ def counts(self):
+ return self._hist
+
+ def clear(self):
+ for i in range(self._hist):
+ self._hist[i] = 0.0
+ self._count = 0
+
+ def __str__(self):
+ values = ['%.10f:%.0f' % (self._bin_scheme.from_bin(i), value) for
+ i, value in enumerate(self._hist[:-1])]
+ values.append('%s:%s' % (float('inf'), self._hist[-1]))
+ return '{%s}' % ','.join(values)
+
+ class ConstantBinScheme(object):
+ def __init__(self, bins, min_val, max_val):
+ if bins < 2:
+ raise ValueError('Must have at least 2 bins.')
+ self._min = float(min_val)
+ self._max = float(max_val)
+ self._bins = int(bins)
+ self._bucket_width = (max_val - min_val) / (bins - 2)
+
+ @property
+ def bins(self):
+ return self._bins
+
+ def from_bin(self, b):
+ if b == 0:
+ return float('-inf')
+ elif b == self._bins - 1:
+ return float('inf')
+ else:
+ return self._min + (b - 1) * self._bucket_width
+
+ def to_bin(self, x):
+ if x < self._min:
+ return 0
+ elif x > self._max:
+ return self._bins - 1
+ else:
+ return int(((x - self._min) / self._bucket_width) + 1)
+
+ class LinearBinScheme(object):
+ def __init__(self, num_bins, max_val):
+ self._bins = num_bins
+ self._max = max_val
+ self._scale = max_val / (num_bins * (num_bins - 1) / 2)
+
+ @property
+ def bins(self):
+ return self._bins
+
+ def from_bin(self, b):
+ if b == self._bins - 1:
+ return float('inf')
+ else:
+ unscaled = (b * (b + 1.0)) / 2.0
+ return unscaled * self._scale
+
+ def to_bin(self, x):
+ if x < 0.0:
+ raise ValueError('Values less than 0.0 not accepted.')
+ elif x > self._max:
+ return self._bins - 1
+ else:
+ scaled = x / self._scale
+ return int(-0.5 + math.sqrt(2.0 * scaled + 0.25))
diff --git a/kafka/metrics/stats/max_stat.py b/kafka/metrics/stats/max_stat.py
new file mode 100644
index 0000000..8df54d3
--- /dev/null
+++ b/kafka/metrics/stats/max_stat.py
@@ -0,0 +1,15 @@
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Max(AbstractSampledStat):
+ """An AbstractSampledStat that gives the max over its samples."""
+ def __init__(self):
+ super(Max, self).__init__(float('-inf'))
+
+ def update(self, sample, config, value, now):
+ sample.value = max(sample.value, value)
+
+ def combine(self, samples, config, now):
+ if not samples:
+ return float('-inf')
+ return float(max(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/min_stat.py b/kafka/metrics/stats/min_stat.py
new file mode 100644
index 0000000..a57c2dd
--- /dev/null
+++ b/kafka/metrics/stats/min_stat.py
@@ -0,0 +1,17 @@
+import sys
+
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class Min(AbstractSampledStat):
+ """An AbstractSampledStat that gives the min over its samples."""
+ def __init__(self):
+ super(Min, self).__init__(float(sys.maxsize))
+
+ def update(self, sample, config, value, now):
+ sample.value = min(sample.value, value)
+
+ def combine(self, samples, config, now):
+ if not samples:
+ return float(sys.maxsize)
+ return float(min(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/percentile.py b/kafka/metrics/stats/percentile.py
new file mode 100644
index 0000000..723b9e6
--- /dev/null
+++ b/kafka/metrics/stats/percentile.py
@@ -0,0 +1,12 @@
+class Percentile(object):
+ def __init__(self, metric_name, percentile):
+ self._metric_name = metric_name
+ self._percentile = float(percentile)
+
+ @property
+ def name(self):
+ return self._metric_name
+
+ @property
+ def percentile(self):
+ return self._percentile
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py
new file mode 100644
index 0000000..84e7160
--- /dev/null
+++ b/kafka/metrics/stats/percentiles.py
@@ -0,0 +1,72 @@
+from kafka.metrics import AnonMeasurable, NamedMeasurable
+from kafka.metrics.compound_stat import AbstractCompoundStat
+from kafka.metrics.stats import Histogram
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class BucketSizing(object):
+ CONSTANT = 0
+ LINEAR = 1
+
+
+class Percentiles(AbstractSampledStat, AbstractCompoundStat):
+ """A compound stat that reports one or more percentiles"""
+ def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
+ percentiles=None):
+ super(Percentiles, self).__init__(0.0)
+ self._percentiles = percentiles or []
+ self._buckets = int(size_in_bytes / 4)
+ if bucketing == BucketSizing.CONSTANT:
+ self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
+ min_val, max_val)
+ elif bucketing == BucketSizing.LINEAR:
+ if min_val != 0.0:
+ raise ValueError('Linear bucket sizing requires min_val'
+ ' to be 0.0.')
+ self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
+ else:
+ ValueError('Unknown bucket type: %s' % bucketing)
+
+ def stats(self):
+ measurables = []
+
+ def make_measure_fn(pct):
+ return lambda config, now: self.value(config, now,
+ pct / 100.0)
+
+ for percentile in self._percentiles:
+ measure_fn = make_measure_fn(percentile.percentile)
+ stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
+ measurables.append(stat)
+ return measurables
+
+ def value(self, config, now, quantile):
+ self.purge_obsolete_samples(config, now)
+ count = sum(sample.event_count for sample in self._samples)
+ if count == 0.0:
+ return float('NaN')
+ sum_val = 0.0
+ quant = float(quantile)
+ for b in range(self._buckets):
+ for sample in self._samples:
+ assert type(sample) is self.HistogramSample
+ hist = sample.histogram.counts
+ sum_val += hist[b]
+ if sum_val / count > quant:
+ return self._bin_scheme.from_bin(b)
+ return float('inf')
+
+ def combine(self, samples, config, now):
+ return self.value(config, now, 0.5)
+
+ def new_sample(self, time_ms):
+ return Percentiles.HistogramSample(self._bin_scheme, time_ms)
+
+ def update(self, sample, config, value, time_ms):
+ assert type(sample) is self.HistogramSample
+ sample.histogram.record(value)
+
+ class HistogramSample(AbstractSampledStat.Sample):
+ def __init__(self, scheme, now):
+ super(Percentiles.HistogramSample, self).__init__(0.0, now)
+ self.histogram = Histogram(scheme)
diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py
new file mode 100644
index 0000000..3ce2e74
--- /dev/null
+++ b/kafka/metrics/stats/rate.py
@@ -0,0 +1,115 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+from kafka.metrics.stats.sampled_stat import AbstractSampledStat
+
+
+class TimeUnit(object):
+ _names = {
+ 'nanosecond': 0,
+ 'microsecond': 1,
+ 'millisecond': 2,
+ 'second': 3,
+ 'minute': 4,
+ 'hour': 5,
+ 'day': 6,
+ }
+
+ NANOSECONDS = _names['nanosecond']
+ MICROSECONDS = _names['microsecond']
+ MILLISECONDS = _names['millisecond']
+ SECONDS = _names['second']
+ MINUTES = _names['minute']
+ HOURS = _names['hour']
+ DAYS = _names['day']
+
+ @staticmethod
+ def get_name(time_unit):
+ return TimeUnit._names[time_unit]
+
+
+class Rate(AbstractMeasurableStat):
+ """
+ The rate of the given quantity. By default this is the total observed
+ over a set of samples from a sampled statistic divided by the elapsed
+ time over the sample windows. Alternative AbstractSampledStat
+ implementations can be provided, however, to record the rate of
+ occurrences (e.g. the count of values measured over the time interval)
+ or other such values.
+ """
+ def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
+ self._stat = sampled_stat or SampledTotal()
+ self._unit = time_unit
+
+ def unit_name(self):
+ return TimeUnit.get_name(self._unit)
+
+ def record(self, config, value, time_ms):
+ self._stat.record(config, value, time_ms)
+
+ def measure(self, config, now):
+ value = self._stat.measure(config, now)
+ return float(value) / self.convert(self.window_size(config, now))
+
+ def window_size(self, config, now):
+ # purge old samples before we compute the window size
+ self._stat.purge_obsolete_samples(config, now)
+
+ """
+ Here we check the total amount of time elapsed since the oldest
+ non-obsolete window. This give the total window_size of the batch
+ which is the time used for Rate computation. However, there is
+ an issue if we do not have sufficient data for e.g. if only
+ 1 second has elapsed in a 30 second window, the measured rate
+ will be very high. Hence we assume that the elapsed time is
+ always N-1 complete windows plus whatever fraction of the final
+ window is complete.
+
+ Note that we could simply count the amount of time elapsed in
+ the current window and add n-1 windows to get the total time,
+ but this approach does not account for sleeps. AbstractSampledStat
+ only creates samples whenever record is called, if no record is
+ called for a period of time that time is not accounted for in
+ window_size and produces incorrect results.
+ """
+ total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
+ # Check how many full windows of data we have currently retained
+ num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
+ min_full_windows = config.samples - 1
+
+ # If the available windows are less than the minimum required,
+ # add the difference to the totalElapsedTime
+ if num_full_windows < min_full_windows:
+ total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
+ config.time_window_ms)
+
+ return total_elapsed_time_ms
+
+ def convert(self, time_ms):
+ if self._unit == TimeUnit.NANOSECONDS:
+ return time_ms * 1000.0 * 1000.0
+ elif self._unit == TimeUnit.MICROSECONDS:
+ return time_ms * 1000.0
+ elif self._unit == TimeUnit.MILLISECONDS:
+ return time_ms
+ elif self._unit == TimeUnit.SECONDS:
+ return time_ms / 1000.0
+ elif self._unit == TimeUnit.MINUTES:
+ return time_ms / (60.0 * 1000.0)
+ elif self._unit == TimeUnit.HOURS:
+ return time_ms / (60.0 * 60.0 * 1000.0)
+ elif self._unit == TimeUnit.DAYS:
+ return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
+ else:
+ raise ValueError('Unknown unit: %s' % self._unit)
+
+
+class SampledTotal(AbstractSampledStat):
+ def __init__(self, initial_value=None):
+ if initial_value is not None:
+ raise ValueError('initial_value cannot be set on SampledTotal')
+ super(SampledTotal, self).__init__(0.0)
+
+ def update(self, sample, config, value, time_ms):
+ sample.value += value
+
+ def combine(self, samples, config, now):
+ return float(sum(sample.value for sample in samples))
diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py
new file mode 100644
index 0000000..ca0db69
--- /dev/null
+++ b/kafka/metrics/stats/sampled_stat.py
@@ -0,0 +1,99 @@
+import abc
+
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+
+
+class AbstractSampledStat(AbstractMeasurableStat):
+ """
+ An AbstractSampledStat records a single scalar value measured over
+ one or more samples. Each sample is recorded over a configurable
+ window. The window can be defined by number of events or elapsed
+ time (or both, if both are given the window is complete when
+ *either* the event count or elapsed time criterion is met).
+
+ All the samples are combined to produce the measurement. When a
+ window is complete the oldest sample is cleared and recycled to
+ begin recording the next sample.
+
+ Subclasses of this class define different statistics measured
+ using this basic pattern.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, initial_value):
+ self._initial_value = initial_value
+ self._samples = []
+ self._current = 0
+
+ @abc.abstractmethod
+ def update(self, sample, config, value, time_ms):
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def combine(self, samples, config, now):
+ raise NotImplementedError
+
+ def record(self, config, value, time_ms):
+ sample = self.current(time_ms)
+ if sample.is_complete(time_ms, config):
+ sample = self._advance(config, time_ms)
+ self.update(sample, config, float(value), time_ms)
+ sample.event_count += 1
+
+ def new_sample(self, time_ms):
+ return self.Sample(self._initial_value, time_ms)
+
+ def measure(self, config, now):
+ self.purge_obsolete_samples(config, now)
+ return float(self.combine(self._samples, config, now))
+
+ def current(self, time_ms):
+ if not self._samples:
+ self._samples.append(self.new_sample(time_ms))
+ return self._samples[self._current]
+
+ def oldest(self, now):
+ if not self._samples:
+ self._samples.append(self.new_sample(now))
+ oldest = self._samples[0]
+ for sample in self._samples[1:]:
+ if sample.last_window_ms < oldest.last_window_ms:
+ oldest = sample
+ return oldest
+
+ def purge_obsolete_samples(self, config, now):
+ """
+ Timeout any windows that have expired in the absence of any events
+ """
+ expire_age = config.samples * config.time_window_ms
+ for sample in self._samples:
+ if now - sample.last_window_ms >= expire_age:
+ sample.reset(now)
+
+ def _advance(self, config, time_ms):
+ self._current = (self._current + 1) % config.samples
+ if self._current >= len(self._samples):
+ sample = self.new_sample(time_ms)
+ self._samples.append(sample)
+ return sample
+ else:
+ sample = self.current(time_ms)
+ sample.reset(time_ms)
+ return sample
+
+ class Sample(object):
+
+ def __init__(self, initial_value, now):
+ self.initial_value = initial_value
+ self.event_count = 0
+ self.last_window_ms = now
+ self.value = initial_value
+
+ def reset(self, now):
+ self.event_count = 0
+ self.last_window_ms = now
+ self.value = self.initial_value
+
+ def is_complete(self, time_ms, config):
+ return (time_ms - self.last_window_ms >= config.time_window_ms or
+ self.event_count >= config.event_window)
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
new file mode 100644
index 0000000..6878096
--- /dev/null
+++ b/kafka/metrics/stats/sensor.py
@@ -0,0 +1,132 @@
+import threading
+import time
+
+from kafka.errors import QuotaViolationError
+from kafka.metrics import KafkaMetric
+
+
+class Sensor(object):
+ """
+ A sensor applies a continuous sequence of numerical values
+ to a set of associated metrics. For example a sensor on
+ message size would record a sequence of message sizes using
+ the `record(double)` api and would maintain a set
+ of metrics about request sizes such as the average or max.
+ """
+ def __init__(self, registry, name, parents, config,
+ inactive_sensor_expiration_time_seconds):
+ if not name:
+ raise ValueError('name must be non-empty')
+ self._lock = threading.RLock()
+ self._registry = registry
+ self._name = name
+ self._parents = parents or []
+ self._metrics = []
+ self._stats = []
+ self._config = config
+ self._inactive_sensor_expiration_time_ms = (
+ inactive_sensor_expiration_time_seconds * 1000)
+ self._last_record_time = time.time() * 1000
+ self._check_forest(set())
+
+ def _check_forest(self, sensors):
+ """Validate that this sensor doesn't end up referencing itself."""
+ if self in sensors:
+ raise ValueError('Circular dependency in sensors: %s is its own'
+ 'parent.' % self.name)
+ sensors.add(self)
+ for parent in self._parents:
+ parent._check_forest(sensors)
+
+ @property
+ def name(self):
+ """
+ The name this sensor is registered with.
+ This name will be unique among all registered sensors.
+ """
+ return self._name
+
+ @property
+ def metrics(self):
+ return tuple(self._metrics)
+
+ def record(self, value=1.0, time_ms=None):
+ """
+ Record a value at a known time.
+ Arguments:
+ value (double): The value we are recording
+ time_ms (int): The current POSIX time in milliseconds
+
+ Raises:
+ QuotaViolationException: if recording this value moves a
+ metric beyond its configured maximum or minimum bound
+ """
+ now = time.time() * 1000
+ if time_ms is None:
+ time_ms = now
+ self._last_record_time = now
+ with self._lock: # XXX high volume, might be performance issue
+ # increment all the stats
+ for stat in self._stats:
+ stat.record(self._config, value, time_ms)
+ self._check_quotas(time_ms)
+ for parent in self._parents:
+ parent.record(value, time_ms)
+
+ def _check_quotas(self, time_ms):
+ """
+ Check if we have violated our quota for any metric that
+ has a configured quota
+ """
+ for metric in self._metrics:
+ if metric.config and metric.config.quota:
+ value = metric.value(time_ms)
+ if not metric.config.quota.is_acceptable(value):
+ raise QuotaViolationError('(%s) violated quota. Actual: '
+ '(%d), Threshold: (%d)' %
+ (metric.metric_name,
+ metric.config.quota.bound,
+ value))
+
+ def add_compound(self, compound_stat, config=None):
+ """
+ Register a compound statistic with this sensor which
+ yields multiple measurable quantities (like a histogram)
+
+ Arguments:
+ stat (AbstractCompoundStat): The stat to register
+ config (MetricConfig): The configuration for this stat.
+ If None then the stat will use the default configuration
+ for this sensor.
+ """
+ if not compound_stat:
+ raise ValueError('compound stat must be non-empty')
+ self._stats.append(compound_stat)
+ for named_measurable in compound_stat.stats():
+ metric = KafkaMetric(named_measurable.name, named_measurable.stat,
+ config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+
+ def add(self, metric_name, stat, config=None):
+ """
+ Register a metric with this sensor
+
+ Arguments:
+ metric_name (MetricName): The name of the metric
+ stat (AbstractMeasurableStat): The statistic to keep
+ config (MetricConfig): A special configuration for this metric.
+ If None use the sensor default configuration.
+ """
+ with self._lock:
+ metric = KafkaMetric(metric_name, stat, config or self._config)
+ self._registry.register_metric(metric)
+ self._metrics.append(metric)
+ self._stats.append(stat)
+
+ def has_expired(self):
+ """
+ Return True if the Sensor is eligible for removal due to inactivity.
+ """
+ return ((time.time() * 1000 - self._last_record_time) >
+ self._inactive_sensor_expiration_time_ms)
diff --git a/kafka/metrics/stats/total.py b/kafka/metrics/stats/total.py
new file mode 100644
index 0000000..76a82d8
--- /dev/null
+++ b/kafka/metrics/stats/total.py
@@ -0,0 +1,13 @@
+from kafka.metrics.measurable_stat import AbstractMeasurableStat
+
+
+class Total(AbstractMeasurableStat):
+ """An un-windowed cumulative total maintained over all time."""
+ def __init__(self, value=0.0):
+ self._total = value
+
+ def record(self, config, value, now):
+ self._total += value
+
+ def measure(self, config, now):
+ return float(self._total)