summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py12
1 files changed, 12 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 9271008..b91ae35 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -414,7 +414,9 @@ class KafkaClient(object):
def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep
+
responses = []
+ processed = set()
for key, events in self._selector.select(timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
@@ -422,6 +424,7 @@ class KafkaClient(object):
elif not (events & selectors.EVENT_READ):
continue
conn = key.data
+ processed.add(conn)
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -430,6 +433,15 @@ class KafkaClient(object):
if not response:
break
responses.append(response)
+
+ # Check for additional pending SSL bytes
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ # TODO: optimize
+ for conn in self._conns.values():
+ if conn not in processed and conn.connected() and conn._sock.pending():
+ response = conn.recv()
+ if response:
+ responses.append(response)
return responses
def in_flight_request_count(self, node_id=None):