diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 40 |
1 files changed, 20 insertions, 20 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 246cab8..f4fd8bf 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -742,19 +742,23 @@ class BrokerConnection(object): return responses def _recv(self): - responses = [] + """Take all available bytes from socket, return list of any responses from parser""" + recvd = [] SOCK_CHUNK_BYTES = 4096 - while True: + BUFFERED_CHUNKS = 1000 + while len(recvd) < BUFFERED_CHUNKS: try: data = self._sock.recv(SOCK_CHUNK_BYTES) - # We expect socket.recv to raise an exception if there is not - # enough data to read the full bytes_to_read + # We expect socket.recv to raise an exception if there are no + # bytes available to read from the socket in non-blocking mode. # but if the socket is disconnected, we will get empty data # without an exception raised if not data: log.error('%s: socket disconnected', self) self.close(error=Errors.ConnectionError('socket disconnected')) - break + return [] + else: + recvd.append(data) except SSLWantReadError: break @@ -764,27 +768,23 @@ class BrokerConnection(object): log.exception('%s: Error receiving network data' ' closing socket', self) self.close(error=Errors.ConnectionError(e)) - break + return [] except BlockingIOError: if six.PY3: break raise - if self._sensors: - self._sensors.bytes_received.record(len(data)) - - try: - more_responses = self._protocol.receive_bytes(data) - except Errors.KafkaProtocolError as e: - self.close(e) - break - else: - responses.extend([resp for (_, resp) in more_responses]) - - if len(data) < SOCK_CHUNK_BYTES: - break + recvd_data = b''.join(recvd) + if self._sensors: + self._sensors.bytes_received.record(len(recvd_data)) - return responses + try: + responses = self._protocol.receive_bytes(recvd_data) + except Errors.KafkaProtocolError as e: + self.close(e) + return [] + else: + return [resp for (_, resp) in responses] # drop correlation id def requests_timed_out(self): if self.in_flight_requests: |