summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 4962d9f..24a5bef 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -665,8 +665,14 @@ class KafkaClient(object):
def _fire_pending_completed_requests(self):
responses = []
- while self._pending_completion:
- response, future = self._pending_completion.popleft()
+ while True:
+ try:
+ # We rely on deque.popleft remaining threadsafe
+ # to allow both the heartbeat thread and the main thread
+ # to process responses
+ response, future = self._pending_completion.popleft()
+ except IndexError:
+ break
future.success(response)
responses.append(response)
return responses