diff options
-rw-r--r-- | kafka/conn.py | 43 |
1 files changed, 23 insertions, 20 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e10d4f1..ffd7702 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -467,6 +467,19 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _send_bytes_blocking(self, data): + self._sock.setblocking(True) + total_sent = 0 + try: + while total_sent < len(data): + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + if total_sent != len(data): + raise ConnectionError('Buffer overrun during socket send') + return total_sent + finally: + self._sock.setblocking(False) + def _recv_bytes_blocking(self, n): self._sock.setblocking(True) try: @@ -485,15 +498,13 @@ class BrokerConnection(object): log.warning('%s: Sending username and password in the clear', self) data = b'' + # Send PLAIN credentials per RFC-4616 + msg = bytes('\0'.join([self.config['sasl_plain_username'], + self.config['sasl_plain_username'], + self.config['sasl_plain_password']]).encode('utf-8')) + size = Int32.encode(len(msg)) try: - self._sock.setblocking(True) - # Send PLAIN credentials per RFC-4616 - msg = bytes('\0'.join([self.config['sasl_plain_username'], - self.config['sasl_plain_username'], - self.config['sasl_plain_password']]).encode('utf-8')) - size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - self._sock.setblocking(False) + self._send_bytes_blocking(size + msg) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure @@ -530,11 +541,9 @@ class BrokerConnection(object): # pass output token to kafka try: - self._sock.setblocking(True) msg = output_token size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - self._sock.setblocking(False) + self._send_bytes_blocking(size + msg) # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. @@ -662,16 +671,10 @@ class BrokerConnection(object): # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload - self._sock.setblocking(True) - total_sent = 0 - while total_sent < len(data): - sent_bytes = self._sock.send(data[total_sent:]) - total_sent += sent_bytes - assert total_sent == len(data) + total_bytes = self._send_bytes_blocking(data) if self._sensors: - self._sensors.bytes_sent.record(total_sent) - self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + self._sensors.bytes_sent.record(total_bytes) + except ConnectionError as e: log.exception("Error sending %s to %s", request, self) error = Errors.ConnectionError("%s: %s" % (self, e)) self.close(error=error) |