summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py16
1 files changed, 3 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ecd2cea..75b169e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -605,25 +605,14 @@ class KafkaClient(object):
continue
self._idle_expiry_manager.update(conn.node_id)
-
- # Accumulate as many responses as the connection has pending
- while conn.in_flight_requests:
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
-
- # Incomplete responses are buffered internally
- # while conn.in_flight_requests retains the request
- if not response:
- break
- responses.append(response)
+ responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks
# Check for additional pending SSL bytes
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
# TODO: optimize
for conn in self._conns.values():
if conn not in processed and conn.connected() and conn._sock.pending():
- response = conn.recv()
- if response:
- responses.append(response)
+ responses.extend(conn.recv())
for conn in six.itervalues(self._conns):
if conn.requests_timed_out():
@@ -635,6 +624,7 @@ class KafkaClient(object):
if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
+
self._maybe_close_oldest_connection()
return responses