summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-31 10:11:21 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-31 16:42:41 -0700
commit9638d1cfaa42d0902e39db1f6c3db103cd7a0cc9 (patch)
tree9542d82a7ed8609da0a389420278b7cebe6289c2 /kafka/client_async.py
parentb1effa24aca3a6bcf2268354caae12ee82d6b36d (diff)
downloadkafka-python-popped_conn_close.tar.gz
Dont treat popped conn.close() as failure in state change callbackpopped_conn_close
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..5c8a559 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,7 +314,12 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if self.cluster.is_bootstrap(node_id):
+ # If the connection has already by popped from self._conns,
+ # we can assume the disconnect was intentional and not a failure
+ if node_id not in self._conns:
+ pass
+
+ elif self.cluster.is_bootstrap(node_id):
self._bootstrap_fails += 1
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -419,10 +424,12 @@ class KafkaClient(object):
with self._lock:
if node_id is None:
self._close()
- for conn in self._conns.values():
+ conns = list(self._conns.values())
+ self._conns.clear()
+ for conn in conns:
conn.close()
elif node_id in self._conns:
- self._conns[node_id].close()
+ self._conns.pop(node_id).close()
else:
log.warning("Node %s not found in current connection list; skipping", node_id)
return