diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/testlib.py | 45 | ||||
| -rw-r--r-- | python/tests/exchange.py | 44 | ||||
| -rw-r--r-- | python/tests/queue.py | 8 |
3 files changed, 67 insertions, 30 deletions
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index ff9ecbee8a..0bec6a8708 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -14,11 +14,13 @@ # limitations under the License. # +# # Support library for qpid python tests. # import sys, re, unittest, os, random, logging import qpid.client, qpid.spec +import Queue from getopt import getopt, GetoptError @@ -188,26 +190,41 @@ class TestBase(unittest.TestCase): self.exchanges.append((channel,exchange)) return reply - def assertPublishConsume(self, queue="", exchange="", routing_key=""): - """ - Publish a message and consume it, assert it comes back intact. + def uniqueString(self): + """Generate a unique string, unique for this TestBase instance""" + # TODO aconway 2006-09-20: Not thread safe. + if not "uniqueCounter" in dir(self): self.uniqueCounter = 1; + return "Test Message " + str(self.uniqueCounter) + + def consume(self, queueName): + """Consume from named queue returns the Queue object.""" + reply = self.channel.basic_consume(queue=queueName, no_ack=True) + return self.client.queue(reply.consumer_tag) + + def assertEmpty(self, queue): + """Assert that the queue is empty""" + try: + queue.get(timeout=1) + self.fail("Queue is not empty.") + except Queue.Empty: None # Ignore - queue can be a single queue name or a list of queue names. - For a list assert the message appears on all queues. - Crude attempt to make unique messages so we can't consume - a message not really meant for us. + def assertPublishGet(self, queue, exchange="", routing_key=""): """ - body = "TestMessage("+str(random.randint(999999, 1000000))+")" + Publish to exchange and assert queue.get() returns the same message. + """ + body = self.uniqueString() self.channel.basic_publish(exchange=exchange, content=qpid.content.Content(body), routing_key=routing_key) - if not isinstance(queue, list): queue = [queue] - for q in queue: - reply = self.channel.basic_consume(queue=q, no_ack=True) - msg = self.client.queue(reply.consumer_tag).get(timeout=2) - self.assertEqual(body, msg.content.body) - + self.assertEqual(body, queue.get(timeout=2).content.body) + def assertPublishConsume(self, queue="", exchange="", routing_key=""): + """ + Publish a message and consume it, assert it comes back intact. + Return the Queue object used to consume. + """ + self.assertPublishGet(self.consume(queue), exchange, routing_key) + def assertChannelException(self, expectedCode, message): self.assertEqual(message.method.klass.name, "channel") self.assertEqual(message.method.name, "close") diff --git a/python/tests/exchange.py b/python/tests/exchange.py index b9b16bad78..4eb64520e6 100644 --- a/python/tests/exchange.py +++ b/python/tests/exchange.py @@ -57,9 +57,25 @@ class StandardExchangeVerifier: self.channel.queue_bind(queue="q", exchange=ex) self.queue_declare(queue="p") self.channel.queue_bind(queue="p", exchange=ex) - self.assertPublishConsume(exchange=ex, queue=["q","p"]) + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + + def verifyTopicExchange(self, ex): + """Verify that ex behaves like a topic exchange""" + self.queue_declare(queue="a") + self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + + # Shouldn't match + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y") + self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x") + self.channel.basic_publish(exchange=ex, routing_key="a.b") + self.assert_(q.empty()) + - class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """ The server SHOULD implement these standard exchange types: topic, headers. @@ -76,6 +92,11 @@ class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier): """Declare and test a fanout exchange""" self.exchange_declare(0, exchange="f", type="fanout") self.verifyFanOutExchange("f") + + def testTopic(self): + """Declare and test a topic exchange""" + self.exchange_declare(0, exchange="t", type="topic") + self.verifyTopicExchange("t") class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): @@ -88,24 +109,17 @@ class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier): exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if those types are defined). """ - # TODO aconway 2006-09-01: Add tests for 3.1.3.1: - # - Test auto binding by q name - # - Test the nameless "default publish" exchange. - # - Auto created amq.fanout exchange - def testAmqDirect(self): self.verifyDirectExchange("amq.direct") def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout") - def testAmqTopic(self): - self.exchange_declare(0, exchange="amq.topic", passive="true") - # TODO aconway 2006-09-14: verify topic behavior + def testAmqTopic(self): self.verifyTopicExchange("amq.topic") def testAmqHeaders(self): self.exchange_declare(0, exchange="amq.headers", passive="true") # TODO aconway 2006-09-14: verify headers behavior -class DefaultExchangeRuleTests(TestBase): +class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier): """ The server MUST predeclare a direct exchange to act as the default exchange for content Publish methods and for default queue bindings. @@ -115,6 +129,12 @@ class DefaultExchangeRuleTests(TestBase): routing key but without specifying the exchange name, then ensuring that the message arrives in the queue correctly. """ + def testDefaultExchange(self): + # Test automatic binding by queue name. + self.queue_declare(queue="d") + self.assertPublishConsume(queue="d", routing_key="d") + # Test explicit bind to default queue + self.verifyDirectExchange("") class DefaultAccessRuleTests(TestBase): @@ -123,7 +143,7 @@ class DefaultAccessRuleTests(TestBase): by specifying an empty exchange name in the Queue.Bind and content Publish methods. """ - + # TODO aconway 2006-09-18: fill this in. class ExtensionsRuleTests(TestBase): """ diff --git a/python/tests/queue.py b/python/tests/queue.py index 92260a7d64..65baaaa00b 100644 --- a/python/tests/queue.py +++ b/python/tests/queue.py @@ -156,9 +156,9 @@ class QueueTests(TestBase): #straight-forward case: channel.queue_declare(queue="delete-me") - channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("a")) - channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("b")) - channel.basic_publish(exchange="amq.direct", routing_key="delete-me", content=Content("c")) + channel.basic_publish(routing_key="delete-me", content=Content("a")) + channel.basic_publish(routing_key="delete-me", content=Content("b")) + channel.basic_publish(routing_key="delete-me", content=Content("c")) reply = channel.queue_delete(queue="delete-me") self.assertEqual(3, reply.message_count) #check that it has gone be declaring passively @@ -189,7 +189,7 @@ class QueueTests(TestBase): #create a queue and add a message to it (use default binding): channel.queue_declare(queue="delete-me-2") channel.queue_declare(queue="delete-me-2", passive="True") - channel.basic_publish(exchange="amq.direct", routing_key="delete-me-2", content=Content("message")) + channel.basic_publish(routing_key="delete-me-2", content=Content("message")) #try to delete, but only if empty: try: |
