diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 16 |
1 files changed, 3 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ecd2cea..75b169e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -605,25 +605,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - - # Accumulate as many responses as the connection has pending - while conn.in_flight_requests: - response = conn.recv() # Note: conn.recv runs callbacks / errbacks - - # Incomplete responses are buffered internally - # while conn.in_flight_requests retains the request - if not response: - break - responses.append(response) + responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - response = conn.recv() - if response: - responses.append(response) + responses.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -635,6 +624,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses |