diff options
-rw-r--r-- | kafka/client_async.py | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 3dee2e1..bf2f6ea 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -94,6 +94,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._conns = {} self._connecting = set() + self._refresh_on_disconnects = True self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -164,10 +165,11 @@ class KafkaClient(object): # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: - log.warning("Node %s connect failed -- refreshing metadata", node_id) if node_id in self._connecting: self._connecting.remove(node_id) - self.cluster.request_update() + if self._refresh_on_disconnects: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + self.cluster.request_update() def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" @@ -597,9 +599,13 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() + # We will be intentionally causing socket failures + # and should not trigger metadata refresh + self._refresh_on_disconnects = False self._maybe_connect(node_id) conn = self._conns[node_id] version = conn.check_version() + self._refresh_on_disconnects = True return version def wakeup(self): |