summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..ba5c960 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -845,6 +845,7 @@ class KafkaClient(object):
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
+ self._lock.acquire()
end = time.time() + timeout
while time.time() < end:
@@ -852,6 +853,7 @@ class KafkaClient(object):
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
self._maybe_connect(try_node)
conn = self._conns[try_node]
@@ -866,16 +868,19 @@ class KafkaClient(object):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
+ self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
if node_id is not None:
+ self._lock.release()
raise
finally:
self._refresh_on_disconnects = True
# Timeout
else:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
def wakeup(self):