diff options
-rw-r--r-- | kafka/client_async.py | 44 |
1 file changed, 37 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index c99057c..f4566c0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -1,7 +1,10 @@ +from __future__ import absolute_import + import copy import heapq import itertools import logging +import os import random import select import time @@ -92,6 +95,7 @@ class KafkaClient(object): self._last_bootstrap = 0 self._bootstrap_fails = 0 self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) + self._wake_r, self._wake_w = os.pipe() def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails @@ -293,7 +297,7 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) - def poll(self, timeout_ms=None, future=None): + def poll(self, timeout_ms=None, future=None, sleep=False): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -305,6 +309,9 @@ class KafkaClient(object): timeout will be the minimum of timeout, request timeout and metadata timeout. Default: request_timeout_ms future (Future, optional): if provided, blocks until future.is_done + sleep (bool): if True and there is nothing to do (no connections + or requests in flight), will sleep for duration timeout before + returning empty results. Default: False. 
Returns: list: responses received (can be empty) @@ -345,7 +352,7 @@ class KafkaClient(object): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - responses.extend(self._poll(timeout)) + responses.extend(self._poll(timeout, sleep=sleep)) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done @@ -354,7 +361,7 @@ class KafkaClient(object): return responses - def _poll(self, timeout): + def _poll(self, timeout, sleep=False): # select on reads across all connected sockets, blocking up to timeout sockets = dict([(conn._sock, conn) for conn in six.itervalues(self._conns) @@ -364,22 +371,35 @@ class KafkaClient(object): # if sockets are connecting, we can wake when they are writeable if self._connecting: sockets = [self._conns[node]._sock for node in self._connecting] - select.select([], sockets, [], timeout) + select.select([self._wake_r], sockets, [], timeout) elif timeout: - log.warning('_poll called with a timeout, but nothing to do' - ' -- this can cause high CPU usage during idle') + if sleep: + log.debug('Sleeping at %s for %s', time.time(), timeout) + select.select([self._wake_r], [], [], timeout) + log.debug('Woke up at %s', time.time()) + else: + log.warning('_poll called with a non-zero timeout and' + ' sleep=False -- but there was nothing to do.' 
+ ' This can cause high CPU usage during idle.') + self._clear_wake_fd() return [] - ready, _, _ = select.select(list(sockets.keys()), [], [], timeout) + # Add a private pipe fd to allow external wakeups + fds = list(sockets.keys()) + fds.append(self._wake_r) + ready, _, _ = select.select(fds, [], [], timeout) responses = [] for sock in ready: + if sock == self._wake_r: + continue conn = sockets[sock] while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks if not response: break responses.append(response) + self._clear_wake_fd() return responses def in_flight_request_count(self, node_id=None): @@ -580,6 +600,16 @@ class KafkaClient(object): version, request.__class__.__name__) continue + def wakeup(self): + os.write(self._wake_w, 'x') + + def _clear_wake_fd(self): + while True: + fds, _, _ = select.select([self._wake_r], [], [], 0) + if not fds: + break + os.read(self._wake_r, 1) + class DelayedTaskQueue(object): # see https://docs.python.org/2/library/heapq.html |