summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-12-17 14:50:40 -0800
committerDana Powers <dana.powers@gmail.com>2017-12-26 11:09:08 -0800
commit75e3ca9a4d48e405a8b11e6c33d126ad2f4d9f7c (patch)
tree56e53f2c2eadc23992c47515bae3c63071b916dd
parent4cfeaca5c867e15213420caad400f5f1863f64e3 (diff)
downloadkafka-python-75e3ca9a4d48e405a8b11e6c33d126ad2f4d9f7c.tar.gz
Recv all available network bytes before parsing
-rw-r--r--kafka/conn.py40
1 files changed, 20 insertions, 20 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 246cab8..f4fd8bf 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -742,19 +742,23 @@ class BrokerConnection(object):
return responses
def _recv(self):
- responses = []
+ """Take all available bytes from socket, return list of any responses from parser"""
+ recvd = []
SOCK_CHUNK_BYTES = 4096
- while True:
+ BUFFERED_CHUNKS = 1000
+ while len(recvd) < BUFFERED_CHUNKS:
try:
data = self._sock.recv(SOCK_CHUNK_BYTES)
- # We expect socket.recv to raise an exception if there is not
- # enough data to read the full bytes_to_read
+ # We expect socket.recv to raise an exception if there are no
+ # bytes available to read from the socket in non-blocking mode.
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
- break
+ return []
+ else:
+ recvd.append(data)
except SSLWantReadError:
break
@@ -764,27 +768,23 @@ class BrokerConnection(object):
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
- break
+ return []
except BlockingIOError:
if six.PY3:
break
raise
- if self._sensors:
- self._sensors.bytes_received.record(len(data))
-
- try:
- more_responses = self._protocol.receive_bytes(data)
- except Errors.KafkaProtocolError as e:
- self.close(e)
- break
- else:
- responses.extend([resp for (_, resp) in more_responses])
-
- if len(data) < SOCK_CHUNK_BYTES:
- break
+ recvd_data = b''.join(recvd)
+ if self._sensors:
+ self._sensors.bytes_received.record(len(recvd_data))
- return responses
+ try:
+ responses = self._protocol.receive_bytes(recvd_data)
+ except Errors.KafkaProtocolError as e:
+ self.close(e)
+ return []
+ else:
+ return [resp for (_, resp) in responses] # drop correlation id
def requests_timed_out(self):
if self.in_flight_requests: