summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 3a1922e..88b8ec6 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -354,11 +354,12 @@ class KafkaClient(object):
ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
responses = []
- # list, not iterator, because inline callbacks may add to self._conns
for sock in ready:
conn = sockets[sock]
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
- if response:
+ while conn.in_flight_requests:
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ if not response:
+ break
responses.append(response)
return responses