summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py65
-rw-r--r--test/test_client_async.py15
2 files changed, 47 insertions, 33 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0c22f90..36e808c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,7 +6,14 @@ import heapq
import itertools
import logging
import random
-import select
+
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ from . import selectors34 as selectors
+
import socket
import time
@@ -92,6 +99,7 @@ class KafkaClient(object):
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
+ self._selector = selectors.DefaultSelector()
self._conns = {}
self._connecting = set()
self._refresh_on_disconnects = True
@@ -101,6 +109,7 @@ class KafkaClient(object):
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
+ self._selector.register(self._wake_r, selectors.EVENT_READ)
def __del__(self):
self._wake_r.close()
@@ -160,11 +169,19 @@ class KafkaClient(object):
def _conn_state_change(self, node_id, conn):
if conn.connecting():
self._connecting.add(node_id)
+ self._selector.register(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
+
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
+ self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+
if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
@@ -176,6 +193,10 @@ class KafkaClient(object):
elif conn.state is ConnectionStates.DISCONNECTING:
if node_id in self._connecting:
self._connecting.remove(node_id)
+ try:
+ self._selector.unregister(conn._sock)
+ except KeyError:
+ pass
if self._refresh_on_disconnects:
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -388,45 +409,25 @@ class KafkaClient(object):
return responses
- def _poll(self, timeout, sleep=False):
+ def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
- sockets = dict([(conn._sock, conn)
- for conn in six.itervalues(self._conns)
- if conn.state is ConnectionStates.CONNECTED
- and conn.in_flight_requests])
- if not sockets:
- # if sockets are connecting, we can wake when they are writeable
- if self._connecting:
- sockets = [self._conns[node]._sock for node in self._connecting]
- select.select([self._wake_r], sockets, [], timeout)
- elif timeout:
- if sleep:
- log.debug('Sleeping at %s for %s', time.time(), timeout)
- select.select([self._wake_r], [], [], timeout)
- log.debug('Woke up at %s', time.time())
- else:
- log.warning('_poll called with a non-zero timeout and'
- ' sleep=False -- but there was nothing to do.'
- ' This can cause high CPU usage during idle.')
- self._clear_wake_fd()
- return []
-
- # Add a private pipe fd to allow external wakeups
- fds = list(sockets.keys())
- fds.append(self._wake_r)
- ready, _, _ = select.select(fds, [], [], timeout)
-
+ assert self.in_flight_request_count() > 0 or self._connecting or sleep
responses = []
- for sock in ready:
- if sock == self._wake_r:
+ for key, events in self._selector.select(timeout):
+ if key.fileobj is self._wake_r:
+ self._clear_wake_fd()
+ continue
+ elif not (events & selectors.EVENT_READ):
continue
- conn = sockets[sock]
+ conn = key.data
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+
+ # Incomplete responses are buffered internally
+ # while conn.in_flight_requests retains the request
if not response:
break
responses.append(response)
- self._clear_wake_fd()
return responses
def in_flight_request_count(self, node_id=None):
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ad76aad..922e43c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,10 @@
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ import kafka.selectors34 as selectors
+
import socket
import time
@@ -99,15 +106,19 @@ def test_maybe_connect(conn):
def test_conn_state_change(mocker, conn):
cli = KafkaClient()
+ sel = mocker.patch.object(cli, '_selector')
node_id = 0
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
+ sel.unregister.assert_called_with(conn._sock)
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
@@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn):
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
assert cli.cluster._need_update is True
+ sel.unregister.assert_called_with(conn._sock)
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
@@ -167,8 +179,9 @@ def test_is_ready(mocker, conn):
assert not cli.is_ready(0)
-def test_close(conn):
+def test_close(mocker, conn):
cli = KafkaClient()
+ mocker.patch.object(cli, '_selector')
# Unknown node - silent
cli.close(2)