diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index dbe23f5..15f75d6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -211,6 +211,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r = None self._wake_w = None + self._need_wakeup = False self._wake_lock = threading.Lock() self._init_wakeup_socketpair() @@ -891,7 +892,9 @@ class KafkaClient(object): def wakeup(self): with self._wake_lock: try: - self._wake_w.send(b'x') + if not self._need_wakeup: + self._wake_w.send(b'x') + self._need_wakeup = True except socket.error as e: log.warning('Unable to send to wakeup socket! (%s)', e) @@ -902,6 +905,7 @@ class KafkaClient(object): self._wake_r.recv(4096) except socket.error: break + self._need_wakeup = False def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() |