summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/conn.py33
1 files changed, 20 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index a131413..e10d4f1 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -467,6 +467,19 @@ class BrokerConnection(object):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))
+ def _recv_bytes_blocking(self, n):
+ self._sock.setblocking(True)
+ try:
+ data = b''
+ while len(data) < n:
+ fragment = self._sock.recv(n - len(data))
+ if not fragment:
+ raise ConnectionError('Connection reset during recv')
+ data += fragment
+ return data
+ finally:
+ self._sock.setblocking(False)
+
def _try_authenticate_plain(self, future):
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Sending username and password in the clear', self)
@@ -480,19 +493,12 @@ class BrokerConnection(object):
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
+ self._sock.setblocking(False)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
- while len(data) < 4:
- fragment = self._sock.recv(4 - len(data))
- if not fragment:
- log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
- error = Errors.AuthenticationFailedError(
- 'Authentication failed for user {0}'.format(
- self.config['sasl_plain_username']))
- return future.failure(error)
- data += fragment
- self._sock.setblocking(False)
+ self._recv_bytes_blocking(4)
+
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
@@ -528,14 +534,15 @@ class BrokerConnection(object):
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
+ self._sock.setblocking(False)
+
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
- header = self._sock.recv(4)
+ header = self._recv_bytes_blocking(4)
token_size = struct.unpack('>i', header)
- received_token = self._sock.recv(token_size)
- self._sock.setblocking(False)
+ received_token = self._recv_bytes_blocking(token_size)
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)