diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-07 16:23:24 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-08 09:23:57 -0700 |
commit | 9b71b0da624ebcd6e3e06b4f325e0ddffd1c8f47 (patch) | |
tree | 3448d82d16533ea516cdbd3904e2c96da1a897f3 /kafka/client_async.py | |
parent | ed053660a4fc1341402e6ecd2c5739c252503ef2 (diff) | |
download | kafka-python-9b71b0da624ebcd6e3e06b4f325e0ddffd1c8f47.tar.gz |
Make _wake_r socket non-blocking; drop select from _clear_wake_fd
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e921fa4..ca51987 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -98,6 +98,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = socket.socketpair() + self._wake_r.setblocking(False) def __del__(self): self._wake_r.close() @@ -682,10 +683,10 @@ class KafkaClient(object): def _clear_wake_fd(self): while True: - fds, _, _ = select.select([self._wake_r], [], [], 0) - if not fds: + try: + self._wake_r.recv(1) + except: break - self._wake_r.recv(1) class DelayedTaskQueue(object): |