summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py29
1 files changed, 23 insertions, 6 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0a78e7f..afcc996 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -12,6 +12,7 @@ from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.metrics import DictReporter, MetricConfig, Metrics
from kafka.protocol.offset import OffsetResetStrategy
from kafka.structs import TopicPartition
from kafka.version import __version__
@@ -143,6 +144,13 @@ class KafkaConsumer(six.Iterator):
offset commits; 0.8.0 is what is left. If set to 'auto', will
attempt to infer the broker version by probing various APIs.
Default: auto
+ metric_reporters (list): A list of classes to use as metrics reporters.
+ Implementing the AbstractMetricsReporter interface allows plugging
+ in classes that will be notified of new metric creation. Default: []
+ metrics_num_samples (int): The number of samples maintained to compute
+ metrics. Default: 2
+ metrics_sample_window_ms (int): The number of samples maintained to
+ compute metrics. Default: 30000
Note:
Configuration parameters are described in more detail at
@@ -181,9 +189,9 @@ class KafkaConsumer(six.Iterator):
'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
- #'metric_reporters': None,
- #'metrics_num_samples': 2,
- #'metrics_sample_window_ms': 30000,
+ 'metric_reporters': [],
+ 'metrics_num_samples': 2,
+ 'metrics_sample_window_ms': 30000,
}
def __init__(self, *topics, **configs):
@@ -202,6 +210,16 @@ class KafkaConsumer(six.Iterator):
new_config, self.config['auto_offset_reset'])
self.config['auto_offset_reset'] = new_config
+ metrics_tags = {'client-id': self.config['client_id']}
+ metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+ time_window_ms=self.config['metrics_sample_window_ms'],
+ tags=metrics_tags)
+ reporters = [reporter() for reporter in self.config['metric_reporters']]
+ reporters.append(DictReporter('kafka.consumer'))
+ self._metrics = Metrics(metric_config, reporters)
+ metric_group_prefix = 'consumer'
+ # TODO _metrics likely needs to be passed to KafkaClient, Fetcher, etc.
+
self._client = KafkaClient(**self.config)
# Check Broker Version if not set explicitly
@@ -217,14 +235,13 @@ class KafkaConsumer(six.Iterator):
self._fetcher = Fetcher(
self._client, self._subscription, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription,
+ self._client, self._subscription, self._metrics, metric_group_prefix,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
self._iterator = None
self._consumer_timeout = float('inf')
- #self.metrics = None
if topics:
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)
@@ -277,7 +294,7 @@ class KafkaConsumer(six.Iterator):
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close()
- #self.metrics.close()
+ self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()