diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 8 | ||||
-rw-r--r-- | kafka/consumer/group.py | 22 |
2 files changed, 24 insertions, 6 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 34ff4cb..d615848 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -729,6 +729,8 @@ class Fetcher(six.Iterator): else: raise error_type('Unexpected error while fetching data') + # Because we are currently decompressing messages lazily, the sensors here + # will get compressed bytes / message set stats when compression is enabled self._sensors.bytes_fetched.record(total_bytes) self._sensors.records_fetched.record(total_count) if response.API_VERSION >= 1: @@ -774,12 +776,12 @@ class FetchManagerMetrics(object): 'The maximum throttle time in ms'), Max()) def record_topic_fetch_metrics(self, topic, num_bytes, num_records): - metric_tags = {'topic': topic.replace('.', '_')} - # record bytes fetched name = '.'.join(['topic', topic, 'bytes-fetched']) bytes_fetched = self.metrics.get_sensor(name) if not bytes_fetched: + metric_tags = {'topic': topic.replace('.', '_')} + bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, @@ -799,6 +801,8 @@ class FetchManagerMetrics(object): name = '.'.join(['topic', topic, 'records-fetched']) records_fetched = self.metrics.get_sensor(name) if not records_fetched: + metric_tags = {'topic': topic.replace('.', '_')} + records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8fa43bc..982cd7b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.metrics import DictReporter, MetricConfig, Metrics +from kafka.metrics import MetricConfig, Metrics from kafka.protocol.offset import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ @@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator): in classes that will be notified of new metric creation. Default: [] metrics_num_samples (int): The number of samples maintained to compute metrics. Default: 2 - metrics_sample_window_ms (int): The number of samples maintained to - compute metrics. Default: 30000 + metrics_sample_window_ms (int): The maximum age in milliseconds of + samples used to compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator): time_window_ms=self.config['metrics_sample_window_ms'], tags=metrics_tags) reporters = [reporter() for reporter in self.config['metric_reporters']] - reporters.append(DictReporter('kafka.consumer')) self._metrics = Metrics(metric_config, reporters) metric_group_prefix = 'consumer' # TODO _metrics likely needs to be passed to KafkaClient, etc. @@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator): self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): |