diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-08-15 13:00:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-15 13:00:02 -0700 |
commit | ba7afd9bc9362055ec0bedcf53eb6f8909dc22d2 (patch) | |
tree | f68b4dc2653df1e379da7b497e0fa76a19d6c5a5 /kafka/client_async.py | |
parent | cbc6fdc4b973a6a94953c9ce9c33e54e415e45bf (diff) | |
download | kafka-python-ba7afd9bc9362055ec0bedcf53eb6f8909dc22d2.tar.gz |
BrokerConnection receive bytes pipe (#1032)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 16 |
1 files changed, 3 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 4e4e835..80e8494 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -599,25 +599,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - - # Accumulate as many responses as the connection has pending - while conn.in_flight_requests: - response = conn.recv() # Note: conn.recv runs callbacks / errbacks - - # Incomplete responses are buffered internally - # while conn.in_flight_requests retains the request - if not response: - break - responses.append(response) + responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - response = conn.recv() - if response: - responses.append(response) + responses.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -629,6 +618,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses |