summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-14 16:52:50 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-14 16:53:03 -0700
commitce536d4930f9b72cd6cb1dc391e237102833b7dd (patch)
treee0136f3e868f90f3d5554b84f0ead2a227689a9b
parentcfddc6bd179e236874e00a899e9349d5c9a54400 (diff)
downloadkafka-python-ce536d4930f9b72cd6cb1dc391e237102833b7dd.tar.gz
Use _send_bytes_blocking in BrokerConnection
-rw-r--r--kafka/conn.py43
1 files changed, 23 insertions, 20 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e10d4f1..ffd7702 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -467,6 +467,19 @@ class BrokerConnection(object):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))
+ def _send_bytes_blocking(self, data):
+ self._sock.setblocking(True)
+ total_sent = 0
+ try:
+ while total_sent < len(data):
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ if total_sent != len(data):
+ raise ConnectionError('Buffer overrun during socket send')
+ return total_sent
+ finally:
+ self._sock.setblocking(False)
+
def _recv_bytes_blocking(self, n):
self._sock.setblocking(True)
try:
@@ -485,15 +498,13 @@ class BrokerConnection(object):
log.warning('%s: Sending username and password in the clear', self)
data = b''
+ # Send PLAIN credentials per RFC-4616
+ msg = bytes('\0'.join([self.config['sasl_plain_username'],
+ self.config['sasl_plain_username'],
+ self.config['sasl_plain_password']]).encode('utf-8'))
+ size = Int32.encode(len(msg))
try:
- self._sock.setblocking(True)
- # Send PLAIN credentials per RFC-4616
- msg = bytes('\0'.join([self.config['sasl_plain_username'],
- self.config['sasl_plain_username'],
- self.config['sasl_plain_password']]).encode('utf-8'))
- size = Int32.encode(len(msg))
- self._sock.sendall(size + msg)
- self._sock.setblocking(False)
+ self._send_bytes_blocking(size + msg)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
@@ -530,11 +541,9 @@ class BrokerConnection(object):
# pass output token to kafka
try:
- self._sock.setblocking(True)
msg = output_token
size = Int32.encode(len(msg))
- self._sock.sendall(size + msg)
- self._sock.setblocking(False)
+ self._send_bytes_blocking(size + msg)
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
@@ -662,16 +671,10 @@ class BrokerConnection(object):
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
- self._sock.setblocking(True)
- total_sent = 0
- while total_sent < len(data):
- sent_bytes = self._sock.send(data[total_sent:])
- total_sent += sent_bytes
- assert total_sent == len(data)
+ total_bytes = self._send_bytes_blocking(data)
if self._sensors:
- self._sensors.bytes_sent.record(total_sent)
- self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ self._sensors.bytes_sent.record(total_bytes)
+ except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)