summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/cluster.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 4646378..0a5c07f 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -199,20 +199,21 @@ class ClusterMetadata(object):
if not metadata.brokers:
log.warning("No broker metadata found in MetadataResponse")
+ _new_brokers = {}
for broker in metadata.brokers:
if metadata.API_VERSION == 0:
node_id, host, port = broker
rack = None
else:
node_id, host, port, rack = broker
- self._brokers.update({
+ _new_brokers.update({
node_id: BrokerMetadata(node_id, host, port, rack)
})
if metadata.API_VERSION == 0:
- self.controller = None
+ _new_controller = None
else:
- self.controller = self._brokers.get(metadata.controller_id)
+ _new_controller = _new_brokers.get(metadata.controller_id)
_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
@@ -253,6 +254,8 @@ class ClusterMetadata(object):
topic, error_type)
with self._lock:
+ self._brokers = _new_brokers
+ self.controller = _new_controller
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics