diff options
-rw-r--r-- | kafka/conn.py | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index a131413..e10d4f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -467,6 +467,19 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _recv_bytes_blocking(self, n): + self._sock.setblocking(True) + try: + data = b'' + while len(data) < n: + fragment = self._sock.recv(n - len(data)) + if not fragment: + raise ConnectionError('Connection reset during recv') + data += fragment + return data + finally: + self._sock.setblocking(False) + def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': log.warning('%s: Sending username and password in the clear', self) @@ -480,19 +493,12 @@ class BrokerConnection(object): self.config['sasl_plain_password']]).encode('utf-8')) size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure - while len(data) < 4: - fragment = self._sock.recv(4 - len(data)) - if not fragment: - log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) - error = Errors.AuthenticationFailedError( - 'Authentication failed for user {0}'.format( - self.config['sasl_plain_username'])) - return future.failure(error) - data += fragment - self._sock.setblocking(False) + self._recv_bytes_blocking(4) + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) @@ -528,14 +534,15 @@ class BrokerConnection(object): msg = output_token size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) + # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - header = self._sock.recv(4) + header = self._recv_bytes_blocking(4) token_size = struct.unpack('>i', header) - received_token = self._sock.recv(token_size) - self._sock.setblocking(False) + received_token = self._recv_bytes_blocking(token_size) except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) |