summaryrefslogtreecommitdiff
path: root/RC9/qpid/python/tests/spec010.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-01-13 18:11:43 +0000
committerRafael H. Schloming <rhs@apache.org>2009-01-13 18:11:43 +0000
commit7e34266b9a23f4536415bfbc3f161b84615b6550 (patch)
tree484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/python/tests/spec010.py
parent4612263ea692f00a4bd810438bdaf9bc88022091 (diff)
downloadqpid-python-M4.tar.gz
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/python/tests/spec010.py')
-rw-r--r--RC9/qpid/python/tests/spec010.py84
1 files changed, 84 insertions, 0 deletions
diff --git a/RC9/qpid/python/tests/spec010.py b/RC9/qpid/python/tests/spec010.py
new file mode 100644
index 0000000000..df9cb9590a
--- /dev/null
+++ b/RC9/qpid/python/tests/spec010.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, tempfile, shutil, stat
+from unittest import TestCase
+from qpid.spec010 import load
+from qpid.codec010 import Codec, StringCodec
+from qpid.testlib import testrunner
+from qpid.datatypes import Struct
+
+class SpecTest(TestCase):
+
+ def setUp(self):
+ self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml"))
+
+ def testSessionHeader(self):
+ hdr = self.spec["session.header"]
+ sc = StringCodec(self.spec)
+ hdr.encode(sc, Struct(hdr, sync=True))
+ assert sc.encoded == "\x01\x01"
+
+ sc = StringCodec(self.spec)
+ hdr.encode(sc, Struct(hdr, sync=False))
+ assert sc.encoded == "\x01\x00"
+
+ def encdec(self, type, value):
+ sc = StringCodec(self.spec)
+ type.encode(sc, value)
+ decoded = type.decode(sc)
+ return decoded
+
+ def testMessageProperties(self):
+ mp = self.spec["message.message_properties"]
+ rt = self.spec["message.reply_to"]
+
+ props = Struct(mp, content_length=3735928559L,
+ reply_to=Struct(rt, exchange="the exchange name",
+ routing_key="the routing key"))
+ dec = self.encdec(mp, props)
+ assert props.content_length == dec.content_length
+ assert props.reply_to.exchange == dec.reply_to.exchange
+ assert props.reply_to.routing_key == dec.reply_to.routing_key
+
+ def testMessageSubscribe(self):
+ ms = self.spec["message.subscribe"]
+ cmd = Struct(ms, exclusive=True, destination="this is a test")
+ dec = self.encdec(self.spec["message.subscribe"], cmd)
+ assert cmd.exclusive == dec.exclusive
+ assert cmd.destination == dec.destination
+
+ def testXid(self):
+ xid = self.spec["dtx.xid"]
+ sc = StringCodec(self.spec)
+ st = Struct(xid, format=0, global_id="gid", branch_id="bid")
+ xid.encode(sc, st)
+ assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
+ assert xid.decode(sc).__dict__ == st.__dict__
+
+ def testLoadReadOnly(self):
+ spec = "amqp.0-10-qpid-errata.xml"
+ f = testrunner.get_spec_file(spec)
+ dest = tempfile.mkdtemp()
+ shutil.copy(f, dest)
+ shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
+ os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
+ fname = os.path.join(dest, spec)
+ load(fname)
+ assert not os.path.exists("%s.pcl" % fname)