summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e921fa4..ca51987 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -98,6 +98,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
+ self._wake_r.setblocking(False)
def __del__(self):
self._wake_r.close()
@@ -682,10 +683,10 @@ class KafkaClient(object):
def _clear_wake_fd(self):
while True:
- fds, _, _ = select.select([self._wake_r], [], [], 0)
- if not fds:
+ try:
+ self._wake_r.recv(1)
+ except:
break
- self._wake_r.recv(1)
class DelayedTaskQueue(object):