summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
committerRafael H. Schloming <rhs@apache.org>2009-10-10 17:15:31 +0000
commit13e6bd9704643993d95c81f22106dae5b59b3084 (patch)
tree2dbfa0faacecf665170120c61ec6239e5bff5a9c /python/qpid/tests
parentc68d17bf36649f3ba68334c3147e2d0da7246e67 (diff)
downloadqpid-python-13e6bd9704643993d95c81f22106dae5b59b3084.tar.gz
made addresses not auto-create by default; added error handling and tests for nonexist/invalid addresses; added logging for aborted connections; fixed spurious reattach
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@823890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/messaging.py131
1 files changed, 102 insertions, 29 deletions
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index fd7aca6ec7..f2bbc79c26 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -24,7 +24,7 @@ import time
from qpid.tests import Test
from qpid.harness import Skipped
from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
- InsufficientCapacity, Message, UNLIMITED, uuid4
+ InsufficientCapacity, Message, ReceiveError, SendError, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -63,11 +63,12 @@ class Base(Test):
return "%s[%s, %s]" % (base, count, self.test_id)
def ping(self, ssn):
+ PING_Q = 'ping-queue {create: always}'
# send a message
- sender = ssn.sender("ping-queue")
+ sender = ssn.sender(PING_Q)
content = self.content("ping")
sender.send(content)
- receiver = ssn.receiver("ping-queue")
+ receiver = ssn.receiver(PING_Q)
msg = receiver.fetch(0)
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
@@ -174,6 +175,8 @@ class ConnectionTests(Base):
self.conn.close()
assert not self.conn.connected()
+ACK_Q = 'test-ack-queue {create: always}'
+
class SessionTests(Base):
def setup_connection(self):
@@ -183,7 +186,7 @@ class SessionTests(Base):
return self.conn.session()
def testSender(self):
- snd = self.ssn.sender("test-snd-queue")
+ snd = self.ssn.sender('test-snd-queue {create: always}')
snd2 = self.ssn.sender(snd.target)
assert snd is not snd2
snd2.close()
@@ -196,7 +199,7 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testReceiver(self):
- rcv = self.ssn.receiver("test-rcv-queue")
+ rcv = self.ssn.receiver('test-rcv-queue {create: always}')
rcv2 = self.ssn.receiver(rcv.source)
assert rcv is not rcv2
rcv2.close()
@@ -209,34 +212,36 @@ class SessionTests(Base):
self.ssn.acknowledge(msg)
def testStart(self):
- rcv = self.ssn.receiver("test-start-queue")
+ START_Q = 'test-start-queue {create: always}'
+ rcv = self.ssn.receiver(START_Q)
assert not rcv.started
self.ssn.start()
assert rcv.started
- rcv = self.ssn.receiver("test-start-queue")
+ rcv = self.ssn.receiver(START_Q)
assert rcv.started
def testStop(self):
+ STOP_Q = 'test-stop-queue {create: always}'
self.ssn.start()
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert rcv.started
self.ssn.stop()
assert not rcv.started
- rcv = self.ssn.receiver("test-stop-queue")
+ rcv = self.ssn.receiver(STOP_Q)
assert not rcv.started
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
- snd = self.ssn.sender("test-ack-queue")
+ snd = self.ssn.sender(ACK_Q)
contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
- rcv = self.ssn.receiver(snd.target)
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
self.ssn.close()
@@ -245,7 +250,7 @@ class SessionTests(Base):
self.ssn = self.conn.session()
if ack_capacity is not None:
self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.drain(rcv, expected=contents)
acker(self.ssn)
self.ssn.close()
@@ -253,7 +258,7 @@ class SessionTests(Base):
# drain the queue a final time and verify that the messages were
# dequeued
self.ssn = self.conn.session()
- rcv = self.ssn.receiver("test-ack-queue")
+ rcv = self.ssn.receiver(ACK_Q)
self.assertEmpty(rcv)
def testAcknowledge(self):
@@ -271,7 +276,7 @@ class SessionTests(Base):
pass
finally:
self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver("test-ack-queue"))
+ self.drain(self.ssn.receiver(ACK_Q))
self.ssn.acknowledge()
def testAcknowledgeAsyncAckCap1(self):
@@ -294,10 +299,12 @@ class SessionTests(Base):
return contents
def txTest(self, commit):
+ TX_Q = 'test-tx-queue {create: always}'
+ TX_Q_COPY = 'test-tx-queue-copy {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, "test-tx-queue", "txTest", 3)
- txrcv = txssn.receiver("test-tx-queue")
- txsnd = txssn.sender("test-tx-queue-copy")
+ contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY)
rcv = self.ssn.receiver(txrcv.source)
copy_rcv = self.ssn.receiver(txsnd.target)
self.assertEmpty(copy_rcv)
@@ -323,9 +330,10 @@ class SessionTests(Base):
self.txTest(False)
def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue {create: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, "test-tx-send-queue", "txTestSend", 3)
- rcv = self.ssn.receiver("test-tx-send-queue")
+ contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
@@ -345,10 +353,11 @@ class SessionTests(Base):
self.txTestSend(False)
def txTestAck(self, commit):
+ TX_ACK_Q = 'test-tx-ack-queue {create: always}'
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, "test-tx-ack-queue", "txTestAck", 3)
+ contents = self.send(self.ssn, TX_ACK_Q, "txTestAck", 3)
assert contents == self.drain(txrcv)
if commit:
@@ -366,11 +375,11 @@ class SessionTests(Base):
txssn.close()
txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver("test-tx-ack-queue")
+ txrcv = txssn.receiver(TX_ACK_Q)
assert contents == self.drain(txrcv)
txssn.acknowledge()
txssn.commit()
- rcv = self.ssn.receiver("test-tx-ack-queue")
+ rcv = self.ssn.receiver(TX_ACK_Q)
self.assertEmpty(rcv)
txssn.close()
self.assertEmpty(rcv)
@@ -389,6 +398,8 @@ class SessionTests(Base):
except Disconnected:
pass
+RECEIVER_Q = 'test-receiver-queue {create: always}'
+
class ReceiverTests(Base):
def setup_connection(self):
@@ -398,10 +409,10 @@ class ReceiverTests(Base):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-receiver-queue")
+ return self.ssn.sender(RECEIVER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-receiver-queue")
+ return self.ssn.receiver(RECEIVER_Q)
def send(self, base, count = None):
content = self.content(base, count)
@@ -538,6 +549,66 @@ class ReceiverTests(Base):
# XXX: need testClose
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "{bad address}"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port)
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def sendErrorTest(self, addr, exc, check=lambda e: True):
+ snd = self.ssn.sender(addr)
+ try:
+ snd.send("hello")
+ assert False, "send succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ snd.close()
+
+ def fetchErrorTest(self, addr, exc, check=lambda e: True):
+ rcv = self.ssn.receiver(addr)
+ try:
+ rcv.fetch(timeout=0)
+ assert False, "fetch succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % e
+ rcv.close()
+
+ def testNoTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnparseableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting ID" in str(e))
+
+ def testUnlexableTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized character" in str(e))
+
+ def testUnlexableSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized character" in str(e))
+
+SENDER_Q = 'test-sender-q {create: always}'
+
class SenderTests(Base):
def setup_connection(self):
@@ -547,10 +618,10 @@ class SenderTests(Base):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-sender-queue")
+ return self.ssn.sender(SENDER_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-sender-queue")
+ return self.ssn.receiver(SENDER_Q)
def checkContent(self, content):
self.snd.send(content)
@@ -644,6 +715,8 @@ class MessageTests(Base):
m.content = u"<html/>"
assert m.content_type == "text/html; charset=utf8"
+ECHO_Q = 'test-message-echo-queue {create: always}'
+
class MessageEchoTests(Base):
def setup_connection(self):
@@ -653,10 +726,10 @@ class MessageEchoTests(Base):
return self.conn.session()
def setup_sender(self):
- return self.ssn.sender("test-message-echo-queue")
+ return self.ssn.sender(ECHO_Q)
def setup_receiver(self):
- return self.ssn.receiver("test-message-echo-queue")
+ return self.ssn.receiver(ECHO_Q)
def check(self, msg):
self.snd.send(msg)