diff options
-rw-r--r-- | kafka/client_async.py | 65 | ||||
-rw-r--r-- | test/test_client_async.py | 15 |
2 files changed, 47 insertions, 33 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 0c22f90..36e808c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -6,7 +6,14 @@ import heapq import itertools import logging import random -import select + +# selectors in stdlib as of py3.4 +try: + import selectors # pylint: disable=import-error +except ImportError: + # vendored backport module + from . import selectors34 as selectors + import socket import time @@ -92,6 +99,7 @@ class KafkaClient(object): self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False + self._selector = selectors.DefaultSelector() self._conns = {} self._connecting = set() self._refresh_on_disconnects = True @@ -101,6 +109,7 @@ class KafkaClient(object): self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._selector.register(self._wake_r, selectors.EVENT_READ) def __del__(self): self._wake_r.close() @@ -160,11 +169,19 @@ class KafkaClient(object): def _conn_state_change(self, node_id, conn): if conn.connecting(): self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) if node_id in self._connecting: self._connecting.remove(node_id) + + try: + self._selector.unregister(conn._sock) + except KeyError: + pass + self._selector.register(conn._sock, selectors.EVENT_READ, conn) + if 'bootstrap' in self._conns and node_id != 'bootstrap': bootstrap = self._conns.pop('bootstrap') # XXX: make conn.close() require error to cause refresh @@ -176,6 +193,10 @@ class KafkaClient(object): elif conn.state is ConnectionStates.DISCONNECTING: if node_id in self._connecting: self._connecting.remove(node_id) + try: + self._selector.unregister(conn._sock) + except KeyError: + pass if self._refresh_on_disconnects: log.warning("Node %s connect failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -388,45 +409,25 @@ class KafkaClient(object): return responses - def _poll(self, timeout, sleep=False): + def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout - sockets = dict([(conn._sock, conn) - for conn in six.itervalues(self._conns) - if conn.state is ConnectionStates.CONNECTED - and conn.in_flight_requests]) - if not sockets: - # if sockets are connecting, we can wake when they are writeable - if self._connecting: - sockets = [self._conns[node]._sock for node in self._connecting] - select.select([self._wake_r], sockets, [], timeout) - elif timeout: - if sleep: - log.debug('Sleeping at %s for %s', time.time(), timeout) - select.select([self._wake_r], [], [], timeout) - log.debug('Woke up at %s', time.time()) - else: - log.warning('_poll called with a non-zero timeout and' - ' sleep=False -- but there was nothing to do.' - ' This can cause high CPU usage during idle.') - self._clear_wake_fd() - return [] - - # Add a private pipe fd to allow external wakeups - fds = list(sockets.keys()) - fds.append(self._wake_r) - ready, _, _ = select.select(fds, [], [], timeout) - + assert self.in_flight_request_count() > 0 or self._connecting or sleep responses = [] - for sock in ready: - if sock == self._wake_r: + for key, events in self._selector.select(timeout): + if key.fileobj is self._wake_r: + self._clear_wake_fd() + continue + elif not (events & selectors.EVENT_READ): continue - conn = sockets[sock] + conn = key.data while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks + + # Incomplete responses are buffered internally + # while conn.in_flight_requests retains the request if not response: break responses.append(response) - self._clear_wake_fd() return responses def in_flight_request_count(self, node_id=None): diff --git a/test/test_client_async.py b/test/test_client_async.py index ad76aad..922e43c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,3 +1,10 @@ +# selectors in stdlib as of py3.4 +try: + import selectors # pylint: disable=import-error +except ImportError: + # vendored backport module + import kafka.selectors34 as selectors + import socket import time @@ -99,15 +106,19 @@ def test_maybe_connect(conn): def test_conn_state_change(mocker, conn): cli = KafkaClient() + sel = mocker.patch.object(cli, '_selector') node_id = 0 conn.state = ConnectionStates.CONNECTING cli._conn_state_change(node_id, conn) assert node_id in cli._connecting + sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE) conn.state = ConnectionStates.CONNECTED cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting + sel.unregister.assert_called_with(conn._sock) + sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False @@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn): cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting assert cli.cluster._need_update is True + sel.unregister.assert_called_with(conn._sock) conn.state = ConnectionStates.CONNECTING cli._conn_state_change(node_id, conn) @@ -167,8 +179,9 @@ def test_is_ready(mocker, conn): assert not cli.is_ready(0) -def test_close(conn): +def test_close(mocker, conn): cli = KafkaClient() + mocker.patch.object(cli, '_selector') # Unknown node - silent cli.close(2) |