summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py5
1 files changed, 5 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0cb575c..c3fcc79 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -154,6 +154,7 @@ class KafkaClient(object):
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
+ 'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
@@ -203,6 +204,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
+ self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
self._lock = threading.RLock()
@@ -871,6 +873,9 @@ class KafkaClient(object):
with self._wake_lock:
try:
self._wake_w.sendall(b'x')
+ except socket.timeout:
+ log.warning('Timeout to send to wakeup socket!')
+ raise Errors.KafkaTimeoutError()
except socket.error:
log.warning('Unable to send to wakeup socket!')