diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-01-07 10:52:01 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-08-13 19:47:36 -0700 |
commit | df227a6015992d8ddb79f5faa3f782d0042edd6b (patch) | |
tree | 064e31324df666ecc7575d5033948600f5df52fd /kafka/client_async.py | |
parent | f13ce1d87919ab763b02e38c17080580e199b4af (diff) | |
download | kafka-python-receive_bytes_pipe.tar.gz |
BrokerConnection.receive_bytes(data) -> response eventsreceive_bytes_pipe
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 16 |
1 files changed, 3 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ecd2cea..75b169e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -605,25 +605,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - - # Accumulate as many responses as the connection has pending - while conn.in_flight_requests: - response = conn.recv() # Note: conn.recv runs callbacks / errbacks - - # Incomplete responses are buffered internally - # while conn.in_flight_requests retains the request - if not response: - break - responses.append(response) + responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - response = conn.recv() - if response: - responses.append(response) + responses.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -635,6 +624,7 @@ class KafkaClient(object): if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._maybe_close_oldest_connection() return responses |