summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-25 17:09:05 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-25 17:09:05 +0000
commite9920e89d298dbcc5cd01d0c79616353eb750c43 (patch)
tree6793a83cffe26ab8f15b43be2f7de77b58020f23 /python/qpid/tests/messaging/endpoints.py
parent6b27ee254c81d3121cba7e20368f5c2d1f0fb2c5 (diff)
downloadqpid-python-e9920e89d298dbcc5cd01d0c79616353eb750c43.tar.gz
added optional timeouts to {connection,session,sender,receiver}.close() as well as connection.detach() and {session,sender}.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@958037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r--python/qpid/tests/messaging/endpoints.py103
1 files changed, 100 insertions, 3 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index dce8d9b5ff..b064d62c21 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -20,10 +20,11 @@
# setup, usage, teardown, errors(sync), errors(async), stress, soak,
# boundary-conditions, config
-import errno, os, time
+import errno, os, socket, time
from qpid import compat
from qpid.compat import set
from qpid.messaging import *
+from qpid.messaging.transports import TRANSPORTS
from qpid.tests.messaging import Base
class SetupTests(Base):
@@ -98,8 +99,6 @@ class SetupTests(Base):
def testReconnect(self):
options = self.connection_options()
- import socket
- from qpid.messaging.transports import TRANSPORTS
real = TRANSPORTS["tcp"]
class flaky:
@@ -213,6 +212,104 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.attached()
+class hangable:
+
+ def __init__(self, host, port):
+ self.tcp = TRANSPORTS["tcp"](host, port)
+ self.hung = False
+
+ def hang(self):
+ self.hung = True
+
+ def fileno(self):
+ return self.tcp.fileno()
+
+ def reading(self, reading):
+ if self.hung:
+ return True
+ else:
+ return self.tcp.reading(reading)
+
+ def writing(self, writing):
+ if self.hung:
+ return False
+ else:
+ return self.tcp.writing(writing)
+
+ def send(self, bytes):
+ if self.hung:
+ return 0
+ else:
+ return self.tcp.send(bytes)
+
+ def recv(self, n):
+ if self.hung:
+ return ""
+ else:
+ return self.tcp.recv(n)
+
+ def close(self):
+ self.tcp.close()
+
+TRANSPORTS["hangable"] = hangable
+
+class TimeoutTests(Base):
+
+ def setup_connection(self):
+ options = self.connection_options()
+ options["transport"] = "hangable"
+ return Connection.establish(self.broker, **options)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("amq.topic")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("amq.topic")
+
+ def teardown_connection(self, conn):
+ try:
+ conn.detach(timeout=0)
+ except Timeout:
+ pass
+
+ def hang(self):
+ self.conn._driver._transport.hang()
+
+ def timeoutTest(self, method):
+ self.hang()
+ try:
+ method(timeout=self.delay())
+ assert False, "did not time out"
+ except Timeout:
+ pass
+
+ def testSenderSync(self):
+ self.snd.send(self.content("testSenderSync"), sync=False)
+ self.timeoutTest(self.snd.sync)
+
+ def testSenderClose(self):
+ self.snd.send(self.content("testSenderClose"), sync=False)
+ self.timeoutTest(self.snd.close)
+
+ def testReceiverClose(self):
+ self.timeoutTest(self.rcv.close)
+
+ def testSessionSync(self):
+ self.snd.send(self.content("testSessionSync"), sync=False)
+ self.timeoutTest(self.ssn.sync)
+
+ def testSessionClose(self):
+ self.timeoutTest(self.ssn.close)
+
+ def testConnectionDetach(self):
+ self.timeoutTest(self.conn.detach)
+
+ def testConnectionClose(self):
+ self.timeoutTest(self.conn.close)
+
ACK_QC = 'test-ack-queue; {create: always}'
ACK_QD = 'test-ack-queue; {delete: always}'