diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-31 10:11:21 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-31 16:42:41 -0700 |
commit | 9638d1cfaa42d0902e39db1f6c3db103cd7a0cc9 (patch) | |
tree | 9542d82a7ed8609da0a389420278b7cebe6289c2 /kafka/client_async.py | |
parent | b1effa24aca3a6bcf2268354caae12ee82d6b36d (diff) | |
download | kafka-python-popped_conn_close.tar.gz |
Dont treat popped conn.close() as failure in state change callbackpopped_conn_close
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b6adb77..5c8a559 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -314,7 +314,12 @@ class KafkaClient(object): idle_disconnect = True self._idle_expiry_manager.remove(node_id) - if self.cluster.is_bootstrap(node_id): + # If the connection has already by popped from self._conns, + # we can assume the disconnect was intentional and not a failure + if node_id not in self._conns: + pass + + elif self.cluster.is_bootstrap(node_id): self._bootstrap_fails += 1 elif self._refresh_on_disconnects and not self._closed and not idle_disconnect: @@ -419,10 +424,12 @@ class KafkaClient(object): with self._lock: if node_id is None: self._close() - for conn in self._conns.values(): + conns = list(self._conns.values()) + self._conns.clear() + for conn in conns: conn.close() elif node_id in self._conns: - self._conns[node_id].close() + self._conns.pop(node_id).close() else: log.warning("Node %s not found in current connection list; skipping", node_id) return |