summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-18 16:48:57 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-18 23:58:56 -0700
commit22e1c7ebdc5f660c195dfa94ea1e83b7c004d500 (patch)
treea4bc99969cce7b0f3b3f18ae85658cff6bc33117
parent915466a9fcc84c453f03d4e0425f0bc81a696656 (diff)
downloadkafka-python-check_version.tar.gz
KakfaClient.check_version: Scan all brokers until a version is identified or timeoutcheck_version
-rw-r--r--kafka/client_async.py53
1 files changed, 41 insertions, 12 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1276743..8916a3e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -662,20 +662,49 @@ class KafkaClient(object):
self._delayed_tasks.remove(task)
def check_version(self, node_id=None, timeout=2, strict=False):
- """Attempt to guess the broker version"""
- if node_id is None:
- node_id = self.least_loaded_node()
- if node_id is None:
+ """Attempt to guess a broker version
+
+ Note: it is possible that this method blocks longer than the
+ specified timeout. This can happen if the entire cluster
+ is down and the client enters a bootstrap backoff sleep.
+ This is only possible if node_id is None.
+
+ Returns: version str, i.e. '0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'
+
+ Raises:
+ NodeNotReadyError (if node_id is provided)
+ NoBrokersAvailable (if node_id is None)
+ UnrecognizedBrokerVersion: please file bug if seen!
+ AssertionError (if strict=True): please file bug if seen!
+ """
+ end = time.time() + timeout
+ while time.time() < end:
+
+ # It is possible that least_loaded_node falls back to bootstrap,
+ # which can block for an increasing backoff period
+ try_node = node_id or self.least_loaded_node()
+ if try_node is None:
raise Errors.NoBrokersAvailable()
+ self._maybe_connect(try_node)
+ conn = self._conns[try_node]
- # We will be intentionally causing socket failures
- # and should not trigger metadata refresh
- self._refresh_on_disconnects = False
- self._maybe_connect(node_id)
- conn = self._conns[node_id]
- version = conn.check_version()
- self._refresh_on_disconnects = True
- return version
+ # We will intentionally cause socket failures
+ # These should not trigger metadata refresh
+ self._refresh_on_disconnects = False
+ try:
+ remaining = end - time.time()
+ version = conn.check_version(timeout=remaining, strict=strict)
+ return version
+ except Errors.NodeNotReadyError:
+ # Only raise to user if this is a node-specific request
+ if node_id is not None:
+ raise
+ finally:
+ self._refresh_on_disconnects = True
+
+ # Timeout
+ else:
+ raise Errors.NoBrokersAvailable()
def wakeup(self):
if self._wake_w.send(b'x') != 1: