diff options
-rw-r--r-- | kafka/client_async.py | 12 |
1 files changed, 12 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 9271008..b91ae35 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -414,7 +414,9 @@ class KafkaClient(object): def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout assert self.in_flight_request_count() > 0 or self._connecting or sleep + responses = [] + processed = set() for key, events in self._selector.select(timeout): if key.fileobj is self._wake_r: self._clear_wake_fd() @@ -422,6 +424,7 @@ class KafkaClient(object): elif not (events & selectors.EVENT_READ): continue conn = key.data + processed.add(conn) while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -430,6 +433,15 @@ class KafkaClient(object): if not response: break responses.append(response) + + # Check for additional pending SSL bytes + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + # TODO: optimize + for conn in self._conns.values(): + if conn not in processed and conn.connected() and conn._sock.pending(): + response = conn.recv() + if response: + responses.append(response) return responses def in_flight_request_count(self, node_id=None): |