summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-18 20:22:23 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-18 20:22:23 +0000
commit6577298076dbacbcc2d3280057b9129282ebf162 (patch)
treebf5d370f4876e14a9e257cc56d04fb5511614a10 /python/qpid/tests/messaging
parent98ed72a36159ab9084ecf49045456dd21d5a64f8 (diff)
downloadqpid-python-6577298076dbacbcc2d3280057b9129282ebf162.tar.gz
split messaging.py into multiple files and made qpid.messaging a package
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@911550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests/messaging')
-rw-r--r--python/qpid/tests/messaging/__init__.py106
-rw-r--r--python/qpid/tests/messaging/address.py309
-rw-r--r--python/qpid/tests/messaging/endpoints.py878
-rw-r--r--python/qpid/tests/messaging/message.py116
4 files changed, 1409 insertions, 0 deletions
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py
new file mode 100644
index 0000000000..6785614f41
--- /dev/null
+++ b/python/qpid/tests/messaging/__init__.py
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from qpid.messaging import *
+from qpid.tests import Test
+
+class Base(Test):
+
+ def setup_connection(self):
+ return None
+
+ def setup_session(self):
+ return None
+
+ def setup_sender(self):
+ return None
+
+ def setup_receiver(self):
+ return None
+
+ def setup(self):
+ self.test_id = uuid4()
+ self.broker = self.config.broker
+ try:
+ self.conn = self.setup_connection()
+ except ConnectError, e:
+ raise Skipped(e)
+ self.ssn = self.setup_session()
+ self.snd = self.setup_sender()
+ if self.snd is not None:
+ self.snd.durable = self.durable()
+ self.rcv = self.setup_receiver()
+
+ def teardown(self):
+ if self.conn is not None and self.conn.connected():
+ self.conn.close()
+
+ def content(self, base, count = None):
+ if count is None:
+ return "%s[%s]" % (base, self.test_id)
+ else:
+ return "%s[%s, %s]" % (base, count, self.test_id)
+
+ def ping(self, ssn):
+ PING_Q = 'ping-queue; {create: always, delete: always}'
+ # send a message
+ sender = ssn.sender(PING_Q, durable=self.durable())
+ content = self.content("ping")
+ sender.send(content)
+ receiver = ssn.receiver(PING_Q)
+ msg = receiver.fetch(0)
+ ssn.acknowledge()
+ assert msg.content == content, "expected %r, got %r" % (content, msg.content)
+
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
+ contents = []
+ try:
+ while limit is None or len(contents) < limit:
+ contents.append(rcv.fetch(timeout=timeout).content)
+ except Empty:
+ pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
+ return contents
+
+ def assertEmpty(self, rcv):
+ contents = self.drain(rcv)
+ assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+
+ def assertPending(self, rcv, expected):
+ p = rcv.pending()
+ assert p == expected, "expected %s, got %s" % (expected, p)
+
+ def sleep(self):
+ time.sleep(self.delay())
+
+ def delay(self):
+ return float(self.config.defines.get("delay", "2"))
+
+ def get_bool(self, name):
+ return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
+
+ def durable(self):
+ return self.get_bool("durable")
+
+ def reconnect(self):
+ return self.get_bool("reconnect")
+
+import address, endpoints, message
diff --git a/python/qpid/tests/messaging/address.py b/python/qpid/tests/messaging/address.py
new file mode 100644
index 0000000000..7adbc0c6f7
--- /dev/null
+++ b/python/qpid/tests/messaging/address.py
@@ -0,0 +1,309 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from qpid.tests import Test
+from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, SYM, WSPACE, \
+ LEXER
+from qpid.lexer import Token
+from qpid.harness import Skipped
+from qpid.tests.parser import ParserBase
+
+def indent(st):
+ return " " + st.replace("\n", "\n ")
+
+def pprint_address(name, subject, options):
+ return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \
+ (pprint(name), pprint(subject), pprint(options))
+
+def pprint(o):
+ if isinstance(o, dict):
+ return pprint_map(o)
+ elif isinstance(o, list):
+ return pprint_list(o)
+ elif isinstance(o, basestring):
+ return pprint_string(o)
+ else:
+ return repr(o)
+
+def pprint_map(m):
+ items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()]
+ items.sort()
+ return pprint_items("{", items, "}")
+
+def pprint_list(l):
+ return pprint_items("[", [pprint(x) for x in l], "]")
+
+def pprint_items(start, items, end):
+ if items:
+ return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end)
+ else:
+ return "%s%s" % (start, end)
+
+def pprint_string(s):
+ result = "'"
+ for c in s:
+ if c == "'":
+ result += "\\'"
+ elif c == "\n":
+ result += "\\n"
+ elif ord(c) >= 0x80:
+ result += "\\u%04x" % ord(c)
+ else:
+ result += c
+ result += "'"
+ return result
+
+class AddressTests(ParserBase, Test):
+
+ EXCLUDE = (WSPACE, EOF)
+
+ def fields(self, line, n):
+ result = line.split(":", n - 1)
+ result.extend([None]*(n - len(result)))
+ return result
+
+ def call(self, parser, mode, input):
+ try:
+ from subprocess import Popen, PIPE, STDOUT
+ po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
+ except ImportError, e:
+ raise Skipped("%s" % e)
+ except OSError, e:
+ raise Skipped("%s: %s" % (e, parser))
+ out, _ = po.communicate(input=input)
+ return out
+
+ def parser(self):
+ return self.config.defines.get("address.parser")
+
+ def do_lex(self, st):
+ parser = self.parser()
+ if parser:
+ out = self.call(parser, "lex", st)
+ lines = out.split("\n")
+ toks = []
+ for line in lines:
+ if line.strip():
+ name, position, value = self.fields(line, 3)
+ toks.append(Token(LEXER.type(name), value, position, st))
+ return toks
+ else:
+ return lex(st)
+
+ def do_parse(self, st):
+ return parse(st)
+
+ def valid(self, addr, name=None, subject=None, options=None):
+ parser = self.parser()
+ if parser:
+ got = self.call(parser, "parse", addr)
+ expected = "%s\n" % pprint_address(name, subject, options)
+ assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got)
+ else:
+ ParserBase.valid(self, addr, (name, subject, options))
+
+ def invalid(self, addr, error=None):
+ parser = self.parser()
+ if parser:
+ got = self.call(parser, "parse", addr)
+ expected = "ERROR: %s\n" % error
+ assert expected == got, "expected %r, got %r" % (expected, got)
+ else:
+ ParserBase.invalid(self, addr, error)
+
+ def testDashInId1(self):
+ self.lex("foo-bar", ID)
+
+ def testDashInId2(self):
+ self.lex("foo-3", ID)
+
+ def testDashAlone1(self):
+ self.lex("foo - bar", ID, SYM, ID)
+
+ def testDashAlone2(self):
+ self.lex("foo - 3", ID, SYM, NUMBER)
+
+ def testLeadingDash(self):
+ self.lex("-foo", SYM, ID)
+
+ def testTrailingDash(self):
+ self.lex("foo-", ID, SYM)
+
+ def testNegativeNum(self):
+ self.lex("-3", NUMBER)
+
+ def testHash(self):
+ self.valid("foo/bar.#", "foo", "bar.#")
+
+ def testStar(self):
+ self.valid("foo/bar.*", "foo", "bar.*")
+
+ def testColon(self):
+ self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf")
+
+ def testOptions(self):
+ self.valid("foo.bar/baz.qux:moo:arf; {key: value}",
+ "foo.bar", "baz.qux:moo:arf", {"key": "value"})
+
+ def testOptionsTrailingComma(self):
+ self.valid("name/subject; {key: value,}", "name", "subject",
+ {"key": "value"})
+
+ def testSemiSubject(self):
+ self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}",
+ "foo.bar", "baz.qux;moo:arf", {"key": "value"})
+
+ def testCommaSubject(self):
+ self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}")
+
+ def testCommaSubjectOptions(self):
+ self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar",
+ "baz.qux.{moo,arf}", {"key": "value"})
+
+ def testUnbalanced(self):
+ self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar",
+ "baz.qux.{moo,arf", {"key": "value"})
+
+ def testSlashQuote(self):
+ self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}",
+ "foo.bar/baz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc1(self):
+ self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}",
+ "foo.bar\x00baz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc2(self):
+ self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}",
+ "foo.bar\xffbaz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashHexEsc3(self):
+ self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}",
+ "foo.bar\xFFbaz.qux.{moo,arf",
+ None, {"key": "value"})
+
+ def testSlashUnicode1(self):
+ self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}",
+ u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode2(self):
+ self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}",
+ u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode3(self):
+ self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}",
+ u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"})
+
+ def testSlashUnicode4(self):
+ self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}",
+ u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"})
+
+ def testNoName(self):
+ self.invalid("; {key: value}",
+ "unexpected token SEMI(;) line:1,0:; {key: value}")
+
+ def testEmpty(self):
+ self.invalid("", "unexpected token EOF line:1,0:")
+
+ def testNoNameSlash(self):
+ self.invalid("/asdf; {key: value}",
+ "unexpected token SLASH(/) line:1,0:/asdf; {key: value}")
+
+ def testBadOptions1(self):
+ self.invalid("name/subject; {",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), "
+ "got EOF line:1,15:name/subject; {")
+
+ def testBadOptions2(self):
+ self.invalid("name/subject; { 3",
+ "expecting COLON, got EOF "
+ "line:1,17:name/subject; { 3")
+
+ def testBadOptions3(self):
+ self.invalid("name/subject; { key:",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF "
+ "line:1,20:name/subject; { key:")
+
+ def testBadOptions4(self):
+ self.invalid("name/subject; { key: value",
+ "expecting (COMMA, RBRACE), got EOF "
+ "line:1,26:name/subject; { key: value")
+
+ def testBadOptions5(self):
+ self.invalid("name/subject; { key: value asdf",
+ "expecting (COMMA, RBRACE), got ID(asdf) "
+ "line:1,27:name/subject; { key: value asdf")
+
+ def testBadOptions6(self):
+ self.invalid("name/subject; { key: value,",
+ "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF "
+ "line:1,27:name/subject; { key: value,")
+
+ def testBadOptions7(self):
+ self.invalid("name/subject; { key: value } asdf",
+ "expecting EOF, got ID(asdf) "
+ "line:1,29:name/subject; { key: value } asdf")
+
+ def testList1(self):
+ self.valid("name/subject; { key: [] }", "name", "subject", {"key": []})
+
+ def testList2(self):
+ self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']})
+
+ def testList3(self):
+ self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject",
+ {"key": [1, 2, 3]})
+
+ def testList4(self):
+ self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject",
+ {"key": [1, [2, 3], 4]})
+
+ def testBadList1(self):
+ self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), "
+ "got RBRACE(}) line:1,23:name/subject; { key: [ }")
+
+ def testBadList2(self):
+ self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), "
+ "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }")
+
+ def testBadList3(self):
+ self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), "
+ "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }")
+
+ def testBadList4(self):
+ self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), "
+ "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }")
+
+ def testMap1(self):
+ self.valid("name/subject; { 'key': value }",
+ "name", "subject", {"key": "value"})
+
+ def testMap2(self):
+ self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"})
+
+ def testMap3(self):
+ self.valid('name/subject; { "foo.bar": value }',
+ "name", "subject", {"foo.bar": "value"})
+
+ def testBoolean(self):
+ self.valid("name/subject; { true: True, false: False }",
+ "name", "subject", {"true": True, "false": False})
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
new file mode 100644
index 0000000000..2e70f13f3a
--- /dev/null
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -0,0 +1,878 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# setup, usage, teardown, errors(sync), errors(async), stress, soak,
+# boundary-conditions, config
+
+import time
+from qpid import compat
+from qpid.harness import Skipped
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+class SetupTests(Base):
+
+ def testOpen(self):
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+ self.ping(self.conn.session())
+
+ def testConnect(self):
+ # XXX: need to flesh out URL support/syntax
+ self.conn = Connection(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+ self.conn.connect()
+ self.ping(self.conn.session())
+
+ def testConnectError(self):
+ try:
+ self.conn = Connection.open("localhost", 0)
+ assert False, "connect succeeded"
+ except ConnectError, e:
+ # XXX: should verify that e includes appropriate diagnostic info
+ pass
+
+class ConnectionTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def testSessionAnon(self):
+ ssn1 = self.conn.session()
+ ssn2 = self.conn.session()
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+
+ def testSessionNamed(self):
+ ssn1 = self.conn.session("one")
+ ssn2 = self.conn.session("two")
+ self.ping(ssn1)
+ self.ping(ssn2)
+ assert ssn1 is not ssn2
+ assert ssn1 is self.conn.session("one")
+ assert ssn2 is self.conn.session("two")
+
+ def testDisconnect(self):
+ ssn = self.conn.session()
+ self.ping(ssn)
+ self.conn.disconnect()
+ try:
+ self.ping(ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ # this is the expected failure when pinging on a disconnected
+ # connection
+ pass
+ self.conn.connect()
+ self.ping(ssn)
+
+ def testClose(self):
+ self.conn.close()
+ assert not self.conn.connected()
+
+ACK_QC = 'test-ack-queue; {create: always}'
+ACK_QD = 'test-ack-queue; {delete: always}'
+
+class SessionTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def testSender(self):
+ snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
+ durable=self.durable())
+ snd2 = self.ssn.sender(snd.target, durable=self.durable())
+ assert snd is not snd2
+ snd2.close()
+
+ content = self.content("testSender")
+ snd.send(content)
+ rcv = self.ssn.receiver(snd.target)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+
+ def testReceiver(self):
+ rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
+ rcv2 = self.ssn.receiver(rcv.source)
+ assert rcv is not rcv2
+ rcv2.close()
+
+ content = self.content("testReceiver")
+ snd = self.ssn.sender(rcv.source, durable=self.durable())
+ snd.send(content)
+ msg = rcv.fetch(0)
+ assert msg.content == content
+ self.ssn.acknowledge(msg)
+ snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
+
+ def testDisconnectedReceiver(self):
+ self.conn.disconnect()
+ rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
+ m = self.content("testDisconnectedReceiver")
+ self.conn.connect()
+ snd = self.ssn.sender("test-dis-rcv-queue")
+ snd.send(m)
+ self.drain(rcv, expected=[m])
+
+ def testNextReceiver(self):
+ ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
+ rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+ rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+
+ snd = self.ssn.sender(ADDR)
+
+ msgs = []
+ for i in range(10):
+ content = self.content("testNextReceiver", i)
+ snd.send(content)
+ msgs.append(content)
+
+ fetched = []
+ try:
+ while True:
+ rcv = self.ssn.next_receiver(timeout=self.delay())
+ assert rcv in (rcv1, rcv2, rcv3)
+ assert rcv.pending() > 0
+ fetched.append(rcv.fetch().content)
+ except Empty:
+ pass
+ assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
+ self.ssn.acknowledge()
+ #we set the capacity to 0 to prevent the deletion of the queue -
+ #triggered the deletion policy when the first receiver is closed -
+ #resulting in session exceptions being issued for the remaining
+ #active subscriptions:
+ for r in [rcv1, rcv2, rcv3]:
+ r.capacity = 0
+
+ # XXX, we need a convenient way to assert that required queues are
+ # empty on setup, and possibly also to drain queues on teardown
+ def ackTest(self, acker, ack_capacity=None):
+ # send a bunch of messages
+ snd = self.ssn.sender(ACK_QC, durable=self.durable())
+ contents = [self.content("ackTest", i) for i in range(15)]
+ for c in contents:
+ snd.send(c)
+
+ # drain the queue, verify the messages are there and then close
+ # without acking
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ self.ssn.close()
+
+ # drain the queue again, verify that they are all the messages
+ # were requeued, and ack this time before closing
+ self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
+ rcv = self.ssn.receiver(ACK_QC)
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
+ self.ssn.close()
+
+ # drain the queue a final time and verify that the messages were
+ # dequeued
+ self.ssn = self.conn.session()
+ rcv = self.ssn.receiver(ACK_QD)
+ self.assertEmpty(rcv)
+
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver(ACK_QD))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
+ def send(self, ssn, queue, base, count=1):
+ snd = ssn.sender(queue, durable=self.durable())
+ contents = []
+ for i in range(count):
+ c = self.content(base, i)
+ snd.send(c)
+ contents.append(c)
+ snd.close()
+ return contents
+
+ def txTest(self, commit):
+ TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
+ TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
+ txssn = self.conn.session(transactional=True)
+ contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ txrcv = txssn.receiver(TX_Q)
+ txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
+ rcv = self.ssn.receiver(txrcv.source)
+ copy_rcv = self.ssn.receiver(txsnd.target)
+ self.assertEmpty(copy_rcv)
+ for i in range(3):
+ m = txrcv.fetch(0)
+ txsnd.send(m)
+ self.assertEmpty(copy_rcv)
+ txssn.acknowledge()
+ if commit:
+ txssn.commit()
+ self.assertEmpty(rcv)
+ assert contents == self.drain(copy_rcv)
+ else:
+ txssn.rollback()
+ assert contents == self.drain(rcv)
+ self.assertEmpty(copy_rcv)
+ self.ssn.acknowledge()
+
+ def testCommit(self):
+ self.txTest(True)
+
+ def testRollback(self):
+ self.txTest(False)
+
+ def txTestSend(self, commit):
+ TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
+ txssn = self.conn.session(transactional=True)
+ contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ rcv = self.ssn.receiver(TX_SEND_Q)
+ self.assertEmpty(rcv)
+
+ if commit:
+ txssn.commit()
+ assert contents == self.drain(rcv)
+ self.ssn.acknowledge()
+ else:
+ txssn.rollback()
+ self.assertEmpty(rcv)
+ txssn.commit()
+ self.assertEmpty(rcv)
+
+ def testCommitSend(self):
+ self.txTestSend(True)
+
+ def testRollbackSend(self):
+ self.txTestSend(False)
+
+ def txTestAck(self, commit):
+ TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
+ TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ self.assertEmpty(txrcv)
+ contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ assert contents == self.drain(txrcv)
+
+ if commit:
+ txssn.acknowledge()
+ else:
+ txssn.rollback()
+ drained = self.drain(txrcv)
+ assert contents == drained, "expected %s, got %s" % (contents, drained)
+ txssn.acknowledge()
+ txssn.rollback()
+ assert contents == self.drain(txrcv)
+ txssn.commit() # commit without ack
+ self.assertEmpty(txrcv)
+
+ txssn.close()
+
+ txssn = self.conn.session(transactional=True)
+ txrcv = txssn.receiver(TX_ACK_QC)
+ assert contents == self.drain(txrcv)
+ txssn.acknowledge()
+ txssn.commit()
+ rcv = self.ssn.receiver(TX_ACK_QD)
+ self.assertEmpty(rcv)
+ txssn.close()
+ self.assertEmpty(rcv)
+
+ def testCommitAck(self):
+ self.txTestAck(True)
+
+ def testRollbackAck(self):
+ self.txTestAck(False)
+
+ def testClose(self):
+ self.ssn.close()
+ try:
+ self.ping(self.ssn)
+ assert False, "ping succeeded"
+ except Disconnected:
+ pass
+
+RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
+
+class ReceiverTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(RECEIVER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(RECEIVER_Q)
+
+ def send(self, base, count = None):
+ content = self.content(base, count)
+ self.snd.send(content)
+ return content
+
+ def testFetch(self):
+ try:
+ msg = self.rcv.fetch(0)
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ pass
+ try:
+ start = time.time()
+ msg = self.rcv.fetch(self.delay())
+ assert False, "unexpected message: %s" % msg
+ except Empty:
+ elapsed = time.time() - start
+ assert elapsed >= self.delay()
+
+ one = self.send("testFetch", 1)
+ two = self.send("testFetch", 2)
+ three = self.send("testFetch", 3)
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+ msg = self.rcv.fetch(self.delay())
+ assert msg.content == two
+ msg = self.rcv.fetch()
+ assert msg.content == three
+ self.ssn.acknowledge()
+
+ def testCapacityIncrease(self):
+ content = self.send("testCapacityIncrease")
+ self.sleep()
+ assert self.rcv.pending() == 0
+ self.rcv.capacity = UNLIMITED
+ self.sleep()
+ assert self.rcv.pending() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+ assert self.rcv.pending() == 0
+ self.ssn.acknowledge()
+
+ def testCapacityDecrease(self):
+ self.rcv.capacity = UNLIMITED
+ one = self.send("testCapacityDecrease", 1)
+ self.sleep()
+ assert self.rcv.pending() == 1
+ msg = self.rcv.fetch(0)
+ assert msg.content == one
+
+ self.rcv.capacity = 0
+
+ two = self.send("testCapacityDecrease", 2)
+ self.sleep()
+ assert self.rcv.pending() == 0
+ msg = self.rcv.fetch(0)
+ assert msg.content == two
+
+ self.ssn.acknowledge()
+
+ def testCapacity(self):
+ self.rcv.capacity = 5
+ self.assertPending(self.rcv, 0)
+
+ for i in range(15):
+ self.send("testCapacity", i)
+ self.sleep()
+ self.assertPending(self.rcv, 5)
+
+ self.drain(self.rcv, limit = 5)
+ self.sleep()
+ self.assertPending(self.rcv, 5)
+
+ drained = self.drain(self.rcv)
+ assert len(drained) == 10, "%s, %s" % (len(drained), drained)
+ self.assertPending(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testCapacityUNLIMITED(self):
+ self.rcv.capacity = UNLIMITED
+ self.assertPending(self.rcv, 0)
+
+ for i in range(10):
+ self.send("testCapacityUNLIMITED", i)
+ self.sleep()
+ self.assertPending(self.rcv, 10)
+
+ self.drain(self.rcv)
+ self.assertPending(self.rcv, 0)
+
+ self.ssn.acknowledge()
+
+ def testPending(self):
+ self.rcv.capacity = UNLIMITED
+ assert self.rcv.pending() == 0
+
+ for i in range(3):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 3
+
+ for i in range(3, 10):
+ self.send("testPending", i)
+ self.sleep()
+ assert self.rcv.pending() == 10
+
+ self.drain(self.rcv, limit=3)
+ assert self.rcv.pending() == 7
+
+ self.drain(self.rcv)
+ assert self.rcv.pending() == 0
+
+ self.ssn.acknowledge()
+
+ def testDoubleClose(self):
+ m1 = self.content("testDoubleClose", 1)
+ m2 = self.content("testDoubleClose", 2)
+
+ snd = self.ssn.sender("""test-double-close; {
+ create: always,
+ delete: sender,
+ node-properties: {
+ type: topic
+ }
+}
+""")
+ r1 = self.ssn.receiver(snd.target)
+ r2 = self.ssn.receiver(snd.target)
+ snd.send(m1)
+ self.drain(r1, expected=[m1])
+ self.drain(r2, expected=[m1])
+ r1.close()
+ snd.send(m2)
+ self.drain(r2, expected=[m2])
+ r2.close()
+
+ # XXX: need testClose
+
+ def testMode(self):
+ msgs = [self.content("testMode", 1),
+ self.content("testMode", 2),
+ self.content("testMode", 3)]
+
+ for m in msgs:
+ self.snd.send(m)
+
+ rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
+ rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
+ self.drain(rb, expected=msgs)
+ self.drain(rc, expected=msgs)
+ rb2 = self.ssn.receiver(rb.source)
+ self.assertEmpty(rb2)
+ self.drain(self.rcv, expected=[])
+
+class AddressTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def badOption(self, options, error):
+ try:
+ self.ssn.sender("test-bad-options-snd; %s" % options)
+ assert False
+ except SendError, e:
+ assert "error in options: %s" % error == str(e), e
+
+ try:
+ self.ssn.receiver("test-bad-options-rcv; %s" % options)
+ assert False
+ except ReceiveError, e:
+ assert "error in options: %s" % error == str(e), e
+
+ def testIllegalKey(self):
+ self.badOption("{create: always, node-properties: "
+ "{this-property-does-not-exist: 3}}",
+ "node-properties: this-property-does-not-exist: "
+ "illegal key")
+
+ def testWrongValue(self):
+ self.badOption("{create: asdf}", "create: asdf not in "
+ "('always', 'sender', 'receiver', 'never')")
+
+ def testWrongType1(self):
+ self.badOption("{node-properties: asdf}",
+ "node-properties: asdf is not a map")
+
+ def testWrongType2(self):
+ self.badOption("{node-properties: {durable: []}}",
+ "node-properties: durable: [] is not a bool")
+
+ def testNonQueueBindings(self):
+ self.badOption("{node-properties: {type: topic, x-properties: "
+ "{bindings: []}}}",
+ "node-properties: x-properties: bindings: "
+ "bindings are only permitted on nodes of type queue")
+
+ def testCreateQueue(self):
+ snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
+ "node-properties: {type: queue, durable: False, "
+ "x-properties: {auto_delete: true}}}")
+ content = self.content("testCreateQueue")
+ snd.send(content)
+ rcv = self.ssn.receiver("test-create-queue")
+ self.drain(rcv, expected=[content])
+
+ def createExchangeTest(self, props=""):
+ addr = """test-create-exchange; {
+ create: always,
+ delete: always,
+ node-properties: {
+ type: topic,
+ durable: False,
+ x-properties: {auto_delete: true, %s}
+ }
+ }""" % props
+ snd = self.ssn.sender(addr)
+ snd.send("ping")
+ rcv1 = self.ssn.receiver("test-create-exchange/first")
+ rcv2 = self.ssn.receiver("test-create-exchange/first")
+ rcv3 = self.ssn.receiver("test-create-exchange/second")
+ for r in (rcv1, rcv2, rcv3):
+ try:
+ r.fetch(0)
+ assert False
+ except Empty:
+ pass
+ msg1 = Message(self.content("testCreateExchange", 1), subject="first")
+ msg2 = Message(self.content("testCreateExchange", 2), subject="second")
+ snd.send(msg1)
+ snd.send(msg2)
+ self.drain(rcv1, expected=[msg1.content])
+ self.drain(rcv2, expected=[msg1.content])
+ self.drain(rcv3, expected=[msg2.content])
+
+ def testCreateExchange(self):
+ self.createExchangeTest()
+
+ def testCreateExchangeDirect(self):
+ self.createExchangeTest("type: direct")
+
+ def testCreateExchangeTopic(self):
+ self.createExchangeTest("type: topic")
+
+ def testDeleteBySender(self):
+ snd = self.ssn.sender("test-delete; {create: always}")
+ snd.send("ping")
+ snd.close()
+ snd = self.ssn.sender("test-delete; {delete: always}")
+ snd.send("ping")
+ snd.close()
+ try:
+ self.ssn.sender("test-delete")
+ except SendError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteByReceiver(self):
+ rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
+ try:
+ rcv.fetch(0)
+ except Empty:
+ pass
+ rcv.close()
+
+ try:
+ self.ssn.receiver("test-delete")
+ assert False
+ except ReceiveError, e:
+ assert "no such queue" in str(e)
+
+ def testDeleteSpecial(self):
+ snd = self.ssn.sender("amq.topic; {delete: always}")
+ snd.send("asdf")
+ try:
+ snd.close()
+ except SessionError, e:
+ assert "Cannot delete default exchange" in str(e)
+ # XXX: need to figure out close after error
+ self.conn._remove_session(self.ssn)
+
+ def testBindings(self):
+ snd = self.ssn.sender("""
+test-bindings-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ bindings: ["amq.topic/a.#", "amq.direct/b", "amq.topic/c.*"]
+ }
+ }
+}
+""")
+ snd.send("one")
+ snd_a = self.ssn.sender("amq.topic/a.foo")
+ snd_b = self.ssn.sender("amq.direct/b")
+ snd_c = self.ssn.sender("amq.topic/c.bar")
+ snd_a.send("two")
+ snd_b.send("three")
+ snd_c.send("four")
+ rcv = self.ssn.receiver("test-bindings-queue")
+ self.drain(rcv, expected=["one", "two", "three", "four"])
+
+ def testBindingsAdditive(self):
+ m1 = self.content("testBindingsAdditive", 1)
+ m2 = self.content("testBindingsAdditive", 2)
+ m3 = self.content("testBindingsAdditive", 3)
+ m4 = self.content("testBindingsAdditive", 4)
+
+ snd = self.ssn.sender("""
+test-bindings-additive-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ bindings: ["amq.topic/a"]
+ }
+ }
+}
+""")
+
+ snd_a = self.ssn.sender("amq.topic/a")
+ snd_b = self.ssn.sender("amq.topic/b")
+
+ snd_a.send(m1)
+ snd_b.send(m2)
+
+ rcv = self.ssn.receiver("test-bindings-additive-queue")
+ self.drain(rcv, expected=[m1])
+
+ new_snd = self.ssn.sender("""
+test-bindings-additive-queue; {
+ node-properties: {
+ x-properties: {
+ bindings: ["amq.topic/b"]
+ }
+ }
+}
+""")
+
+ new_snd.send(m3)
+ snd_b.send(m4)
+ self.drain(rcv, expected=[m3, m4])
+
+ def testSubjectOverride(self):
+ snd = self.ssn.sender("amq.topic/a")
+ rcv_a = self.ssn.receiver("amq.topic/a")
+ rcv_b = self.ssn.receiver("amq.topic/b")
+ m1 = self.content("testSubjectOverride", 1)
+ m2 = self.content("testSubjectOverride", 2)
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ self.drain(rcv_a, expected=[m1])
+ self.drain(rcv_b, expected=[m2])
+
+ def testSubjectDefault(self):
+ m1 = self.content("testSubjectDefault", 1)
+ m2 = self.content("testSubjectDefault", 2)
+ snd = self.ssn.sender("amq.topic/a")
+ rcv = self.ssn.receiver("amq.topic")
+ snd.send(m1)
+ snd.send(Message(subject="b", content=m2))
+ e1 = rcv.fetch(timeout=0)
+ e2 = rcv.fetch(timeout=0)
+ assert e1.subject == "a", "subject: %s" % e1.subject
+ assert e2.subject == "b", "subject: %s" % e2.subject
+ self.assertEmpty(rcv)
+
+NOSUCH_Q = "this-queue-should-not-exist"
+UNPARSEABLE_ADDR = "name/subject; {bad options"
+UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
+
+class AddressErrorTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def senderErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.sender(addr, durable=self.durable())
+ assert False, "sender creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def receiverErrorTest(self, addr, exc, check=lambda e: True):
+ try:
+ self.ssn.receiver(addr)
+ assert False, "receiver creation succeeded"
+ except exc, e:
+ assert check(e), "unexpected error: %s" % compat.format_exc(e)
+
+ def testNoneTarget(self):
+ # XXX: should have specific exception for this
+ self.senderErrorTest(None, SendError)
+
+ def testNoneSource(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest(None, ReceiveError)
+
+ def testNoTarget(self):
+ # XXX: should have specific exception for this
+ self.senderErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
+
+ def testNoSource(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest(NOSUCH_Q, ReceiveError, lambda e: NOSUCH_Q in str(e))
+
+ def testUnparseableTarget(self):
+ # XXX: should have specific exception for this
+ self.senderErrorTest(UNPARSEABLE_ADDR, SendError,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnparseableSource(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest(UNPARSEABLE_ADDR, ReceiveError,
+ lambda e: "expecting COLON" in str(e))
+
+ def testUnlexableTarget(self):
+ # XXX: should have specific exception for this
+ self.senderErrorTest(UNLEXABLE_ADDR, SendError,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testUnlexableSource(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest(UNLEXABLE_ADDR, ReceiveError,
+ lambda e: "unrecognized characters" in str(e))
+
+ def testInvalidMode(self):
+ # XXX: should have specific exception for this
+ self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
+ ReceiveError,
+ lambda e: "not in ('browse', 'consume')" in str(e))
+
+SENDER_Q = 'test-sender-q; {create: always, delete: always}'
+
+class SenderTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(SENDER_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(SENDER_Q)
+
+ def checkContent(self, content):
+ self.snd.send(content)
+ msg = self.rcv.fetch(0)
+ assert msg.content == content
+
+ out = Message(content)
+ self.snd.send(out)
+ echo = self.rcv.fetch(0)
+ assert out.content == echo.content
+ assert echo.content == msg.content
+ self.ssn.acknowledge()
+
+ def testSendString(self):
+ self.checkContent(self.content("testSendString"))
+
+ def testSendList(self):
+ self.checkContent(["testSendList", 1, 3.14, self.test_id])
+
+ def testSendMap(self):
+ self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ drained = self.drain(self.rcv, timeout=self.delay())
+ assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.snd.sync()
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
diff --git a/python/qpid/tests/messaging/message.py b/python/qpid/tests/messaging/message.py
new file mode 100644
index 0000000000..ef2ec1aac4
--- /dev/null
+++ b/python/qpid/tests/messaging/message.py
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+class MessageTests(Base):
+
+ def testCreateString(self):
+ m = Message("string")
+ assert m.content == "string"
+ assert m.content_type is None
+
+ def testCreateUnicode(self):
+ m = Message(u"unicode")
+ assert m.content == u"unicode"
+ assert m.content_type == "text/plain"
+
+ def testCreateMap(self):
+ m = Message({})
+ assert m.content == {}
+ assert m.content_type == "amqp/map"
+
+ def testCreateList(self):
+ m = Message([])
+ assert m.content == []
+ assert m.content_type == "amqp/list"
+
+ def testContentTypeOverride(self):
+ m = Message()
+ m.content_type = "text/html; charset=utf8"
+ m.content = u"<html/>"
+ assert m.content_type == "text/html; charset=utf8"
+
+ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
+
+class MessageEchoTests(Base):
+
+ def setup_connection(self):
+ return Connection.open(self.broker.host, self.broker.port,
+ reconnect=self.reconnect())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self):
+ return self.ssn.sender(ECHO_Q)
+
+ def setup_receiver(self):
+ return self.ssn.receiver(ECHO_Q)
+
+ def check(self, msg):
+ self.snd.send(msg)
+ echo = self.rcv.fetch(0)
+
+ assert msg.id == echo.id
+ assert msg.subject == echo.subject
+ assert msg.user_id == echo.user_id
+ assert msg.to == echo.to
+ assert msg.reply_to == echo.reply_to
+ assert msg.correlation_id == echo.correlation_id
+ assert msg.properties == echo.properties
+ assert msg.content_type == echo.content_type
+ assert msg.content == echo.content, "%s, %s" % (msg, echo)
+
+ self.ssn.acknowledge(echo)
+
+ def testStringContent(self):
+ self.check(Message("string"))
+
+ def testUnicodeContent(self):
+ self.check(Message(u"unicode"))
+
+
+ TEST_MAP = {"key1": "string",
+ "key2": u"unicode",
+ "key3": 3,
+ "key4": -3,
+ "key5": 3.14,
+ "key6": -3.14,
+ "key7": ["one", 2, 3.14],
+ "key8": [],
+ "key9": {"sub-key0": 3}}
+
+ def testMapContent(self):
+ self.check(Message(MessageEchoTests.TEST_MAP))
+
+ def testListContent(self):
+ self.check(Message([]))
+ self.check(Message([1, 2, 3]))
+ self.check(Message(["one", 2, 3.14, {"four": 4}]))
+
+ def testProperties(self):
+ msg = Message()
+ msg.to = "to-address"
+ msg.subject = "subject"
+ msg.correlation_id = str(self.test_id)
+ msg.properties = MessageEchoTests.TEST_MAP
+ msg.reply_to = "reply-address"
+ self.check(msg)