summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-04 14:22:40 -0700
committerGitHub <noreply@github.com>2016-08-04 14:22:40 -0700
commit68c8fa4ad01f8fef38708f257cb1c261cfac01ab (patch)
tree38d12fc11f82c492c68a4e04dbac26664862f541 /kafka/consumer/group.py
parent3c9b1b6fc498f95806ee12f67f84ea548ac1378f (diff)
parent025b69ef4ae22d1677904e99f924b9ef5a096e75 (diff)
downloadkafka-python-68c8fa4ad01f8fef38708f257cb1c261cfac01ab.tar.gz
Merge pull request #794 from dpkp/conn_metrics
Complete metrics instrumentation
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 5edfaea..d4e0ff3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
+ 'metric_group_prefix': 'consumer',
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
@@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
# api_version was previously a str. accept old format for now
@@ -281,7 +281,7 @@ class KafkaConsumer(six.Iterator):
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
str(self.config['api_version']), str_version)
- self._client = KafkaClient(**self.config)
+ self._client = KafkaClient(metrics=self._metrics, **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
@@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
+ self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription, self._metrics, metric_group_prefix,
+ self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False