summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py6
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c00681d..f5d44b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
- def __init__(self, client, subscriptions, metrics, metric_group_prefix,
- **configs):
+ def __init__(self, client, subscriptions, metrics, **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
- self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+ self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.