summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/__init__.py28
-rw-r--r--python/qpid/tests/messaging.py402
2 files changed, 430 insertions, 0 deletions
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
new file mode 100644
index 0000000000..465e31ca3a
--- /dev/null
+++ b/python/qpid/tests/__init__.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Test:
+
+ def __init__(self, name):
+ self.name = name
+
+ def configure(self, config):
+ self.config = config
+
+import messaging
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
new file mode 100644
index 0000000000..53216a249a
--- /dev/null
+++ b/python/qpid/tests/messaging.py
@@ -0,0 +1,402 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import time
+from qpid.tests import Test
+from qpid.messaging import Connection, Disconnected, Empty, Message, uuid4
+from Queue import Queue, Empty as QueueEmpty
+
+class Base(Test):
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ self.broker = self.config.broker
+ self.conn = self.setup_connection()
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ if self.conn is not None and self.conn.connected():
+ self.conn.close()
+
+ def ping(self, ssn):
+ # send a message
+ sender = ssn.sender("ping-queue")
+ content = "ping[%s]" % uuid4()
+ sender.send(content)
+ receiver = ssn.receiver("ping-queue")
+ msg = receiver.fetch(timeout=0)
+ ssn.acknowledge()
+ assert msg.content == content
+
+ def drain(self, rcv):
+ msgs = []
+ try:
+ while True:
+ msgs.append(rcv.fetch(0))
+ except Empty:
+ pass
+ return msgs
+
+class SetupTests(Base):
+
+ def testOpen(self):
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection.open(self.broker.host, self.broker.port)
+ self.ping(self.conn.session())
+
+ def testConnect(self):
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection(self.broker.host, self.broker.port)
+ self.conn.connect()
+ self.ping(self.conn.session())
+
+class ConnectionTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def testSessionAnon(self):
+ ssn1 = self.conn.session()
+ ssn2 = self.conn.session()
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+
+ def testSessionNamed(self):
+ ssn1 = self.conn.session("one")
+ ssn2 = self.conn.session("two")
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+ assert ssn1 is self.conn.session("one")
+ assert ssn2 is self.conn.session("two")
+
+ def testDisconnect(self):
+ ssn = self.conn.session()
+ self.ping(ssn)
+ self.conn.disconnect()
+ import socket
+ try:
+ self.ping(ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ # this is the expected failure when pinging on a disconnected
+ # connection
+ pass
+ self.conn.connect()
+ self.ping(ssn)
+
+ def testStart(self):
+ ssn = self.conn.session()
+ assert not ssn.started
+ self.conn.start()
+ assert ssn.started
+ ssn2 = self.conn.session()
+ assert ssn2.started
+
+ def testStop(self):
+ self.conn.start()
+ ssn = self.conn.session()
+ assert ssn.started
+ self.conn.stop()
+ assert not ssn.started
+ ssn2 = self.conn.session()
+ assert not ssn2.started
+
+ def testClose(self):
+ self.conn.close()
+ assert not self.conn.connected()
+
+class SessionTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testSender(self):
+ snd = self.ssn.sender("test-snd-queue")
+ snd2 = self.ssn.sender(snd.target)
+ assert snd is not snd2
+ snd2.close()
+
+ content = "testSender[%s]" % uuid4()
+ snd.send(content)
+ rcv = self.ssn.receiver(snd.target)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testReceiver(self):
+ rcv = self.ssn.receiver("test-rcv-queue")
+ rcv2 = self.ssn.receiver(rcv.source)
+ assert rcv is not rcv2
+ rcv2.close()
+
+ content = "testReceiver[%s]" % uuid4()
+ snd = self.ssn.sender(rcv.source)
+ snd.send(content)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testStart(self):
+ rcv = self.ssn.receiver("test-start-queue")
+ assert not rcv.started
+ self.ssn.start()
+ assert rcv.started
+ rcv = self.ssn.receiver("test-start-queue")
+ assert rcv.started
+
+ def testStop(self):
+ self.ssn.start()
+ rcv = self.ssn.receiver("test-stop-queue")
+ assert rcv.started
+ self.ssn.stop()
+ assert not rcv.started
+ rcv = self.ssn.receiver("test-stop-queue")
+ assert not rcv.started
+
+ # XXX, we need a convenient way to assert that required queues are
+ # empty on setup, and possibly also to drain queues on teardown
+ def testAcknowledge(self):
+ # send a bunch of messages
+ snd = self.ssn.sender("test-ack-queue")
+ tid = "a"
+ contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ for c in contents:
+ snd.send(c)
+
+ # drain the queue, verify the messages are there and then close
+ # without acking
+ rcv = self.ssn.receiver(snd.target)
+ msgs = self.drain(rcv)
+ assert contents == [m.content for m in msgs]
+ self.ssn.close()
+
+ # drain the queue again, verify that they are all the messages
+ # were requeued, and ack this time before closing
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver("test-ack-queue")
+ msgs = self.drain(rcv)
+ assert contents == [m.content for m in msgs]
+ self.ssn.acknowledge()
+ self.ssn.close()
+
+ # drain the queue a final time and verify that the messages were
+ # dequeued
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver("test-ack-queue")
+ msgs = self.drain(rcv)
+ assert len(msgs) == 0
+
+ def testClose(self):
+ self.ssn.close()
+ try:
+ self.ping(self.ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ pass
+
+class ReceiverTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("test-receiver-queue")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("test-receiver-queue")
+
+ def testListen(self):
+ msgs = Queue()
+ def listener(m):
+ msgs.put(m)
+ self.ssn.acknowledge(m)
+ self.rcv.listen(listener)
+ content = "testListen[%s]" % uuid4()
+ self.snd.send(content)
+ try:
+ msg = msgs.get(timeout=3)
+ assert False, "did not expect message: %s" % msg
+ except QueueEmpty:
+ pass
+ self.rcv.start()
+ msg = msgs.get(timeout=3)
+ assert msg.content == content
+
+ def testFetch(self):
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ pass
+ try:
+ start = time.time()
+ msg = self.rcv.fetch(3)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ elapsed = time.time() - start
+ assert elapsed >= 3
+
+ content = "testListen[%s]" % uuid4()
+ for i in range(3):
+ self.snd.send(content)
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+ msg = self.rcv.fetch(3)
+ assert msg.content == content
+ msg = self.rcv.fetch()
+ assert msg.content == content
+ self.ssn.acknowledge()
+
+ # XXX: need testStart, testStop and testClose
+
+class MessageTests(Base):
+
+ def testCreateString(self):
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain; charset=utf8"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+class MessageEchoTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender("test-message-echo-queue")
+
+ def setup_receiver(self):
+ return self.ssn.receiver("test-message-echo-queue")
+
+ def check(self, msg):
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+
+ assert msg.id == echo.id
+ assert msg.subject == echo.subject
+ assert msg.user_id == echo.user_id
+ assert msg.to == echo.to
+ assert msg.reply_to == echo.reply_to
+ assert msg.correlation_id == echo.correlation_id
+ assert msg.properties == echo.properties
+ assert msg.content_type == echo.content_type
+ assert msg.content == echo.content
+
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": []}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ msg = Message()
+ msg.to = "to-address"
+ msg.subject = "subject"
+ msg.correlation_id = str(uuid4())
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)
+
+class TestTestsXXX(Test):
+
+ def testFoo(self):
+ print "this test has output"
+
+ def testBar(self):
+ print "this test "*8
+ print "has"*10
+ print "a"*75
+ print "lot of"*10
+ print "output"*10
+
+ def testQux(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+
+ def testQuxFail(self):
+ import sys
+ sys.stdout.write("this test has output with no newline")
+ fdsa