summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 21:10:39 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 23:17:42 -0700
commit50b96cafc4ae2d9a41fde5c07acb3316a650ec33 (patch)
tree8aec22f2862391bd122f81df8a368c966a605785
parent506d023978e7273bd323c0750e3f77af259d257b (diff)
downloadkafka-python-50b96cafc4ae2d9a41fde5c07acb3316a650ec33.tar.gz
Use exclude_internal_topics config in KafkaConsumer to avoid subscribe patterns matching internal topics
-rw-r--r--kafka/consumer/group.py5
-rw-r--r--kafka/coordinator/consumer.py7
2 files changed, 11 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9ebfe02..db0022d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -176,6 +176,10 @@ class KafkaConsumer(six.Iterator):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to True
+ the only way to receive records from an internal topic is
+ subscribing to it. Requires 0.10+ Default: True
Note:
Configuration parameters are described in more detail at
@@ -222,6 +226,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
+ 'exclude_internal_topics': True,
}
def __init__(self, *topics, **configs):
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..fd728d9 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
+ 'exclude_internal_topics': True,
}
def __init__(self, client, subscription, metrics, metric_group_prefix,
@@ -70,6 +71,10 @@ class ConsumerCoordinator(BaseCoordinator):
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -137,7 +142,7 @@ class ConsumerCoordinator(BaseCoordinator):
if self._subscription.subscribed_pattern:
topics = []
- for topic in cluster.topics():
+ for topic in cluster.topics(self.config['exclude_internal_topics']):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)