summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qpid/testlib.py45
-rw-r--r--python/tests/exchange.py44
-rw-r--r--python/tests/queue.py8
3 files changed, 67 insertions, 30 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index ff9ecbee8a..0bec6a8708 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -14,11 +14,13 @@
# limitations under the License.
#
+#
# Support library for qpid python tests.
#
import sys, re, unittest, os, random, logging
import qpid.client, qpid.spec
+import Queue
from getopt import getopt, GetoptError
@@ -188,26 +190,41 @@ class TestBase(unittest.TestCase):
self.exchanges.append((channel,exchange))
return reply
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
- """
- Publish a message and consume it, assert it comes back intact.
+ def uniqueString(self):
+ """Generate a unique string, unique for this TestBase instance"""
+ # TODO aconway 2006-09-20: Not thread safe.
+ if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
+ return "Test Message " + str(self.uniqueCounter)
+
+ def consume(self, queueName):
+ """Consume from named queue returns the Queue object."""
+ reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+ return self.client.queue(reply.consumer_tag)
+
+ def assertEmpty(self, queue):
+ """Assert that the queue is empty"""
+ try:
+ queue.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Queue.Empty: None # Ignore
- queue can be a single queue name or a list of queue names.
- For a list assert the message appears on all queues.
- Crude attempt to make unique messages so we can't consume
- a message not really meant for us.
+ def assertPublishGet(self, queue, exchange="", routing_key=""):
"""
- body = "TestMessage("+str(random.randint(999999, 1000000))+")"
+ Publish to exchange and assert queue.get() returns the same message.
+ """
+ body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
content=qpid.content.Content(body),
routing_key=routing_key)
- if not isinstance(queue, list): queue = [queue]
- for q in queue:
- reply = self.channel.basic_consume(queue=q, no_ack=True)
- msg = self.client.queue(reply.consumer_tag).get(timeout=2)
- self.assertEqual(body, msg.content.body)
-
+ self.assertEqual(body, queue.get(timeout=2).content.body)
+ def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ """
+ Publish a message and consume it, assert it comes back intact.
+ Return the Queue object used to consume.
+ """
+ self.assertPublishGet(self.consume(queue), exchange, routing_key)
+
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
self.assertEqual(message.method.name, "close")
diff --git a/python/tests/exchange.py b/python/tests/exchange.py
index b9b16bad78..4eb64520e6 100644
--- a/python/tests/exchange.py
+++ b/python/tests/exchange.py
@@ -57,9 +57,25 @@ class StandardExchangeVerifier:
self.channel.queue_bind(queue="q", exchange=ex)
self.queue_declare(queue="p")
self.channel.queue_bind(queue="p", exchange=ex)
- self.assertPublishConsume(exchange=ex, queue=["q","p"])
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+
+ # Shouldn't match
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
+ self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.assert_(q.empty())
+
-
class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""
The server SHOULD implement these standard exchange types: topic, headers.
@@ -76,6 +92,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""Declare and test a fanout exchange"""
self.exchange_declare(0, exchange="f", type="fanout")
self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -88,24 +109,17 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
those types are defined).
"""
- # TODO aconway 2006-09-01: Add tests for 3.1.3.1:
- # - Test auto binding by q name
- # - Test the nameless "default publish" exchange.
- # - Auto created amq.fanout exchange
-
def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
- def testAmqTopic(self):
- self.exchange_declare(0, exchange="amq.topic", passive="true")
- # TODO aconway 2006-09-14: verify topic behavior
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
def testAmqHeaders(self):
self.exchange_declare(0, exchange="amq.headers", passive="true")
# TODO aconway 2006-09-14: verify headers behavior
-class DefaultExchangeRuleTests(TestBase):
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
"""
The server MUST predeclare a direct exchange to act as the default exchange
for content Publish methods and for default queue bindings.
@@ -115,6 +129,12 @@ class DefaultExchangeRuleTests(TestBase):
routing key but without specifying the exchange name, then ensuring that
the message arrives in the queue correctly.
"""
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
class DefaultAccessRuleTests(TestBase):
@@ -123,7 +143,7 @@ class DefaultAccessRuleTests(TestBase):
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
-
+ # TODO aconway 2006-09-18: fill this in.
class ExtensionsRuleTests(TestBase):
"""
diff --git a/python/tests/queue.py b/python/tests/queue.py
index 92260a7d64..65baaaa00b 100644
--- a/python/tests/queue.py
+++ b/python/tests/queue.py
@@ -156,9 +156,9 @@ class QueueTests(TestBase):
#straight-forward case:
channel.queue_declare(queue="delete-me")
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a"))
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b"))
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c"))
+ channel.basic_publish(routing_key="delete-me", content=Content("a"))
+ channel.basic_publish(routing_key="delete-me", content=Content("b"))
+ channel.basic_publish(routing_key="delete-me", content=Content("c"))
reply = channel.queue_delete(queue="delete-me")
self.assertEqual(3, reply.message_count)
#check that it has gone be declaring passively
@@ -189,7 +189,7 @@ class QueueTests(TestBase):
#create a queue and add a message to it (use default binding):
channel.queue_declare(queue="delete-me-2")
channel.queue_declare(queue="delete-me-2", passive="True")
- channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message"))
+ channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
#try to delete, but only if empty:
try: