summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b82b6d..ffc839e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -188,10 +188,12 @@ class BrokerConnection(object):
# and send bytes asynchronously. For now, just block
# sending each request payload
self._sock.setblocking(True)
- sent_bytes = self._sock.send(size)
- assert sent_bytes == len(size)
- sent_bytes = self._sock.send(message)
- assert sent_bytes == len(message)
+ for data in (size, message):
+ total_sent = 0
+ while total_sent < len(data):
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ assert total_sent == len(data)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)