summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 21:20:42 -0700
committerGitHub <noreply@github.com>2016-07-16 21:20:42 -0700
commit5ab4d5c274112a4e2024dea415a0ec4b79009a28 (patch)
tree2f75731a028194d92d8df916a2a6c553385aae80 /kafka/cluster.py
parent2a7f4dbb8159464941afa25d49428976cc05f902 (diff)
parent277f0ddd61c230181f5f21d427070ec44b36a257 (diff)
downloadkafka-python-5ab4d5c274112a4e2024dea415a0ec4b79009a28.tar.gz
Merge pull request #762 from dpkp/metadata_v1
Use Metadata Request/Response v1 with 0.10+ brokers
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py60
1 files changed, 47 insertions, 13 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..694e115 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -34,6 +34,8 @@ class ClusterMetadata(object):
self._lock = threading.Lock()
self.need_all_topic_metadata = False
self.unauthorized_topics = set()
+ self.internal_topics = set()
+ self.controller = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -150,13 +152,23 @@ class ClusterMetadata(object):
self._future = Future()
return self._future
- def topics(self):
+ def topics(self, exclude_internal_topics=True):
"""Get set of known topics.
+ Arguments:
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Default True
+
Returns:
set: {topic (str), ...}
"""
- return set(self._partitions.keys())
+ topics = set(self._partitions.keys())
+ if exclude_internal_topics:
+ return topics - self.internal_topics
+ else:
+ return topics
def failed_update(self, exception):
"""Update cluster state given a failed MetadataRequest."""
@@ -180,23 +192,41 @@ class ClusterMetadata(object):
# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
- error_code, topic, _ = metadata.topics[0]
+ error_code, topic = metadata.topics[0][:2]
error = Errors.for_code(error_code)(topic)
return self.failed_update(error)
if not metadata.brokers:
log.warning("No broker metadata found in MetadataResponse")
- for node_id, host, port in metadata.brokers:
+ for broker in metadata.brokers:
+ if metadata.API_VERSION == 0:
+ node_id, host, port = broker
+ rack = None
+ else:
+ node_id, host, port, rack = broker
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, rack)
})
+ if metadata.API_VERSION == 0:
+ self.controller = None
+ else:
+ self.controller = self._brokers.get(metadata.controller_id)
+
_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
_new_unauthorized_topics = set()
+ _new_internal_topics = set()
- for error_code, topic, partitions in metadata.topics:
+ for topic_data in metadata.topics:
+ if metadata.API_VERSION == 0:
+ error_code, topic, partitions = topic_data
+ is_internal = False
+ else:
+ error_code, topic, is_internal, partitions = topic_data
+ if is_internal:
+ _new_internal_topics.add(topic)
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
@@ -226,6 +256,7 @@ class ClusterMetadata(object):
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics
+ self.internal_topics = _new_internal_topics
f = None
if self._future:
f = self._future
@@ -272,7 +303,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +313,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id