summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-19 08:26:30 -0700
committerGitHub <noreply@github.com>2016-06-19 08:26:30 -0700
commit461ccbd9ecf06722c9ff73f6ed439be4b8391672 (patch)
treea4bc99969cce7b0f3b3f18ae85658cff6bc33117 /kafka/conn.py
parent0b5a49e58d16336c1a632a4f5e42bc4fbbb3d118 (diff)
downloadkafka-python-461ccbd9ecf06722c9ff73f6ed439be4b8391672.tar.gz
check_version should scan nodes until version found or timeout (#731)
* Mute all connection logging during conn.check_version * Always process pending MetadataRequest in conn.check_version * KakfaClient.check_version: Scan all brokers until a version is identified or timeout
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index c5d3be1..005dd7e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -561,9 +561,9 @@ class BrokerConnection(object):
class ConnFilter(Filter):
def filter(self, record):
- if record.funcName in ('recv', 'send'):
- return False
- return True
+ if record.funcName == 'check_version':
+ return True
+ return False
log_filter = ConnFilter()
log.addFilter(log_filter)
@@ -598,11 +598,11 @@ class BrokerConnection(object):
# the attempt to write to a disconnected socket should
# immediately fail and allow us to infer that the prior
# request was unrecognized
- self.send(MetadataRequest[0]([]))
+ mr = self.send(MetadataRequest[0]([]))
if self._sock:
self._sock.setblocking(True)
- while not f.is_done:
+ while not (f.is_done and mr.is_done):
self.recv()
if self._sock:
self._sock.setblocking(False)