diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-28 22:41:53 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-28 22:41:53 -0700 |
commit | 2a4f646d67b256f41391ad4f974a37020e4cbdf1 (patch) | |
tree | 038cfaf182db58e968527b650e5af3b344894555 /kafka/client_async.py | |
parent | 91eacbbac918eca81ac9aa6d38e94b04d1a1a574 (diff) | |
download | kafka-python-nonblocking_wake_sockets.tar.gz |
Use _need_wakeup flag to avoid multiple writes to wake socketnonblocking_wake_sockets
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index dbe23f5..15f75d6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -211,6 +211,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r = None self._wake_w = None + self._need_wakeup = False self._wake_lock = threading.Lock() self._init_wakeup_socketpair() @@ -891,7 +892,9 @@ class KafkaClient(object): def wakeup(self): with self._wake_lock: try: - self._wake_w.send(b'x') + if not self._need_wakeup: + self._wake_w.send(b'x') + self._need_wakeup = True except socket.error as e: log.warning('Unable to send to wakeup socket! (%s)', e) @@ -902,6 +905,7 @@ class KafkaClient(object): self._wake_r.recv(4096) except socket.error: break + self._need_wakeup = False def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() |