Diffstat (limited to 'kafka/consumer')
-rw-r--r--  kafka/consumer/fetcher.py |  8 ++++++--
-rw-r--r--  kafka/consumer/group.py   | 22 ++++++++++++++++++----
2 files changed, 24 insertions(+), 6 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 34ff4cb..d615848 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -729,6 +729,8 @@ class Fetcher(six.Iterator):
                 else:
                     raise error_type('Unexpected error while fetching data')
 
+        # Because we are currently decompressing messages lazily, the sensors here
+        # will get compressed bytes / message set stats when compression is enabled
         self._sensors.bytes_fetched.record(total_bytes)
         self._sensors.records_fetched.record(total_count)
         if response.API_VERSION >= 1:
@@ -774,12 +776,12 @@ class FetchManagerMetrics(object):
             'The maximum throttle time in ms'), Max())
 
     def record_topic_fetch_metrics(self, topic, num_bytes, num_records):
-        metric_tags = {'topic': topic.replace('.', '_')}
-
         # record bytes fetched
         name = '.'.join(['topic', topic, 'bytes-fetched'])
         bytes_fetched = self.metrics.get_sensor(name)
         if not bytes_fetched:
+            metric_tags = {'topic': topic.replace('.', '_')}
+
             bytes_fetched = self.metrics.sensor(name)
             bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
                 self.group_name,
@@ -799,6 +801,8 @@ class FetchManagerMetrics(object):
         name = '.'.join(['topic', topic, 'records-fetched'])
         records_fetched = self.metrics.get_sensor(name)
         if not records_fetched:
+            metric_tags = {'topic': topic.replace('.', '_')}
+
             records_fetched = self.metrics.sensor(name)
             records_fetched.add(self.metrics.metric_name('records-per-request-avg',
                 self.group_name,
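
The two fetcher.py hunks above converge on the same pattern: metric_tags is now
built only when a topic's sensor is registered for the first time, so the hot
path reuses the cached sensor without allocating the tag dict. A minimal,
self-contained sketch of that get-or-create pattern against kafka.metrics; the
sensor name, group name, and topic below are illustrative, not the library's own:

from kafka.metrics import Metrics
from kafka.metrics.stats import Avg

metrics = Metrics()

def record_bytes_fetched(topic, num_bytes):
    name = '.'.join(['topic', topic, 'bytes-fetched'])
    sensor = metrics.get_sensor(name)
    if not sensor:
        # Tags are only computed when the sensor is first created
        metric_tags = {'topic': topic.replace('.', '_')}
        sensor = metrics.sensor(name)
        sensor.add(metrics.metric_name('fetch-size-avg', 'demo-group',
                                       'Average fetch size', metric_tags), Avg())
    sensor.record(num_bytes)

record_bytes_fetched('my.topic', 1024)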
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8fa43bc..982cd7b 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
 from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
-from kafka.metrics import DictReporter, MetricConfig, Metrics
+from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.offset import OffsetResetStrategy
 from kafka.structs import TopicPartition
 from kafka.version import __version__
@@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator):
             in classes that will be notified of new metric creation. Default: []
         metrics_num_samples (int): The number of samples maintained to compute
             metrics. Default: 2
-        metrics_sample_window_ms (int): The number of samples maintained to
-            compute metrics. Default: 30000
+        metrics_sample_window_ms (int): The maximum age in milliseconds of
+            samples used to compute metrics. Default: 30000
 
     Note:
         Configuration parameters are described in more detail at
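
As the corrected docstring now reads, the two sampling options work together:
each metric keeps metrics_num_samples rolling samples, and a sample is
discarded once it is older than metrics_sample_window_ms. A hypothetical
consumer pinning both to their defaults (the bootstrap address is illustrative):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    metrics_num_samples=2,            # rolling samples kept per metric
    metrics_sample_window_ms=30000)   # max age of each sample, in ms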
@@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator):
             time_window_ms=self.config['metrics_sample_window_ms'],
             tags=metrics_tags)
         reporters = [reporter() for reporter in self.config['metric_reporters']]
-        reporters.append(DictReporter('kafka.consumer'))
         self._metrics = Metrics(metric_config, reporters)
         metric_group_prefix = 'consumer'
         # TODO _metrics likely needs to be passed to KafkaClient, etc.
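
Since DictReporter is no longer appended by default, callers that depended on
it can opt back in via the existing metric_reporters option. A sketch, assuming
DictReporter's prefix argument is optional (reporter classes are instantiated
with no arguments, per the list comprehension above):

from kafka import KafkaConsumer
from kafka.metrics import DictReporter

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',  # illustrative address
    metric_reporters=[DictReporter])     # re-adds the dict-backed reporter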
@@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
+    def metrics(self, raw=False):
+        """Warning: this is an unstable interface.
+        It may change in future releases without warning"""
+        if raw:
+            return self._metrics.metrics
+
+        metrics = {}
+        for k, v in self._metrics.metrics.items():
+            if k.group not in metrics:
+                metrics[k.group] = {}
+            if k.name not in metrics[k.group]:
+                metrics[k.group][k.name] = {}
+            metrics[k.group][k.name] = v.value()
+        return metrics
+
     def _use_consumer_group(self):
         """Return True iff this consumer can/should join a broker-coordinated group."""
         if self.config['api_version'] < (0, 9):
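
A usage sketch for the new accessor: poll briefly so the sensors accumulate
data, then read the nested {group: {name: value}} snapshot. The topic, address,
and the exact group/metric keys that appear are illustrative:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')
consumer.poll(timeout_ms=500)

snapshot = consumer.metrics()            # {group: {name: value}}
for group, stats in snapshot.items():
    for name, value in stats.items():
        print(group, name, value)

raw = consumer.metrics(raw=True)         # underlying {MetricName: KafkaMetric}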