summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py52
1 files changed, 34 insertions, 18 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index cdd81f4..8c2c10e 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -4,6 +4,7 @@ import collections
import copy
import logging
import random
+import threading
import time
import six
@@ -31,6 +32,7 @@ class ClusterMetadata(object):
self._need_update = False
self._future = None
self._listeners = set()
+ self._lock = threading.Lock()
self.need_all_topic_metadata = False
self.unauthorized_topics = set()
@@ -96,18 +98,23 @@ class ClusterMetadata(object):
Returns: Future (value will be this cluster object after update)
"""
- self._need_update = True
- if not self._future or self._future.is_done:
- self._future = Future()
- return self._future
+ with self._lock:
+ self._need_update = True
+ if not self._future or self._future.is_done:
+ self._future = Future()
+ return self._future
def topics(self):
return set(self._partitions.keys())
def failed_update(self, exception):
- if self._future:
- self._future.failure(exception)
- self._future = None
+ f = None
+ with self._lock:
+ if self._future:
+ f = self._future
+ self._future = None
+ if f:
+ f.failure(exception)
self._last_refresh_ms = time.time() * 1000
def update_metadata(self, metadata):
@@ -126,20 +133,20 @@ class ClusterMetadata(object):
node_id: BrokerMetadata(node_id, host, port)
})
- self._partitions.clear()
- self._broker_partitions.clear()
- self.unauthorized_topics.clear()
+ _new_partitions = {}
+ _new_broker_partitions = collections.defaultdict(set)
+ _new_unauthorized_topics = set()
for error_code, topic, partitions in metadata.topics:
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- self._partitions[topic] = {}
+ _new_partitions[topic] = {}
for p_error, partition, leader, replicas, isr in partitions:
- self._partitions[topic][partition] = PartitionMetadata(
+ _new_partitions[topic][partition] = PartitionMetadata(
topic=topic, partition=partition, leader=leader,
replicas=replicas, isr=isr, error=p_error)
if leader != -1:
- self._broker_partitions[leader].add(
+ _new_broker_partitions[leader].add(
TopicPartition(topic, partition))
elif error_type is Errors.LeaderNotAvailableError:
@@ -149,20 +156,29 @@ class ClusterMetadata(object):
log.error("Topic %s not found in cluster metadata", topic)
elif error_type is Errors.TopicAuthorizationFailedError:
log.error("Topic %s is not authorized for this client", topic)
- self.unauthorized_topics.add(topic)
+ _new_unauthorized_topics.add(topic)
elif error_type is Errors.InvalidTopicError:
log.error("'%s' is not a valid topic name", topic)
else:
log.error("Error fetching metadata for topic %s: %s",
topic, error_type)
- if self._future:
- self._future.success(self)
- self._future = None
- self._need_update = False
+ with self._lock:
+ self._partitions = _new_partitions
+ self._broker_partitions = _new_broker_partitions
+ self.unauthorized_topics = _new_unauthorized_topics
+ f = None
+ if self._future:
+ f = self._future
+ self._future = None
+ self._need_update = False
+
now = time.time() * 1000
self._last_refresh_ms = now
self._last_successful_refresh_ms = now
+
+ if f:
+ f.success(self)
log.debug("Updated cluster metadata to %s", self)
for listener in self._listeners: