summaryrefslogtreecommitdiff
path: root/python/tests_0-10
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-17 12:17:55 +0000
committerGordon Sim <gsim@apache.org>2008-03-17 12:17:55 +0000
commit6574ab48665039ae9b8b1d2c5dd26ea94a3d23fa (patch)
tree01091458f1db56d09953cd129305586057df1384 /python/tests_0-10
parent1c1efeddef24ef18d75af65e4249b541b1382ea8 (diff)
downloadqpid-python-6574ab48665039ae9b8b1d2c5dd26ea94a3d23fa.tar.gz
Scope exclusive queues to sessions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@637854 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
-rw-r--r--python/tests_0-10/alternate_exchange.py7
-rw-r--r--python/tests_0-10/queue.py25
-rw-r--r--python/tests_0-10/tx.py46
3 files changed, 47 insertions, 31 deletions
diff --git a/python/tests_0-10/alternate_exchange.py b/python/tests_0-10/alternate_exchange.py
index 418a8c90ee..c177c3deb7 100644
--- a/python/tests_0-10/alternate_exchange.py
+++ b/python/tests_0-10/alternate_exchange.py
@@ -111,14 +111,13 @@ class AlternateExchangeTests(TestBase010):
session = self.session
session.exchange_declare(exchange="alternate", type="fanout")
- session = self.conn.session("alternate", 2)
- session.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
+ session2 = self.conn.session("alternate", 2)
+ session2.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
try:
- session.exchange_delete(exchange="alternate")
+ session2.exchange_delete(exchange="alternate")
self.fail("Expected deletion of in-use alternate-exchange to fail")
except SessionException, e:
session = self.session
- session.queue_delete(queue="q")
session.exchange_delete(exchange="alternate")
self.assertEquals(530, e.args[0].error_code)
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 39305d4554..758794dd52 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -86,19 +86,17 @@ class QueueTests(TestBase010):
Test that the exclusive field is honoured in queue.declare
"""
# TestBase.setUp has already opened session(1)
- c1 = self.session
+ s1 = self.session
# Here we open a second separate connection:
- other = self.connect()
- c2 = other.session(1)
- c2.session_open()
+ s2 = self.conn.session("other", 2)
#declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
try:
#other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
+ except SessionException, e:
self.assertEquals(405, e.args[0].error_code)
@@ -322,24 +320,23 @@ class QueueTests(TestBase010):
Test auto-deletion (of non-exclusive queues)
"""
session = self.session
- other = self.connect()
- session2 = other.session(1)
- session2.session_open()
+ session2 =self.conn.session("other", 1)
session.queue_declare(queue="auto-delete-me", auto_delete=True)
#consume from both sessions
- reply = session.basic_consume(queue="auto-delete-me")
- session2.basic_consume(queue="auto-delete-me")
+ tag = "my-tag"
+ session.message_subscribe(queue="auto-delete-me", destination=tag)
+ session2.message_subscribe(queue="auto-delete-me", destination=tag)
#implicit cancel
- session2.session_close()
+ session2.close()
#check it is still there
session.queue_declare(queue="auto-delete-me", passive=True)
#explicit cancel => queue is now unused again:
- session.basic_cancel(consumer_tag=reply.consumer_tag)
+ session.message_cancel(destination=tag)
#NOTE: this assumes there is no timeout in use
diff --git a/python/tests_0-10/tx.py b/python/tests_0-10/tx.py
index 59298bad1b..5aef2b00e8 100644
--- a/python/tests_0-10/tx.py
+++ b/python/tests_0-10/tx.py
@@ -30,23 +30,30 @@ class TxTests(TestBase010):
"""
Test that commited publishes are delivered and commited acks are not re-delivered
"""
+ session = self.session
+
+ #declare queues and create subscribers in the checking session
+ #to ensure that the queues are not auto-deleted too early:
+ self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+ session.message_subscribe(queue="tx-commit-a", destination="qa")
+ session.message_subscribe(queue="tx-commit-b", destination="qb")
+ session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+ #use a separate session for actual work
session2 = self.conn.session("worker", 2)
self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
session2.tx_commit()
session2.close()
- #use a different session with new subscriptions to ensure
- #there is no redelivery of acked messages:
- session = self.session
session.tx_select()
- self.subscribe(session, queue="tx-commit-a", destination="qa")
+ self.enable_flow("qa")
queue_a = session.incoming("qa")
- self.subscribe(session, queue="tx-commit-b", destination="qb")
+ self.enable_flow("qb")
queue_b = session.incoming("qb")
- self.subscribe(session, queue="tx-commit-c", destination="qc")
+ self.enable_flow("qc")
queue_c = session.incoming("qc")
#check results
@@ -76,6 +83,12 @@ class TxTests(TestBase010):
"""
Test that a session closed with an open transaction is effectively rolled back
"""
+ session = self.session
+ self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+ session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+ session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+ session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
session2 = self.conn.session("worker", 2)
queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
@@ -87,16 +100,15 @@ class TxTests(TestBase010):
session2.close()
- session = self.session
session.tx_select()
- self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+ self.enable_flow("qa")
queue_a = session.incoming("qa")
- self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+ self.enable_flow("qb")
queue_b = session.incoming("qb")
- self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+ self.enable_flow("qc")
queue_c = session.incoming("qc")
#check results
@@ -169,9 +181,7 @@ class TxTests(TestBase010):
commit and rollback
"""
#setup:
- session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
- session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
- session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+ self.declare_queues([name_a, name_b, name_c])
key = "my_key_" + name_b
topic = "my_topic_" + name_c
@@ -232,6 +242,11 @@ class TxTests(TestBase010):
session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
return queue_a, queue_b, queue_c, acked
+ def declare_queues(self, names, session=None):
+ session = session or self.session
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+
def subscribe(self, session=None, **keys):
session = session or self.session
consumer_tag = keys["destination"]
@@ -239,6 +254,11 @@ class TxTests(TestBase010):
session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+ def enable_flow(self, tag, session=None):
+ session = session or self.session
+ session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF)
+
def complete(self, session, msg):
session.receiver._completed.add(msg.id)#TODO: this may be done automatically
session.channel.session_completed(session.receiver._completed)