| | | |
|---|---|---|
| author | Dana Powers <dana.powers@gmail.com> | 2016-04-07 16:26:22 -0700 |
| committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 08:03:09 -0700 |
| commit | 237bd730fd29a105b6aabdc0262a694fb7c8f510 (patch) | |
| tree | c1478b5a63c3fdbd5adb9274742717b82608596c /kafka/client_async.py | |
| parent | 26260d4a04cf00878885e901a22d8ee6121a9f6a (diff) | |
| download | kafka-python-selectors.tar.gz | |
Use selectors module in KafkaClient
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 65
1 file changed, 33 insertions, 32 deletions
```diff
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0c22f90..36e808c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,7 +6,14 @@
 import heapq
 import itertools
 import logging
 import random
-import select
+
+# selectors in stdlib as of py3.4
+try:
+    import selectors # pylint: disable=import-error
+except ImportError:
+    # vendored backport module
+    from . import selectors34 as selectors
+
 import socket
 import time
@@ -92,6 +99,7 @@ class KafkaClient(object):
         self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
+        self._selector = selectors.DefaultSelector()
         self._conns = {}
         self._connecting = set()
         self._refresh_on_disconnects = True
@@ -101,6 +109,7 @@ class KafkaClient(object):
         self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
         self._wake_r, self._wake_w = socket.socketpair()
         self._wake_r.setblocking(False)
+        self._selector.register(self._wake_r, selectors.EVENT_READ)
 
     def __del__(self):
         self._wake_r.close()
@@ -160,11 +169,19 @@ class KafkaClient(object):
     def _conn_state_change(self, node_id, conn):
         if conn.connecting():
             self._connecting.add(node_id)
+            self._selector.register(conn._sock, selectors.EVENT_WRITE)
         elif conn.connected():
             log.debug("Node %s connected", node_id)
             if node_id in self._connecting:
                 self._connecting.remove(node_id)
+
+            try:
+                self._selector.unregister(conn._sock)
+            except KeyError:
+                pass
+            self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+
             if 'bootstrap' in self._conns and node_id != 'bootstrap':
                 bootstrap = self._conns.pop('bootstrap')
                 # XXX: make conn.close() require error to cause refresh
@@ -176,6 +193,10 @@ class KafkaClient(object):
         elif conn.state is ConnectionStates.DISCONNECTING:
             if node_id in self._connecting:
                 self._connecting.remove(node_id)
+            try:
+                self._selector.unregister(conn._sock)
+            except KeyError:
+                pass
             if self._refresh_on_disconnects:
                 log.warning("Node %s connect failed -- refreshing metadata", node_id)
                 self.cluster.request_update()
@@ -388,45 +409,25 @@ class KafkaClient(object):
 
         return responses
 
-    def _poll(self, timeout, sleep=False):
+    def _poll(self, timeout, sleep=True):
         # select on reads across all connected sockets, blocking up to timeout
-        sockets = dict([(conn._sock, conn)
-                        for conn in six.itervalues(self._conns)
-                        if conn.state is ConnectionStates.CONNECTED
-                        and conn.in_flight_requests])
-        if not sockets:
-            # if sockets are connecting, we can wake when they are writeable
-            if self._connecting:
-                sockets = [self._conns[node]._sock for node in self._connecting]
-                select.select([self._wake_r], sockets, [], timeout)
-            elif timeout:
-                if sleep:
-                    log.debug('Sleeping at %s for %s', time.time(), timeout)
-                    select.select([self._wake_r], [], [], timeout)
-                    log.debug('Woke up at %s', time.time())
-                else:
-                    log.warning('_poll called with a non-zero timeout and'
-                                ' sleep=False -- but there was nothing to do.'
-                                ' This can cause high CPU usage during idle.')
-            self._clear_wake_fd()
-            return []
-
-        # Add a private pipe fd to allow external wakeups
-        fds = list(sockets.keys())
-        fds.append(self._wake_r)
-        ready, _, _ = select.select(fds, [], [], timeout)
-
+        assert self.in_flight_request_count() > 0 or self._connecting or sleep
         responses = []
-        for sock in ready:
-            if sock == self._wake_r:
+        for key, events in self._selector.select(timeout):
+            if key.fileobj is self._wake_r:
+                self._clear_wake_fd()
+                continue
+            elif not (events & selectors.EVENT_READ):
                 continue
-            conn = sockets[sock]
+            conn = key.data
             while conn.in_flight_requests:
                 response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+
+                # Incomplete responses are buffered internally
+                # while conn.in_flight_requests retains the request
                 if not response:
                     break
                 responses.append(response)
-        self._clear_wake_fd()
         return responses
 
     def in_flight_request_count(self, node_id=None):
```
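For readers new to the stdlib `selectors` module, the pattern this commit switches to can be sketched in isolation: each socket is registered once, the owning connection object rides along as the registration's `data`, `select()` yields `(key, events)` pairs to dispatch on, and a registered `socketpair()` lets another thread interrupt a blocking poll. The sketch below is a minimal, hypothetical illustration, not kafka-python code; the `Conn`, `register_conn`, `poll_once`, and `wakeup` names are made up for the example.

```python
import selectors
import socket

# Minimal sketch of the pattern adopted above: one selector registration per
# socket, the owning connection object attached as `data`, and a socketpair
# registered so another thread can wake a blocking select().
sel = selectors.DefaultSelector()

wake_r, wake_w = socket.socketpair()
wake_r.setblocking(False)
sel.register(wake_r, selectors.EVENT_READ)  # no data: this is the wakeup fd


class Conn(object):
    """Hypothetical stand-in for a broker connection object."""
    def __init__(self, sock):
        self.sock = sock

    def recv(self):
        return self.sock.recv(4096)


def register_conn(conn):
    # EVENT_READ plus the conn itself as data, mirroring _conn_state_change()
    sel.register(conn.sock, selectors.EVENT_READ, conn)


def poll_once(timeout):
    responses = []
    for key, events in sel.select(timeout):
        if key.fileobj is wake_r:
            wake_r.recv(1024)  # drain the wakeup byte(s) and keep going
            continue
        if not (events & selectors.EVENT_READ):
            continue
        conn = key.data  # the object attached at register() time
        data = conn.recv()
        if data:
            responses.append(data)
    return responses


def wakeup():
    # called from another thread to interrupt a blocking poll_once()
    wake_w.send(b'x')
```

Attaching the connection as the registration's `data` is what lets the diff drop the old `sockets = dict(...)` lookup table in `_poll()`: the selector key already carries the object that owns the ready socket.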