summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-17 08:37:15 -0700
committerGitHub <noreply@github.com>2016-07-17 08:37:15 -0700
commit31a29ecea000ad8e95b0ecb1b8e11f9600029135 (patch)
treea530f29e741526a1b26dc3926eba018f7f82fbc1
parent506d023978e7273bd323c0750e3f77af259d257b (diff)
downloadkafka-python-31a29ecea000ad8e95b0ecb1b8e11f9600029135.tar.gz
KAFKA-2832: Add a consumer config option to exclude internal topics (#765)
Use exclude_internal_topics config in KafkaConsumer to avoid subscribe patterns matching internal topics Raise error during rebalance if subscribed topics are not authorized
-rw-r--r--kafka/consumer/group.py5
-rw-r--r--kafka/coordinator/consumer.py12
2 files changed, 13 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9ebfe02..db0022d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -176,6 +176,10 @@ class KafkaConsumer(six.Iterator):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to True
+ the only way to receive records from an internal topic is
+ subscribing to it. Requires 0.10+ Default: True
Note:
Configuration parameters are described in more detail at
@@ -222,6 +226,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
+ 'exclude_internal_topics': True,
}
def __init__(self, *topics, **configs):
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 083a36a..2543238 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
+ 'exclude_internal_topics': True,
}
def __init__(self, client, subscription, metrics, metric_group_prefix,
@@ -70,6 +71,10 @@ class ConsumerCoordinator(BaseCoordinator):
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -131,13 +136,12 @@ class ConsumerCoordinator(BaseCoordinator):
def _handle_metadata_update(self, cluster):
# if we encounter any unauthorized topics, raise an exception
- # TODO
- #if self._cluster.unauthorized_topics:
- # raise TopicAuthorizationError(self._cluster.unauthorized_topics)
+ if cluster.unauthorized_topics:
+ raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)
if self._subscription.subscribed_pattern:
topics = []
- for topic in cluster.topics():
+ for topic in cluster.topics(self.config['exclude_internal_topics']):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)