summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py26
1 files changed, 22 insertions, 4 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 3571e90..b5c7ba0 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -381,9 +381,17 @@ class BrokerConnection(object):
# Not receiving is the state of reading the payload header
if not self._receiving:
try:
- # An extremely small, but non-zero, probability that there are
- # more than 0 but not yet 4 bytes available to read
- self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
+ bytes_to_read = 4 - self._rbuffer.tell()
+ data = self._sock.recv(bytes_to_read)
+ # We expect socket.recv to raise an exception if there is not
+ # enough data to read the full bytes_to_read
+ # but if the socket is disconnected, we will get empty data
+ # without an exception raised
+ if not data:
+ log.error('%s: socket disconnected', self)
+ self.close(error=Errors.ConnectionError('socket disconnected'))
+ return None
+ self._rbuffer.write(data)
except ssl.SSLWantReadError:
return None
except ConnectionError as e:
@@ -411,7 +419,17 @@ class BrokerConnection(object):
if self._receiving:
staged_bytes = self._rbuffer.tell()
try:
- self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
+ bytes_to_read = self._next_payload_bytes - staged_bytes
+ data = self._sock.recv(bytes_to_read)
+ # We expect socket.recv to raise an exception if there is not
+ # enough data to read the full bytes_to_read
+ # but if the socket is disconnected, we will get empty data
+ # without an exception raised
+ if not data:
+ log.error('%s: socket disconnected', self)
+ self.close(error=Errors.ConnectionError('socket disconnected'))
+ return None
+ self._rbuffer.write(data)
except ssl.SSLWantReadError:
return None
except ConnectionError as e: