summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-04 12:19:46 -0700
committerDana Powers <dana.powers@gmail.com>2016-08-04 13:05:36 -0700
commitaf08b54875a5ae5c14fbdeccee4ffe266bda1e00 (patch)
tree51475b859960f304c74c47e0ce394f7451e5ddba
parenta698162dc9bcb228007c4942105515f0eb720c2c (diff)
downloadkafka-python-af08b54875a5ae5c14fbdeccee4ffe266bda1e00.tar.gz
Treat metric_group_prefix as config in KafkaConsumer
-rw-r--r--kafka/consumer/fetcher.py6
-rw-r--r--kafka/consumer/group.py6
-rw-r--r--kafka/coordinator/base.py5
-rw-r--r--kafka/coordinator/consumer.py10
-rw-r--r--test/test_coordinator.py7
-rw-r--r--test/test_fetcher.py2
6 files changed, 17 insertions, 19 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c00681d..f5d44b1 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -42,11 +42,11 @@ class Fetcher(six.Iterator):
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
- def __init__(self, client, subscriptions, metrics, metric_group_prefix,
- **configs):
+ def __init__(self, client, subscriptions, metrics, **configs):
"""Initialize a Kafka Message Fetcher.
Keyword Arguments:
@@ -94,7 +94,7 @@ class Fetcher(six.Iterator):
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
- self._sensors = FetchManagerMetrics(metrics, metric_group_prefix)
+ self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7dde29a..d4e0ff3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,6 +239,7 @@ class KafkaConsumer(six.Iterator):
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
+ 'metric_group_prefix': 'consumer',
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
@@ -268,7 +269,6 @@ class KafkaConsumer(six.Iterator):
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- metric_group_prefix = 'consumer'
# TODO _metrics likely needs to be passed to KafkaClient, etc.
# api_version was previously a str. accept old format for now
@@ -289,9 +289,9 @@ class KafkaConsumer(six.Iterator):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, self._metrics, metric_group_prefix, **self.config)
+ self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._subscription, self._metrics, metric_group_prefix,
+ self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c57d45a..d6ea6c0 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -55,9 +55,10 @@ class BaseCoordinator(object):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
+ 'metric_group_prefix': '',
}
- def __init__(self, client, metrics, metric_group_prefix, **configs):
+ def __init__(self, client, metrics, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
@@ -92,7 +93,7 @@ class BaseCoordinator(object):
self.heartbeat = Heartbeat(**self.config)
self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
- metric_group_prefix)
+ self.config['metric_group_prefix'])
def __del__(self):
if hasattr(self, 'heartbeat_task') and self.heartbeat_task:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 0429e09..a600cb4 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -37,10 +37,10 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 9),
'exclude_internal_topics': True,
+ 'metric_group_prefix': 'consumer'
}
- def __init__(self, client, subscription, metrics, metric_group_prefix,
- **configs):
+ def __init__(self, client, subscription, metrics, **configs):
"""Initialize the coordination manager.
Keyword Arguments:
@@ -76,9 +76,7 @@ class ConsumerCoordinator(BaseCoordinator):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
- super(ConsumerCoordinator, self).__init__(client,
- metrics, metric_group_prefix,
- **configs)
+ super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -111,7 +109,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task.reschedule()
self.consumer_sensors = ConsumerCoordinatorMetrics(
- metrics, metric_group_prefix, self._subscription)
+ metrics, self.config['metric_group_prefix'], self._subscription)
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 35598e8..4115c03 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -29,8 +29,7 @@ def client(conn):
@pytest.fixture
def coordinator(client):
- return ConsumerCoordinator(client, SubscriptionState(), Metrics(),
- 'consumer')
+ return ConsumerCoordinator(client, SubscriptionState(), Metrics())
def test_init(client, coordinator):
@@ -42,7 +41,7 @@ def test_init(client, coordinator):
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
def test_autocommit_enable_api_version(client, api_version):
coordinator = ConsumerCoordinator(client, SubscriptionState(),
- Metrics(), 'consumer',
+ Metrics(),
enable_auto_commit=True,
group_id='foobar',
api_version=api_version)
@@ -362,7 +361,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
client = KafkaClient(api_version=api_version)
coordinator = ConsumerCoordinator(client, SubscriptionState(),
- Metrics(), 'consumer',
+ Metrics(),
api_version=api_version,
enable_auto_commit=enable,
group_id=group_id)
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 1f1f7d3..6afd547 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -30,7 +30,7 @@ def fetcher(client, subscription_state):
subscription_state.assign_from_subscribed(assignment)
for tp in assignment:
subscription_state.seek(tp, 0)
- return Fetcher(client, subscription_state, Metrics(), 'test_fetcher')
+ return Fetcher(client, subscription_state, Metrics())
def test_init_fetches(fetcher, mocker):