summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-31 18:38:47 -0700
committerGitHub <noreply@github.com>2019-03-31 18:38:47 -0700
commit3664ae85e5a4c47075489e01688897f8cea8b11d (patch)
treed2ac9d2d6e03705d5073626f796c652d0c9090c6 /kafka/client_async.py
parentb1effa24aca3a6bcf2268354caae12ee82d6b36d (diff)
downloadkafka-python-3664ae85e5a4c47075489e01688897f8cea8b11d.tar.gz
lock client.check_version (#1771)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..ba5c960 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -845,6 +845,7 @@ class KafkaClient(object):
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
+ self._lock.acquire()
end = time.time() + timeout
while time.time() < end:
@@ -852,6 +853,7 @@ class KafkaClient(object):
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
self._maybe_connect(try_node)
conn = self._conns[try_node]
@@ -866,16 +868,19 @@ class KafkaClient(object):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
+ self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
if node_id is not None:
+ self._lock.release()
raise
finally:
self._refresh_on_disconnects = True
# Timeout
else:
+ self._lock.release()
raise Errors.NoBrokersAvailable()
def wakeup(self):