diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 21:10:39 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 23:17:42 -0700 |
commit | 50b96cafc4ae2d9a41fde5c07acb3316a650ec33 (patch) | |
tree | 8aec22f2862391bd122f81df8a368c966a605785 | |
parent | 506d023978e7273bd323c0750e3f77af259d257b (diff) | |
download | kafka-python-50b96cafc4ae2d9a41fde5c07acb3316a650ec33.tar.gz |
Use exclude_internal_topics config in KafkaConsumer to avoid subscribe patterns matching internal topics
-rw-r--r-- | kafka/consumer/group.py | 5 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 7 |
2 files changed, 11 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 9ebfe02..db0022d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -176,6 +176,10 @@ class KafkaConsumer(six.Iterator): selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector + exclude_internal_topics (bool): Whether records from internal topics + (such as offsets) should be exposed to the consumer. If set to True + the only way to receive records from an internal topic is + subscribing to it. Requires 0.10+ Default: True Note: Configuration parameters are described in more detail at @@ -222,6 +226,7 @@ class KafkaConsumer(six.Iterator): 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'selector': selectors.DefaultSelector, + 'exclude_internal_topics': True, } def __init__(self, *topics, **configs): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 083a36a..fd728d9 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator): 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, 'api_version': (0, 9), + 'exclude_internal_topics': True, } def __init__(self, client, subscription, metrics, metric_group_prefix, @@ -70,6 +71,10 @@ class ConsumerCoordinator(BaseCoordinator): using Kafka's group managementment facilities. Default: 30000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + exclude_internal_topics (bool): Whether records from internal topics + (such as offsets) should be exposed to the consumer. If set to + True the only way to receive records from an internal topic is + subscribing to it. Requires 0.10+. Default: True """ super(ConsumerCoordinator, self).__init__(client, **configs) self.config = copy.copy(self.DEFAULT_CONFIG) @@ -137,7 +142,7 @@ class ConsumerCoordinator(BaseCoordinator): if self._subscription.subscribed_pattern: topics = [] - for topic in cluster.topics(): + for topic in cluster.topics(self.config['exclude_internal_topics']): if self._subscription.subscribed_pattern.match(topic): topics.append(topic) |