summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-15 21:52:58 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-15 21:52:58 -0700
commite3b1ad24b80dd60e3159566740f40fc6f5811070 (patch)
treece09520d349f62cee520cb51ee83f6c6864b378e /kafka/client_async.py
parent0d2164431f8245359c417473fd84e7af034f1306 (diff)
downloadkafka-python-pending_completions.tar.gz
Move callback processing from BrokerConnection to KafkaClientpending_completions
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py32
1 files changed, 25 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index f6fe829..a90c0d4 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import, division
+import collections
import copy
import functools
import heapq
@@ -204,6 +205,11 @@ class KafkaClient(object):
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._wake_lock = threading.Lock()
+
+ # when requests complete, they are transferred to this queue prior to
+ # invocation.
+ self._pending_completion = collections.deque()
+
self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
@@ -254,7 +260,8 @@ class KafkaClient(object):
future = bootstrap.send(metadata_request)
while not future.is_done:
self._selector.select(1)
- bootstrap.recv()
+ for r, f in bootstrap.recv():
+ f.success(r)
if future.failed():
bootstrap.close()
continue
@@ -512,7 +519,9 @@ class KafkaClient(object):
Returns:
list: responses received (can be empty)
"""
- if timeout_ms is None:
+ if future is not None:
+ timeout_ms = 100
+ elif timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
responses = []
@@ -551,7 +560,9 @@ class KafkaClient(object):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
- responses.extend(self._poll(timeout))
+ self._poll(timeout)
+
+ responses.extend(self._fire_pending_completed_requests())
# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
@@ -561,7 +572,7 @@ class KafkaClient(object):
return responses
def _poll(self, timeout):
- responses = []
+ """Returns list of (response, future) tuples"""
processed = set()
start_select = time.time()
@@ -600,14 +611,14 @@ class KafkaClient(object):
continue
self._idle_expiry_manager.update(conn.node_id)
- responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks
+ self._pending_completion.extend(conn.recv())
# Check for additional pending SSL bytes
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
# TODO: optimize
for conn in self._conns.values():
if conn not in processed and conn.connected() and conn._sock.pending():
- responses.extend(conn.recv())
+ self._pending_completion.extend(conn.recv())
for conn in six.itervalues(self._conns):
if conn.requests_timed_out():
@@ -621,7 +632,6 @@ class KafkaClient(object):
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
self._maybe_close_oldest_connection()
- return responses
def in_flight_request_count(self, node_id=None):
"""Get the number of in-flight requests for a node or all nodes.
@@ -640,6 +650,14 @@ class KafkaClient(object):
else:
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
+ def _fire_pending_completed_requests(self):
+ responses = []
+ while self._pending_completion:
+ response, future = self._pending_completion.popleft()
+ future.success(response)
+ responses.append(response)
+ return responses
+
def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks.