diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 22:56:11 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-16 22:56:11 -0700 |
commit | 43bbdf1434615390800783fc8da56000cf9acd10 (patch) | |
tree | c60951de40069eb69078940fbf742117a5a10b73 /kafka/client_async.py | |
parent | 5ab4d5c274112a4e2024dea415a0ec4b79009a28 (diff) | |
download | kafka-python-43bbdf1434615390800783fc8da56000cf9acd10.tar.gz |
Protect writes to wakeup socket with threading lock (#763 / #709)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e064d51..2700069 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -6,6 +6,7 @@ import heapq import itertools import logging import random +import threading # selectors in stdlib as of py3.4 try: @@ -158,6 +159,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._wake_lock = threading.Lock() self._selector.register(self._wake_r, selectors.EVENT_READ) self._closed = False self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) @@ -758,10 +760,12 @@ class KafkaClient(object): raise Errors.NoBrokersAvailable() def wakeup(self): - if self._wake_w.send(b'x') != 1: - log.warning('Unable to send to wakeup socket!') + with self._wake_lock: + if self._wake_w.send(b'x') != 1: + log.warning('Unable to send to wakeup socket!') def _clear_wake_fd(self): + # reading from wake socket should only happen in a single thread while True: try: self._wake_r.recv(1024) |