diff options
-rw-r--r-- | kafka/cluster.py | 52 |
1 files changed, 34 insertions, 18 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index cdd81f4..8c2c10e 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -4,6 +4,7 @@ import collections import copy import logging import random +import threading import time import six @@ -31,6 +32,7 @@ class ClusterMetadata(object): self._need_update = False self._future = None self._listeners = set() + self._lock = threading.Lock() self.need_all_topic_metadata = False self.unauthorized_topics = set() @@ -96,18 +98,23 @@ class ClusterMetadata(object): Returns: Future (value will be this cluster object after update) """ - self._need_update = True - if not self._future or self._future.is_done: - self._future = Future() - return self._future + with self._lock: + self._need_update = True + if not self._future or self._future.is_done: + self._future = Future() + return self._future def topics(self): return set(self._partitions.keys()) def failed_update(self, exception): - if self._future: - self._future.failure(exception) - self._future = None + f = None + with self._lock: + if self._future: + f = self._future + self._future = None + if f: + f.failure(exception) self._last_refresh_ms = time.time() * 1000 def update_metadata(self, metadata): @@ -126,20 +133,20 @@ class ClusterMetadata(object): node_id: BrokerMetadata(node_id, host, port) }) - self._partitions.clear() - self._broker_partitions.clear() - self.unauthorized_topics.clear() + _new_partitions = {} + _new_broker_partitions = collections.defaultdict(set) + _new_unauthorized_topics = set() for error_code, topic, partitions in metadata.topics: error_type = Errors.for_code(error_code) if error_type is Errors.NoError: - self._partitions[topic] = {} + _new_partitions[topic] = {} for p_error, partition, leader, replicas, isr in partitions: - self._partitions[topic][partition] = PartitionMetadata( + _new_partitions[topic][partition] = PartitionMetadata( topic=topic, partition=partition, leader=leader, replicas=replicas, isr=isr, error=p_error) if leader != -1: - self._broker_partitions[leader].add( + _new_broker_partitions[leader].add( TopicPartition(topic, partition)) elif error_type is Errors.LeaderNotAvailableError: @@ -149,20 +156,29 @@ class ClusterMetadata(object): log.error("Topic %s not found in cluster metadata", topic) elif error_type is Errors.TopicAuthorizationFailedError: log.error("Topic %s is not authorized for this client", topic) - self.unauthorized_topics.add(topic) + _new_unauthorized_topics.add(topic) elif error_type is Errors.InvalidTopicError: log.error("'%s' is not a valid topic name", topic) else: log.error("Error fetching metadata for topic %s: %s", topic, error_type) - if self._future: - self._future.success(self) - self._future = None - self._need_update = False + with self._lock: + self._partitions = _new_partitions + self._broker_partitions = _new_broker_partitions + self.unauthorized_topics = _new_unauthorized_topics + f = None + if self._future: + f = self._future + self._future = None + self._need_update = False + now = time.time() * 1000 self._last_refresh_ms = now self._last_successful_refresh_ms = now + + if f: + f.success(self) log.debug("Updated cluster metadata to %s", self) for listener in self._listeners: |