diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8fa43bc..982cd7b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -12,7 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor -from kafka.metrics import DictReporter, MetricConfig, Metrics +from kafka.metrics import MetricConfig, Metrics from kafka.protocol.offset import OffsetResetStrategy from kafka.structs import TopicPartition from kafka.version import __version__ @@ -171,8 +171,8 @@ class KafkaConsumer(six.Iterator): in classes that will be notified of new metric creation. Default: [] metrics_num_samples (int): The number of samples maintained to compute metrics. Default: 2 - metrics_sample_window_ms (int): The number of samples maintained to - compute metrics. Default: 30000 + metrics_sample_window_ms (int): The maximum age in milliseconds of + samples used to compute metrics. Default: 30000 Note: Configuration parameters are described in more detail at @@ -241,7 +241,6 @@ class KafkaConsumer(six.Iterator): time_window_ms=self.config['metrics_sample_window_ms'], tags=metrics_tags) reporters = [reporter() for reporter in self.config['metric_reporters']] - reporters.append(DictReporter('kafka.consumer')) self._metrics = Metrics(metric_config, reporters) metric_group_prefix = 'consumer' # TODO _metrics likely needs to be passed to KafkaClient, etc. @@ -760,6 +759,21 @@ class KafkaConsumer(six.Iterator): self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") + def metrics(self, raw=False): + """Warning: this is an unstable interface. + It may change in future releases without warning""" + if raw: + return self._metrics.metrics + + metrics = {} + for k, v in self._metrics.metrics.items(): + if k.group not in metrics: + metrics[k.group] = {} + if k.name not in metrics[k.group]: + metrics[k.group][k.name] = {} + metrics[k.group][k.name] = v.value() + return metrics + def _use_consumer_group(self): """Return True iff this consumer can/should join a broker-coordinated group.""" if self.config['api_version'] < (0, 9): |