summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortalwrii <talwrii@gmail.com>2016-01-12 23:55:52 +0000
committerSergey Shepelev <temotor@gmail.com>2017-01-17 00:27:47 +0500
commitc40759b8875f1b87c1d88c5bb8b2514cf2226813 (patch)
tree6581531eb838e1bff243292297b18f44ada0f7b1
parentf81b135ae9e74d6a70d04ce5ceb2fdb7cb939eac (diff)
downloadeventlet-zmq-timeout-282.tar.gz
green.zmq: support RCVTIMEO (receive timeout)zmq-timeout-282
https://github.com/eventlet/eventlet/issues/282 https://github.com/eventlet/eventlet/pull/283
-rw-r--r--AUTHORS1
-rw-r--r--eventlet/green/zmq.py23
-rw-r--r--tests/zmq_test.py13
3 files changed, 35 insertions, 2 deletions
diff --git a/AUTHORS b/AUTHORS
index 4b105ff..70f27da 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -143,3 +143,4 @@ Thanks To
* Matthew D. Pagel
* Matt Yule-Bennett
* Artur Stawiarski
+* Tal Wrii
diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py
index 97cf788..b3e70e2 100644
--- a/eventlet/green/zmq.py
+++ b/eventlet/green/zmq.py
@@ -95,11 +95,14 @@ class _BlockedThread(object):
__bool__ = __nonzero__
- def block(self):
+ def block(self, deadline=None):
if self._blocked_thread is not None:
raise Exception("Cannot block more than one thread on one BlockedThread")
self._blocked_thread = greenlet.getcurrent()
+ if deadline is not None:
+ self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake)
+
try:
self._hub.switch()
finally:
@@ -245,6 +248,7 @@ class Socket(_Socket):
event,
lambda _: None,
lambda: None)
+ self.__dict__['_eventlet_clock'] = hub.clock
@_wraps(_Socket.close)
def close(self, linger=None):
@@ -376,6 +380,16 @@ class Socket(_Socket):
self._eventlet_recv_event.wake()
return msg
+ deadline = None
+ if hasattr(__zmq__, 'RCVTIMEO'):
+ sock_timeout = self.getsockopt(__zmq__.RCVTIMEO)
+ if sock_timeout == -1:
+ pass
+ elif sock_timeout > 0:
+ deadline = self._eventlet_clock() + sock_timeout / 1000.0
+ else:
+ raise ValueError(sock_timeout)
+
flags |= NOBLOCK
with self._eventlet_recv_lock:
while True:
@@ -383,7 +397,12 @@ class Socket(_Socket):
return _Socket_recv(self, flags, copy, track)
except ZMQError as e:
if e.errno == EAGAIN:
- self._eventlet_recv_event.block()
+ # zmq in its wisdom decided to reuse EAGAIN for timeouts
+ if deadline is not None and self._eventlet_clock() > deadline:
+ e.is_timeout = True
+ raise
+
+ self._eventlet_recv_event.block(deadline=deadline)
else:
raise
finally:
diff --git a/tests/zmq_test.py b/tests/zmq_test.py
index 5c1a25f..601878f 100644
--- a/tests/zmq_test.py
+++ b/tests/zmq_test.py
@@ -597,3 +597,16 @@ def test_recv_json_no_args():
with clean_pair(zmq.REQ, zmq.REP) as (s1, s2, _):
eventlet.spawn(s1.send_json, {})
s2.recv_json()
+
+
+@tests.skip_unless(zmq_supported)
+def test_recv_timeout():
+ # https://github.com/eventlet/eventlet/issues/282
+ with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
+ sub.setsockopt(zmq.RCVTIMEO, 100)
+ try:
+ with eventlet.Timeout(1, False):
+ sub.recv()
+ assert False
+ except zmq.ZMQError as e:
+ assert eventlet.is_timeout(e)