summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-23 15:02:29 -0800
committerDana Powers <dana.powers@rd.io>2016-01-23 15:02:29 -0800
commitb8c209714c3a2251c056ebeed0357055cc8e3b72 (patch)
tree92899cff63057940220a7e0b0bacbebbe25ee658 /kafka/client_async.py
parent4c2ad1278013a9e04e718b411f938d7c7ff050ad (diff)
downloadkafka-python-b8c209714c3a2251c056ebeed0357055cc8e3b72.tar.gz
Optionally sleep in KafkaClient.poll(), add KafkaClient.wakeup()
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py44
1 files changed, 37 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c99057c..f4566c0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -1,7 +1,10 @@
+from __future__ import absolute_import
+
import copy
import heapq
import itertools
import logging
+import os
import random
import select
import time
@@ -92,6 +95,7 @@ class KafkaClient(object):
self._last_bootstrap = 0
self._bootstrap_fails = 0
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._wake_r, self._wake_w = os.pipe()
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
@@ -293,7 +297,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response)
- def poll(self, timeout_ms=None, future=None):
+ def poll(self, timeout_ms=None, future=None, sleep=False):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
@@ -305,6 +309,9 @@ class KafkaClient(object):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
+ sleep (bool): if True and there is nothing to do (no connections
+ or requests in flight), will sleep for duration timeout before
+ returning empty results. Default: False.
Returns:
list: responses received (can be empty)
@@ -345,7 +352,7 @@ class KafkaClient(object):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
- responses.extend(self._poll(timeout))
+ responses.extend(self._poll(timeout, sleep=sleep))
# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
@@ -354,7 +361,7 @@ class KafkaClient(object):
return responses
- def _poll(self, timeout):
+ def _poll(self, timeout, sleep=False):
# select on reads across all connected sockets, blocking up to timeout
sockets = dict([(conn._sock, conn)
for conn in six.itervalues(self._conns)
@@ -364,22 +371,35 @@ class KafkaClient(object):
# if sockets are connecting, we can wake when they are writeable
if self._connecting:
sockets = [self._conns[node]._sock for node in self._connecting]
- select.select([], sockets, [], timeout)
+ select.select([self._wake_r], sockets, [], timeout)
elif timeout:
- log.warning('_poll called with a timeout, but nothing to do'
- ' -- this can cause high CPU usage during idle')
+ if sleep:
+ log.debug('Sleeping at %s for %s', time.time(), timeout)
+ select.select([self._wake_r], [], [], timeout)
+ log.debug('Woke up at %s', time.time())
+ else:
+ log.warning('_poll called with a non-zero timeout and'
+ ' sleep=False -- but there was nothing to do.'
+ ' This can cause high CPU usage during idle.')
+ self._clear_wake_fd()
return []
- ready, _, _ = select.select(list(sockets.keys()), [], [], timeout)
+ # Add a private pipe fd to allow external wakeups
+ fds = list(sockets.keys())
+ fds.append(self._wake_r)
+ ready, _, _ = select.select(fds, [], [], timeout)
responses = []
for sock in ready:
+ if sock == self._wake_r:
+ continue
conn = sockets[sock]
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
if not response:
break
responses.append(response)
+ self._clear_wake_fd()
return responses
def in_flight_request_count(self, node_id=None):
@@ -580,6 +600,16 @@ class KafkaClient(object):
version, request.__class__.__name__)
continue
+ def wakeup(self):
+ os.write(self._wake_w, 'x')
+
+ def _clear_wake_fd(self):
+ while True:
+ fds, _, _ = select.select([self._wake_r], [], [], 0)
+ if not fds:
+ break
+ os.read(self._wake_r, 1)
+
class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html