From a91fe82ffdb32f0db050c8c071379281295e5ca8 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 9 May 2008 18:40:13 +0000 Subject: QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654907 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/tests/connection.py | 68 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) (limited to 'qpid/python/tests/connection.py') diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py index 6925480ed3..88620bc1c6 100644 --- a/qpid/python/tests/connection.py +++ b/qpid/python/tests/connection.py @@ -51,8 +51,16 @@ class TestSession(Delegate): def queue_query(self, qq): return qq._type.result.type.new((qq.queue,), {}) - def message_transfer(self, cmd, header, body): - self.queue.put((cmd, header, body)) + def message_transfer(self, cmd, headers, body): + if cmd.destination == "echo": + m = Message(body) + m.headers = headers + self.session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elif cmd.destination == "abort": + self.session.channel.connection.sock.close() + else: + self.queue.put((cmd, headers, body)) class ConnectionTest(TestCase): @@ -134,3 +142,59 @@ class ConnectionTest(TestCase): qq = ssn.queue_query("asdf") assert qq.queue == "asdf" c.close(5) + + def testCloseGet(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + for i in range(10): + m = echos.get(timeout=10) + assert m.body == "test%d" % i + + try: + m = echos.get(timeout=10) + assert False + except Closed, e: + pass + + def testCloseListen(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + condition = Condition() + def listener(m): messages.append(m) + def exc_listener(e): + exceptions.append(e) + condition.acquire() + condition.notify() + condition.release() + + echos.listen(listener, exc_listener) + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + condition.acquire() + condition.wait(10) + condition.release() + + for i in range(10): + m = messages.pop(0) + assert m.body == "test%d" % i + + assert len(exceptions) == 1 -- cgit v1.2.1