diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-15 21:52:58 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-15 21:52:58 -0700 |
commit | e3b1ad24b80dd60e3159566740f40fc6f5811070 (patch) | |
tree | ce09520d349f62cee520cb51ee83f6c6864b378e /kafka/client_async.py | |
parent | 0d2164431f8245359c417473fd84e7af034f1306 (diff) | |
download | kafka-python-pending_completions.tar.gz |
Move callback processing from BrokerConnection to KafkaClientpending_completions
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index f6fe829..a90c0d4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,5 +1,6 @@ from __future__ import absolute_import, division +import collections import copy import functools import heapq @@ -204,6 +205,11 @@ class KafkaClient(object): self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._wake_lock = threading.Lock() + + # when requests complete, they are transferred to this queue prior to + # invocation. + self._pending_completion = collections.deque() + self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._closed = False @@ -254,7 +260,8 @@ class KafkaClient(object): future = bootstrap.send(metadata_request) while not future.is_done: self._selector.select(1) - bootstrap.recv() + for r, f in bootstrap.recv(): + f.success(r) if future.failed(): bootstrap.close() continue @@ -512,7 +519,9 @@ class KafkaClient(object): Returns: list: responses received (can be empty) """ - if timeout_ms is None: + if future is not None: + timeout_ms = 100 + elif timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] responses = [] @@ -551,7 +560,9 @@ class KafkaClient(object): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - responses.extend(self._poll(timeout)) + self._poll(timeout) + + responses.extend(self._fire_pending_completed_requests()) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done @@ -561,7 +572,7 @@ class KafkaClient(object): return responses def _poll(self, timeout): - responses = [] + """Returns list of (response, future) tuples""" processed = set() start_select = time.time() @@ -600,14 +611,14 @@ class KafkaClient(object): continue self._idle_expiry_manager.update(conn.node_id) - responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks + self._pending_completion.extend(conn.recv()) # Check for additional pending SSL bytes if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): # TODO: optimize for conn in self._conns.values(): if conn not in processed and conn.connected() and conn._sock.pending(): - responses.extend(conn.recv()) + self._pending_completion.extend(conn.recv()) for conn in six.itervalues(self._conns): if conn.requests_timed_out(): @@ -621,7 +632,6 @@ class KafkaClient(object): self._sensors.io_time.record((time.time() - end_select) * 1000000000) self._maybe_close_oldest_connection() - return responses def in_flight_request_count(self, node_id=None): """Get the number of in-flight requests for a node or all nodes. @@ -640,6 +650,14 @@ class KafkaClient(object): else: return sum([len(conn.in_flight_requests) for conn in self._conns.values()]) + def _fire_pending_completed_requests(self): + responses = [] + while self._pending_completion: + response, future = self._pending_completion.popleft() + future.success(response) + responses.append(response) + return responses + def least_loaded_node(self): """Choose the node with fewest outstanding requests, with fallbacks. |