diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 21:20:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-16 21:20:42 -0700 |
commit | 5ab4d5c274112a4e2024dea415a0ec4b79009a28 (patch) | |
tree | 2f75731a028194d92d8df916a2a6c553385aae80 /kafka/cluster.py | |
parent | 2a7f4dbb8159464941afa25d49428976cc05f902 (diff) | |
parent | 277f0ddd61c230181f5f21d427070ec44b36a257 (diff) | |
download | kafka-python-5ab4d5c274112a4e2024dea415a0ec4b79009a28.tar.gz |
Merge pull request #762 from dpkp/metadata_v1
Use Metadata Request/Response v1 with 0.10+ brokers
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 60 |
1 files changed, 47 insertions, 13 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 9aabec1..694e115 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -34,6 +34,8 @@ class ClusterMetadata(object): self._lock = threading.Lock() self.need_all_topic_metadata = False self.unauthorized_topics = set() + self.internal_topics = set() + self.controller = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -150,13 +152,23 @@ class ClusterMetadata(object): self._future = Future() return self._future - def topics(self): + def topics(self, exclude_internal_topics=True): """Get set of known topics. + Arguments: + exclude_internal_topics (bool): Whether records from internal topics + (such as offsets) should be exposed to the consumer. If set to + True the only way to receive records from an internal topic is + subscribing to it. Default True + Returns: set: {topic (str), ...} """ - return set(self._partitions.keys()) + topics = set(self._partitions.keys()) + if exclude_internal_topics: + return topics - self.internal_topics + else: + return topics def failed_update(self, exception): """Update cluster state given a failed MetadataRequest.""" @@ -180,23 +192,41 @@ class ClusterMetadata(object): # In the common case where we ask for a single topic and get back an # error, we should fail the future if len(metadata.topics) == 1 and metadata.topics[0][0] != 0: - error_code, topic, _ = metadata.topics[0] + error_code, topic = metadata.topics[0][:2] error = Errors.for_code(error_code)(topic) return self.failed_update(error) if not metadata.brokers: log.warning("No broker metadata found in MetadataResponse") - for node_id, host, port in metadata.brokers: + for broker in metadata.brokers: + if metadata.API_VERSION == 0: + node_id, host, port = broker + rack = None + else: + node_id, host, port, rack = broker self._brokers.update({ - node_id: BrokerMetadata(node_id, host, port) + node_id: BrokerMetadata(node_id, host, port, rack) }) + if metadata.API_VERSION == 0: + self.controller = None + else: + self.controller = self._brokers.get(metadata.controller_id) + _new_partitions = {} _new_broker_partitions = collections.defaultdict(set) _new_unauthorized_topics = set() + _new_internal_topics = set() - for error_code, topic, partitions in metadata.topics: + for topic_data in metadata.topics: + if metadata.API_VERSION == 0: + error_code, topic, partitions = topic_data + is_internal = False + else: + error_code, topic, is_internal, partitions = topic_data + if is_internal: + _new_internal_topics.add(topic) error_type = Errors.for_code(error_code) if error_type is Errors.NoError: _new_partitions[topic] = {} @@ -226,6 +256,7 @@ class ClusterMetadata(object): self._partitions = _new_partitions self._broker_partitions = _new_broker_partitions self.unauthorized_topics = _new_unauthorized_topics + self.internal_topics = _new_internal_topics f = None if self._future: f = self._future @@ -272,7 +303,8 @@ class ClusterMetadata(object): coordinator = BrokerMetadata( response.coordinator_id, response.host, - response.port) + response.port, + None) # Assume that group coordinators are just brokers # (this is true now, but could diverge in future) @@ -281,12 +313,14 @@ class ClusterMetadata(object): # If this happens, either brokers have moved without # changing IDs, or our assumption above is wrong - elif coordinator != self._brokers[node_id]: - log.error("GroupCoordinator metadata conflicts with existing" - " broker metadata. Coordinator: %s, Broker: %s", - coordinator, self._brokers[node_id]) - self._groups[group] = node_id - return False + else: + node = self._brokers[node_id] + if coordinator.host != node.host or coordinator.port != node.port: + log.error("GroupCoordinator metadata conflicts with existing" + " broker metadata. Coordinator: %s, Broker: %s", + coordinator, node) + self._groups[group] = node_id + return False log.info("Group coordinator for %s is %s", group, coordinator) self._groups[group] = node_id |