summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py40
1 files changed, 20 insertions, 20 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 246cab8..f4fd8bf 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -742,19 +742,23 @@ class BrokerConnection(object):
return responses
def _recv(self):
- responses = []
+ """Take all available bytes from socket, return list of any responses from parser"""
+ recvd = []
SOCK_CHUNK_BYTES = 4096
- while True:
+ BUFFERED_CHUNKS = 1000
+ while len(recvd) < BUFFERED_CHUNKS:
try:
data = self._sock.recv(SOCK_CHUNK_BYTES)
- # We expect socket.recv to raise an exception if there is not
- # enough data to read the full bytes_to_read
+ # We expect socket.recv to raise an exception if there are no
+ # bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
- break
+ return []
+ else:
+ recvd.append(data)
except SSLWantReadError:
break
@@ -764,27 +768,23 @@ class BrokerConnection(object):
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
- break
+ return []
except BlockingIOError:
if six.PY3:
break
raise
- if self._sensors:
- self._sensors.bytes_received.record(len(data))
-
- try:
- more_responses = self._protocol.receive_bytes(data)
- except Errors.KafkaProtocolError as e:
- self.close(e)
- break
- else:
- responses.extend([resp for (_, resp) in more_responses])
-
- if len(data) < SOCK_CHUNK_BYTES:
- break
+ recvd_data = b''.join(recvd)
+ if self._sensors:
+ self._sensors.bytes_received.record(len(recvd_data))
- return responses
+ try:
+ responses = self._protocol.receive_bytes(recvd_data)
+ except Errors.KafkaProtocolError as e:
+ self.close(e)
+ return []
+ else:
+ return [resp for (_, resp) in responses] # drop correlation id
def requests_timed_out(self):
if self.in_flight_requests: