summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/__init__.py60
-rw-r--r--python/qpid/tests/codec.py601
-rw-r--r--python/qpid/tests/codec010.py133
-rw-r--r--python/qpid/tests/connection.py227
-rw-r--r--python/qpid/tests/datatypes.py296
-rw-r--r--python/qpid/tests/framing.py289
-rw-r--r--python/qpid/tests/messaging/__init__.py185
-rw-r--r--python/qpid/tests/messaging/address.py321
-rw-r--r--python/qpid/tests/messaging/endpoints.py1335
-rw-r--r--python/qpid/tests/messaging/message.py155
-rw-r--r--python/qpid/tests/mimetype.py56
-rw-r--r--python/qpid/tests/parser.py37
-rw-r--r--python/qpid/tests/queue.py71
-rw-r--r--python/qpid/tests/spec010.py74
14 files changed, 0 insertions, 3840 deletions
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py
deleted file mode 100644
index 101a0c3759..0000000000
--- a/python/qpid/tests/__init__.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-class Test:
-
- def __init__(self, name):
- self.name = name
-
- def configure(self, config):
- self.config = config
-
-# API Tests
-import qpid.tests.framing
-import qpid.tests.mimetype
-import qpid.tests.messaging
-
-# Legacy Tests
-import qpid.tests.codec
-import qpid.tests.queue
-import qpid.tests.datatypes
-import qpid.tests.connection
-import qpid.tests.spec010
-import qpid.tests.codec010
-
-class TestTestsXXX(Test):
-
- def testFoo(self):
- print "this test has output"
-
- def testBar(self):
- print "this test "*8
- print "has"*10
- print "a"*75
- print "lot of"*10
- print "output"*10
-
- def testQux(self):
- import sys
- sys.stdout.write("this test has output with no newline")
-
- def testQuxFail(self):
- import sys
- sys.stdout.write("this test has output with no newline")
- fdsa
diff --git a/python/qpid/tests/codec.py b/python/qpid/tests/codec.py
deleted file mode 100644
index 8fd0528636..0000000000
--- a/python/qpid/tests/codec.py
+++ /dev/null
@@ -1,601 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import unittest
-from qpid.codec import Codec
-from qpid.spec08 import load
-from cStringIO import StringIO
-from qpid.reference import ReferenceId
-
-__doc__ = """
-
- This is a unit test script for qpid/codec.py
-
- It can be run standalone or as part of the existing test framework.
-
- To run standalone:
- -------------------
-
- Place in the qpid/python/tests/ directory and type...
-
- python codec.py
-
- A brief output will be printed on screen. The verbose output will be placed inn a file called
- codec_unit_test_output.txt. [TODO: make this filename configurable]
-
- To run as part of the existing test framework:
- -----------------------------------------------
-
- python run-tests tests.codec
-
- Change History:
- -----------------
- Jimmy John 05/19/2007 Initial draft
- Jimmy John 05/22/2007 Implemented comments by Rafael Schloming
-
-
-"""
-
-from qpid.specs_config import amqp_spec_0_8
-SPEC = load(amqp_spec_0_8)
-
-# --------------------------------------
-# --------------------------------------
-class BaseDataTypes(unittest.TestCase):
-
-
- """
- Base class containing common functions
- """
-
- # ---------------
- def setUp(self):
- """
- standard setUp for unitetest (refer unittest documentation for details)
- """
- self.codec = Codec(StringIO(), SPEC)
-
- # ------------------
- def tearDown(self):
- """
- standard tearDown for unitetest (refer unittest documentation for details)
- """
- self.codec.stream.flush()
- self.codec.stream.close()
-
- # ----------------------------------------
- def callFunc(self, functionName, *args):
- """
- helper function - given a function name and arguments, calls the function with the args and
- returns the contents of the stream
- """
- getattr(self.codec, functionName)(args[0])
- return self.codec.stream.getvalue()
-
- # ----------------------------------------
- def readFunc(self, functionName, *args):
- """
- helper function - creates a input stream and then calls the function with arguments as have been
- supplied
- """
- self.codec.stream = StringIO(args[0])
- return getattr(self.codec, functionName)()
-
-
-# ----------------------------------------
-# ----------------------------------------
-class IntegerTestCase(BaseDataTypes):
-
- """
- Handles octet, short, long, long long
-
- """
-
- # -------------------------
- def __init__(self, *args):
- """
- sets constants for use in tests
- """
-
- BaseDataTypes.__init__(self, *args)
- self.const_integer = 2
- self.const_integer_octet_encoded = '\x02'
- self.const_integer_short_encoded = '\x00\x02'
- self.const_integer_long_encoded = '\x00\x00\x00\x02'
- self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02'
-
- # -------------------------- #
- # Unsigned Octect - 8 bits #
- # -------------------------- #
-
- # --------------------------
- def test_unsigned_octet(self):
- """
- ubyte format requires 0<=number<=255
- """
- self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...')
-
- # -------------------------------------------
- def test_octet_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_octet, 256)
-
- # -------------------------------------------
- def test_uoctet_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_octet, -1)
-
- # ---------------------------------
- def test_uoctet_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_unsigned_octet_decode(self):
- """
- octet decoding
- """
- self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...')
-
- # ----------------------------------- #
- # Unsigned Short Integers - 16 bits #
- # ----------------------------------- #
-
- # -----------------------
- def test_ushort_int(self):
- """
- testing unsigned short integer
- """
- self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...')
-
- # -------------------------------------------
- def test_ushort_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_short, 65536)
-
- # -------------------------------------------
- def test_ushort_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_short, -1)
-
- # ---------------------------------
- def test_ushort_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_ushort_int_decode(self):
- """
- unsigned short decoding
- """
- self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...')
-
-
- # ---------------------------------- #
- # Unsigned Long Integers - 32 bits #
- # ---------------------------------- #
-
- # -----------------------
- def test_ulong_int(self):
- """
- testing unsigned long iteger
- """
- self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...')
-
- # -------------------------------------------
- def test_ulong_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296)
-
- # -------------------------------------------
- def test_ulong_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_long, -1)
-
- # ---------------------------------
- def test_ulong_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...')
-
- # -------------------------------
- def test_ulong_int_decode(self):
- """
- unsigned long decoding
- """
- self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...')
-
-
- # --------------------------------------- #
- # Unsigned Long Long Integers - 64 bits #
- # --------------------------------------- #
-
- # -----------------------
- def test_ulong_long_int(self):
- """
- testing unsinged long long integer
- """
- self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...')
-
- # -------------------------------------------
- def test_ulong_long_int_out_of_upper_range(self):
- """
- testing for input above acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616)
-
- # -------------------------------------------
- def test_ulong_long_int_out_of_lower_range(self):
- """
- testing for input below acceptable range
- """
- self.failUnlessRaises(Exception, self.codec.encode_longlong, -1)
-
- # ---------------------------------
- def test_ulong_long_int_with_fraction(self):
- """
- the fractional part should be ignored...
- """
- self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...')
-
- # ------------------------------------
- def test_ulong_long_int_decode(self):
- """
- unsigned long long decoding
- """
- self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...')
-
-# -----------------------------------
-# -----------------------------------
-class BitTestCase(BaseDataTypes):
-
- """
- Handles bits
- """
-
- # ----------------------------------------------
- def callFunc(self, functionName, *args):
- """
- helper function
- """
- for ele in args:
- getattr(self.codec, functionName)(ele)
-
- self.codec.flush()
- return self.codec.stream.getvalue()
-
- # -------------------
- def test_bit1(self):
- """
- sends in 11
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...')
-
- # -------------------
- def test_bit2(self):
- """
- sends in 10011
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...')
-
- # -------------------
- def test_bit3(self):
- """
- sends in 1110100111 [10 bits(right to left), should be compressed into two octets]
- """
- self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...')
-
- # ------------------------------------
- def test_bit_decode_1(self):
- """
- decode bit 1
- """
- self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...')
-
- # ------------------------------------
- def test_bit_decode_0(self):
- """
- decode bit 0
- """
- self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...')
-
-# -----------------------------------
-# -----------------------------------
-class StringTestCase(BaseDataTypes):
-
- """
- Handles short strings, long strings
- """
-
- # ------------------------------------------------------------- #
- # Short Strings - 8 bit length followed by zero or more octets #
- # ------------------------------------------------------------- #
-
- # ---------------------------------------
- def test_short_string_zero_length(self):
- """
- 0 length short string
- """
- self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...')
-
- # -------------------------------------------
- def test_short_string_positive_length(self):
- """
- positive length short string
- """
- self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...')
-
- # -------------------------------------------
- def test_short_string_out_of_upper_range(self):
- """
- string length > 255
- """
- self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256)
-
- # ------------------------------------
- def test_short_string_decode(self):
- """
- short string decode
- """
- self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...')
-
-
- # ------------------------------------------------------------- #
- # Long Strings - 32 bit length followed by zero or more octets #
- # ------------------------------------------------------------- #
-
- # ---------------------------------------
- def test_long_string_zero_length(self):
- """
- 0 length long string
- """
- self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...')
-
- # -------------------------------------------
- def test_long_string_positive_length(self):
- """
- positive length long string
- """
- self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...')
-
- # ------------------------------------
- def test_long_string_decode(self):
- """
- long string decode
- """
- self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...')
-
-
-# --------------------------------------
-# --------------------------------------
-class TimestampTestCase(BaseDataTypes):
-
- """
- No need of any test cases here as timestamps are implemented as long long which is tested above
- """
- pass
-
-# ---------------------------------------
-# ---------------------------------------
-class FieldTableTestCase(BaseDataTypes):
-
- """
- Handles Field Tables
-
- Only S/I type messages seem to be implemented currently
- """
-
- # -------------------------
- def __init__(self, *args):
- """
- sets constants for use in tests
- """
-
- BaseDataTypes.__init__(self, *args)
- self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'}
- self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1'
-
- # -------------------------------------------
- def test_field_table_name_value_pair(self):
- """
- valid name value pair
- """
- self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...')
-
- # ---------------------------------------------------
- def test_field_table_multiple_name_value_pair(self):
- """
- multiple name value pair
- """
- self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...')
-
- # ------------------------------------
- def test_field_table_decode(self):
- """
- field table decode
- """
- self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...')
-
-
-# ------------------------------------
-# ------------------------------------
-class ContentTestCase(BaseDataTypes):
-
- """
- Handles Content data types
- """
-
- # -----------------------------
- def test_content_inline(self):
- """
- inline content
- """
- self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...')
-
- # --------------------------------
- def test_content_reference(self):
- """
- reference content
- """
- self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...')
-
- # ------------------------------------
- def test_content_inline_decode(self):
- """
- inline content decode
- """
- self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...')
-
- # ------------------------------------
- def test_content_reference_decode(self):
- """
- reference content decode
- """
- self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...')
-
-# ------------------------ #
-# Pre - existing test code #
-# ------------------------ #
-
-# ---------------------
-def test(type, value):
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- if isinstance(value, (list, tuple)):
- values = value
- else:
- values = [value]
- stream = StringIO()
- codec = Codec(stream, SPEC)
- for v in values:
- codec.encode(type, v)
- codec.flush()
- enc = stream.getvalue()
- stream.reset()
- dup = []
- for i in xrange(len(values)):
- dup.append(codec.decode(type))
- if values != dup:
- raise AssertionError("%r --> %r --> %r" % (values, enc, dup))
-
-# -----------------------
-def dotest(type, value):
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- args = (type, value)
- test(*args)
-
-# -------------
-def oldtests():
- """
- old test function cut/copy/paste from qpid/codec.py
- """
- for value in ("1", "0", "110", "011", "11001", "10101", "10011"):
- for i in range(10):
- dotest("bit", map(lambda x: x == "1", value*i))
-
- for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}):
- dotest("table", value)
-
- for type in ("octet", "short", "long", "longlong"):
- for value in range(0, 256):
- dotest(type, value)
-
- for type in ("shortstr", "longstr"):
- for value in ("", "a", "asdf"):
- dotest(type, value)
-
-# -----------------------------------------
-class oldTests(unittest.TestCase):
-
- """
- class to handle pre-existing test cases
- """
-
- # ---------------------------
- def test_oldtestcases(self):
- """
- call the old tests
- """
- return oldtests()
-
-# ---------------------------
-# ---------------------------
-if __name__ == '__main__':
-
- codec_test_suite = unittest.TestSuite()
-
- #adding all the test suites...
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase))
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase))
-
- #loading pre-existing test case from qpid/codec.py
- codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests))
-
- run_output_stream = StringIO()
- test_runner = unittest.TextTestRunner(run_output_stream, '', '')
- test_result = test_runner.run(codec_test_suite)
-
- print '\n%d test run...' % (test_result.testsRun)
-
- if test_result.wasSuccessful():
- print '\nAll tests successful\n'
-
- if test_result.failures:
- print '\n----------'
- print '%d FAILURES:' % (len(test_result.failures))
- print '----------\n'
- for failure in test_result.failures:
- print str(failure[0]) + ' ... FAIL'
-
- if test_result.errors:
- print '\n---------'
- print '%d ERRORS:' % (len(test_result.errors))
- print '---------\n'
-
- for error in test_result.errors:
- print str(error[0]) + ' ... ERROR'
-
- f = open('codec_unit_test_output.txt', 'w')
- f.write(str(run_output_stream.getvalue()))
- f.close()
diff --git a/python/qpid/tests/codec010.py b/python/qpid/tests/codec010.py
deleted file mode 100644
index 787ebc146f..0000000000
--- a/python/qpid/tests/codec010.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time
-
-from unittest import TestCase
-from qpid.codec010 import StringCodec
-from qpid.datatypes import timestamp, uuid4
-from qpid.ops import PRIMITIVE
-
-class CodecTest(TestCase):
-
- def check(self, type, value, compare=True):
- t = PRIMITIVE[type]
- sc = StringCodec()
- sc.write_primitive(t, value)
- decoded = sc.read_primitive(t)
- if compare:
- assert decoded == value, "%s, %s" % (decoded, value)
- return decoded
-
- def testMapString(self):
- self.check("map", {"string": "this is a test"})
-
- def testMapUnicode(self):
- self.check("map", {"unicode": u"this is a unicode test"})
-
- def testMapBinary(self):
- self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"})
-
- def testMapBuffer(self):
- s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"
- dec = self.check("map", {"buffer": buffer(s)}, False)
- assert dec["buffer"] == s
-
- def testMapInt(self):
- self.check("map", {"int": 3})
-
- def testMapLong(self):
- self.check("map", {"long": 2**32})
- self.check("map", {"long": 1 << 34})
- self.check("map", {"long": -(1 << 34)})
-
- def testMapTimestamp(self):
- decoded = self.check("map", {"timestamp": timestamp(0)})
- assert isinstance(decoded["timestamp"], timestamp)
-
- def testMapDatetime(self):
- decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False)
- assert isinstance(decoded["datetime"], timestamp)
- assert decoded["datetime"] == 0.0
-
- def testMapNone(self):
- self.check("map", {"none": None})
-
- def testMapNested(self):
- self.check("map", {"map": {"string": "nested test"}})
-
- def testMapList(self):
- self.check("map", {"list": [1, "two", 3.0, -4]})
-
- def testMapUUID(self):
- self.check("map", {"uuid": uuid4()})
-
- def testMapAll(self):
- decoded = self.check("map", {"string": "this is a test",
- "unicode": u"this is a unicode test",
- "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5",
- "int": 3,
- "long": 2**32,
- "timestamp": timestamp(0),
- "none": None,
- "map": {"string": "nested map"},
- "list": [1, "two", 3.0, -4],
- "uuid": uuid4()})
- assert isinstance(decoded["timestamp"], timestamp)
-
- def testMapEmpty(self):
- self.check("map", {})
-
- def testMapNone(self):
- self.check("map", None)
-
- def testList(self):
- self.check("list", [1, "two", 3.0, -4])
-
- def testListEmpty(self):
- self.check("list", [])
-
- def testListNone(self):
- self.check("list", None)
-
- def testArrayInt(self):
- self.check("array", [1, 2, 3, 4])
-
- def testArrayString(self):
- self.check("array", ["one", "two", "three", "four"])
-
- def testArrayEmpty(self):
- self.check("array", [])
-
- def testArrayNone(self):
- self.check("array", None)
-
- def testInt16(self):
- self.check("int16", 3)
- self.check("int16", -3)
-
- def testInt64(self):
- self.check("int64", 3)
- self.check("int64", -3)
- self.check("int64", 1<<34)
- self.check("int64", -(1<<34))
-
- def testDatetime(self):
- self.check("datetime", timestamp(0))
- self.check("datetime", timestamp(long(time.time())))
diff --git a/python/qpid/tests/connection.py b/python/qpid/tests/connection.py
deleted file mode 100644
index 6847285f69..0000000000
--- a/python/qpid/tests/connection.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time
-from threading import *
-from unittest import TestCase
-from qpid.util import connect, listen
-from qpid.connection import *
-from qpid.datatypes import Message
-from qpid.delegates import Server
-from qpid.queue import Queue
-from qpid.session import Delegate
-from qpid.ops import QueueQueryResult
-
-PORT = 1234
-
-class TestServer:
-
- def __init__(self, queue):
- self.queue = queue
-
- def connection(self, connection):
- return Server(connection, delegate=self.session)
-
- def session(self, session):
- session.auto_sync = False
- return TestSession(session, self.queue)
-
-class TestSession(Delegate):
-
- def __init__(self, session, queue):
- self.session = session
- self.queue = queue
-
- def execution_sync(self, es):
- pass
-
- def queue_query(self, qq):
- return QueueQueryResult(qq.queue)
-
- def message_transfer(self, cmd):
- if cmd.destination == "echo":
- m = Message(cmd.payload)
- m.headers = cmd.headers
- self.session.message_transfer(cmd.destination, cmd.accept_mode,
- cmd.acquire_mode, m)
- elif cmd.destination == "abort":
- self.session.channel.connection.sock.close()
- elif cmd.destination == "heartbeat":
- self.session.channel.connection_heartbeat()
- else:
- self.queue.put(cmd)
-
-class ConnectionTest(TestCase):
-
- def setUp(self):
- self.queue = Queue()
- self.running = True
- started = Event()
-
- def run():
- ts = TestServer(self.queue)
- for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()):
- conn = Connection(s, delegate=ts.connection)
- try:
- conn.start(5)
- except Closed:
- pass
-
- self.server = Thread(target=run)
- self.server.setDaemon(True)
- self.server.start()
-
- started.wait(3)
- assert started.isSet()
-
- def tearDown(self):
- self.running = False
- connect("127.0.0.1", PORT).close()
- self.server.join(3)
-
- def connect(self, **kwargs):
- return Connection(connect("127.0.0.1", PORT), **kwargs)
-
- def test(self):
- c = self.connect()
- c.start(10)
-
- ssn1 = c.session("test1", timeout=10)
- ssn2 = c.session("test2", timeout=10)
-
- assert ssn1 == c.sessions["test1"]
- assert ssn2 == c.sessions["test2"]
- assert ssn1.channel != None
- assert ssn2.channel != None
- assert ssn1 in c.attached.values()
- assert ssn2 in c.attached.values()
-
- ssn1.close(5)
-
- assert ssn1.channel == None
- assert ssn1 not in c.attached.values()
- assert ssn2 in c.sessions.values()
-
- ssn2.close(5)
-
- assert ssn2.channel == None
- assert ssn2 not in c.attached.values()
- assert ssn2 not in c.sessions.values()
-
- ssn = c.session("session", timeout=10)
-
- assert ssn.channel != None
- assert ssn in c.sessions.values()
-
- destinations = ("one", "two", "three")
-
- for d in destinations:
- ssn.message_transfer(d)
-
- for d in destinations:
- cmd = self.queue.get(10)
- assert cmd.destination == d
- assert cmd.headers == None
- assert cmd.payload == None
-
- msg = Message("this is a test")
- ssn.message_transfer("four", message=msg)
- cmd = self.queue.get(10)
- assert cmd.destination == "four"
- assert cmd.headers == None
- assert cmd.payload == msg.body
-
- qq = ssn.queue_query("asdf")
- assert qq.queue == "asdf"
- c.close(5)
-
- def testCloseGet(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- for i in range(10):
- m = echos.get(timeout=10)
- assert m.body == "test%d" % i
-
- try:
- m = echos.get(timeout=10)
- assert False
- except Closed, e:
- pass
-
- def testCloseListen(self):
- c = self.connect()
- c.start(10)
- ssn = c.session("test", timeout=10)
- echos = ssn.incoming("echo")
-
- messages = []
- exceptions = []
- condition = Condition()
- def listener(m): messages.append(m)
- def exc_listener(e):
- condition.acquire()
- exceptions.append(e)
- condition.notify()
- condition.release()
-
- echos.listen(listener, exc_listener)
-
- for i in range(10):
- ssn.message_transfer("echo", message=Message("test%d" % i))
-
- ssn.auto_sync=False
- ssn.message_transfer("abort")
-
- condition.acquire()
- start = time.time()
- elapsed = 0
- while not exceptions and elapsed < 10:
- condition.wait(10 - elapsed)
- elapsed = time.time() - start
- condition.release()
-
- for i in range(10):
- m = messages.pop(0)
- assert m.body == "test%d" % i
-
- assert len(exceptions) == 1
-
- def testSync(self):
- c = self.connect()
- c.start(10)
- s = c.session("test")
- s.auto_sync = False
- s.message_transfer("echo", message=Message("test"))
- s.sync(10)
-
- def testHeartbeat(self):
- c = self.connect(heartbeat=10)
- c.start(10)
- s = c.session("test")
- s.channel.connection_heartbeat()
- s.message_transfer("heartbeat")
diff --git a/python/qpid/tests/datatypes.py b/python/qpid/tests/datatypes.py
deleted file mode 100644
index 00e649d6cf..0000000000
--- a/python/qpid/tests/datatypes.py
+++ /dev/null
@@ -1,296 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from unittest import TestCase
-from qpid.datatypes import *
-from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties
-
-class SerialTest(TestCase):
-
- def test(self):
- for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)):
- assert s + 1 > s
- assert s - 1 < s
- assert s < s + 1
- assert s > s - 1
-
- assert serial(0xFFFFFFFFL) + 1 == serial(0)
-
- assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL)
- assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0)
-
- def testIncr(self):
- s = serial(0)
- s += 1
- assert s == serial(1)
-
- def testIn(self):
- l = [serial(1), serial(2), serial(3), serial(4)]
- assert serial(1) in l
- assert serial(0xFFFFFFFFL + 2) in l
- assert 4 in l
-
- def testNone(self):
- assert serial(0) != None
-
- def testHash(self):
- d = {}
- d[serial(0)] = "zero"
- assert d[0] == "zero"
-
- def testAdd(self):
- assert serial(2) + 2 == serial(4)
- assert serial(2) + 2 == 4
-
- def testSub(self):
- delta = serial(4) - serial(2)
- assert isinstance(delta, int) or isinstance(delta, long)
- assert delta == 2
-
- delta = serial(4) - 2
- assert isinstance(delta, Serial)
- assert delta == serial(2)
-
-class RangedSetTest(TestCase):
-
- def check(self, ranges):
- posts = []
- for range in ranges:
- posts.append(range.lower)
- posts.append(range.upper)
-
- sorted = posts[:]
- sorted.sort()
-
- assert posts == sorted
-
- idx = 1
- while idx + 1 < len(posts):
- assert posts[idx] + 1 != posts[idx+1]
- idx += 2
-
- def test(self):
- rs = RangedSet()
-
- self.check(rs.ranges)
-
- rs.add(1)
-
- assert 1 in rs
- assert 2 not in rs
- assert 0 not in rs
- self.check(rs.ranges)
-
- rs.add(2)
-
- assert 0 not in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- self.check(rs.ranges)
-
- rs.add(0)
-
- assert -1 not in rs
- assert 0 in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- self.check(rs.ranges)
-
- rs.add(37)
-
- assert -1 not in rs
- assert 0 in rs
- assert 1 in rs
- assert 2 in rs
- assert 3 not in rs
- assert 36 not in rs
- assert 37 in rs
- assert 38 not in rs
- self.check(rs.ranges)
-
- rs.add(-1)
- self.check(rs.ranges)
-
- rs.add(-3)
- self.check(rs.ranges)
-
- rs.add(1, 20)
- assert 21 not in rs
- assert 20 in rs
- self.check(rs.ranges)
-
- def testAddSelf(self):
- a = RangedSet()
- a.add(0, 8)
- self.check(a.ranges)
- a.add(0, 8)
- self.check(a.ranges)
- assert len(a.ranges) == 1
- range = a.ranges[0]
- assert range.lower == 0
- assert range.upper == 8
-
- def testEmpty(self):
- s = RangedSet()
- assert s.empty()
- s.add(0, -1)
- assert s.empty()
- s.add(0, 0)
- assert not s.empty()
-
- def testMinMax(self):
- s = RangedSet()
- assert s.max() is None
- assert s.min() is None
- s.add(0, 10)
- assert s.max() == 10
- assert s.min() == 0
- s.add(0, 5)
- assert s.max() == 10
- assert s.min() == 0
- s.add(0, 11)
- assert s.max() == 11
- assert s.min() == 0
- s.add(15, 20)
- assert s.max() == 20
- assert s.min() == 0
- s.add(-10, -5)
- assert s.max() == 20
- assert s.min() == -10
-
-class RangeTest(TestCase):
-
- def testIntersect1(self):
- a = Range(0, 10)
- b = Range(9, 20)
- i1 = a.intersect(b)
- i2 = b.intersect(a)
- assert i1.upper == 10
- assert i2.upper == 10
- assert i1.lower == 9
- assert i2.lower == 9
-
- def testIntersect2(self):
- a = Range(0, 10)
- b = Range(11, 20)
- assert a.intersect(b) == None
- assert b.intersect(a) == None
-
- def testIntersect3(self):
- a = Range(0, 10)
- b = Range(3, 5)
- i1 = a.intersect(b)
- i2 = b.intersect(a)
- assert i1.upper == 5
- assert i2.upper == 5
- assert i1.lower == 3
- assert i2.lower == 3
-
-class UUIDTest(TestCase):
-
- def test(self):
- # this test is kind of lame, but it does excercise the basic
- # functionality of the class
- u = uuid4()
- for i in xrange(1024):
- assert u != uuid4()
-
-class MessageTest(TestCase):
-
- def setUp(self):
- self.mp = MessageProperties()
- self.dp = DeliveryProperties()
- self.fp = FragmentProperties()
-
- def testHas(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.has("message_properties")
- assert m.has("delivery_properties")
- assert m.has("fragment_properties")
-
- def testGet(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
- assert m.get("fragment_properties") == self.fp
-
- def testSet(self):
- m = Message(self.mp, self.dp, "body")
- assert m.get("fragment_properties") is None
- m.set(self.fp)
- assert m.get("fragment_properties") == self.fp
-
- def testSetOnEmpty(self):
- m = Message("body")
- assert m.get("delivery_properties") is None
- m.set(self.dp)
- assert m.get("delivery_properties") == self.dp
-
- def testSetReplace(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- dp = DeliveryProperties()
- assert m.get("delivery_properties") == self.dp
- assert m.get("delivery_properties") != dp
- m.set(dp)
- assert m.get("delivery_properties") != self.dp
- assert m.get("delivery_properties") == dp
-
- def testClear(self):
- m = Message(self.mp, self.dp, self.fp, "body")
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
- assert m.get("fragment_properties") == self.fp
- m.clear("fragment_properties")
- assert m.get("fragment_properties") is None
- assert m.get("message_properties") == self.mp
- assert m.get("delivery_properties") == self.dp
-
-class TimestampTest(TestCase):
-
- def check(self, expected, *values):
- for v in values:
- assert isinstance(v, timestamp)
- assert v == expected
- assert v == timestamp(expected)
-
- def testAdd(self):
- self.check(4.0,
- timestamp(2.0) + 2.0,
- 2.0 + timestamp(2.0))
-
- def testSub(self):
- self.check(2.0,
- timestamp(4.0) - 2.0,
- 4.0 - timestamp(2.0))
-
- def testNeg(self):
- self.check(-4.0, -timestamp(4.0))
-
- def testPos(self):
- self.check(+4.0, +timestamp(4.0))
-
- def testAbs(self):
- self.check(4.0, abs(timestamp(-4.0)))
-
- def testConversion(self):
- dt = timestamp(0).datetime()
- t = timestamp(dt)
- assert t == 0
diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py
deleted file mode 100644
index 0b33df8b9a..0000000000
--- a/python/qpid/tests/framing.py
+++ /dev/null
@@ -1,289 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# setup, usage, teardown, errors(sync), errors(async), stress, soak,
-# boundary-conditions, config
-
-from qpid.tests import Test
-from qpid.framing import *
-
-class Base(Test):
-
- def cmp_frames(self, frm1, frm2):
- assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2)
- assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2)
- assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2)
- assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2)
- assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2)
-
- def cmp_segments(self, seg1, seg2):
- assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2)
- assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2)
- assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2)
- assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2)
- assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2)
- assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2)
-
- def cmp_list(self, l1, l2):
- if l1 is None:
- assert l2 is None
- return
-
- assert len(l1) == len(l2)
- for v1, v2 in zip(l1, l2):
- if isinstance(v1, Compound):
- self.cmp_ops(v1, v2)
- else:
- assert v1 == v2
-
- def cmp_ops(self, op1, op2):
- if op1 is None:
- assert op2 is None
- return
-
- assert op1.__class__ == op2.__class__
- cls = op1.__class__
- assert op1.NAME == op2.NAME
- assert op1.CODE == op2.CODE
- assert op1.FIELDS == op2.FIELDS
- for f in cls.FIELDS:
- v1 = getattr(op1, f.name)
- v2 = getattr(op2, f.name)
- if COMPOUND.has_key(f.type) or f.type == "struct32":
- self.cmp_ops(v1, v2)
- elif f.type in ("list", "array"):
- self.cmp_list(v1, v2)
- else:
- assert v1 == v2, "expected: %r, got %r" % (v1, v2)
-
- if issubclass(cls, Command) or issubclass(cls, Control):
- assert op1.channel == op2.channel
-
- if issubclass(cls, Command):
- assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync)
- assert (op1.headers is None and op2.headers is None) or \
- (op1.headers is not None and op2.headers is not None)
- if op1.headers is not None:
- assert len(op1.headers) == len(op2.headers)
- for h1, h2 in zip(op1.headers, op2.headers):
- self.cmp_ops(h1, h2)
-
-class FrameTest(Base):
-
- def enc_dec(self, frames, encoded=None):
- enc = FrameEncoder()
- dec = FrameDecoder()
-
- enc.write(*frames)
- bytes = enc.read()
- if encoded is not None:
- assert bytes == encoded, "expected %r, got %r" % (encoded, bytes)
- dec.write(bytes)
- dframes = dec.read()
-
- assert len(frames) == len(dframes)
- for f, df, in zip(frames, dframes):
- self.cmp_frames(f, df)
-
- def testEmpty(self):
- self.enc_dec([Frame(0, 0, 0, 0, "")],
- "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00")
-
- def testSingle(self):
- self.enc_dec([Frame(0, 0, 0, 1, "payload")],
- "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload")
-
- def testMaxChannel(self):
- self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")],
- "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel")
-
- def testMaxType(self):
- self.enc_dec([Frame(0, 255, 0, 0, "max-type")],
- "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type")
-
- def testMaxTrack(self):
- self.enc_dec([Frame(0, 0, 15, 0, "max-track")],
- "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track")
-
- def testSequence(self):
- self.enc_dec([Frame(0, 0, 0, 0, "zero"),
- Frame(0, 0, 0, 1, "one"),
- Frame(0, 0, 1, 0, "two"),
- Frame(0, 0, 1, 1, "three"),
- Frame(0, 1, 0, 0, "four"),
- Frame(0, 1, 0, 1, "five"),
- Frame(0, 1, 1, 0, "six"),
- Frame(0, 1, 1, 1, "seven"),
- Frame(1, 0, 0, 0, "eight"),
- Frame(1, 0, 0, 1, "nine"),
- Frame(1, 0, 1, 0, "ten"),
- Frame(1, 0, 1, 1, "eleven"),
- Frame(1, 1, 0, 0, "twelve"),
- Frame(1, 1, 0, 1, "thirteen"),
- Frame(1, 1, 1, 0, "fourteen"),
- Frame(1, 1, 1, 1, "fifteen")])
-
-class SegmentTest(Base):
-
- def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD):
- enc = SegmentEncoder(max_payload)
- dec = SegmentDecoder()
-
- enc.write(*segments)
- frms = enc.read()
- if frames is not None:
- assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms)
- for f1, f2 in zip(frames, frms):
- self.cmp_frames(f1, f2)
- if interleave is not None:
- ilvd = []
- for f in frms:
- ilvd.append(f)
- if interleave:
- ilvd.append(interleave.pop(0))
- ilvd.extend(interleave)
- dec.write(*ilvd)
- else:
- dec.write(*frms)
- segs = dec.read()
- assert len(segments) == len(segs)
- for s1, s2 in zip(segments, segs):
- self.cmp_segments(s1, s2)
-
- def testEmpty(self):
- self.enc_dec([Segment(True, True, 0, 0, 0, "")],
- [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
- "")])
-
- def testSingle(self):
- self.enc_dec([Segment(True, True, 0, 0, 0, "payload")],
- [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0,
- "payload")])
-
- def testMaxChannel(self):
- self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")],
- [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")])
-
- def testMaxType(self):
- self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")],
- [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")])
-
- def testMaxTrack(self):
- self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")],
- [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")])
-
- def testSequence(self):
- self.enc_dec([Segment(True, False, 0, 0, 0, "one"),
- Segment(False, False, 0, 0, 0, "two"),
- Segment(False, True, 0, 0, 0, "three")],
- [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"),
- Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"),
- Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")])
-
- def testInterleaveChannel(self):
- frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)]
- frames[0].flags |= FIRST_FRM
- frames[-1].flags |= LAST_FRM
-
- ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)]
-
- self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1)
-
- def testInterleaveTrack(self):
- frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
- for i in range(0, 8, 2)]
- frames[0].flags |= FIRST_FRM
- frames[-1].flags |= LAST_FRM
-
- ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1))
- for i in range(0, 8, 2)]
-
- self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2)
-
-from qpid.ops import *
-
-class OpTest(Base):
-
- def enc_dec(self, ops):
- enc = OpEncoder()
- dec = OpDecoder()
- enc.write(*ops)
- segs = enc.read()
- dec.write(*segs)
- dops = dec.read()
- assert len(ops) == len(dops)
- for op1, op2 in zip(ops, dops):
- self.cmp_ops(op1, op2)
-
- def testEmtpyMT(self):
- self.enc_dec([MessageTransfer()])
-
- def testEmptyMTSync(self):
- self.enc_dec([MessageTransfer(sync=True)])
-
- def testMT(self):
- self.enc_dec([MessageTransfer(destination="asdf")])
-
- def testSyncMT(self):
- self.enc_dec([MessageTransfer(destination="asdf", sync=True)])
-
- def testEmptyPayloadMT(self):
- self.enc_dec([MessageTransfer(payload="")])
-
- def testPayloadMT(self):
- self.enc_dec([MessageTransfer(payload="test payload")])
-
- def testHeadersEmptyPayloadMT(self):
- self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])])
-
- def testHeadersPayloadMT(self):
- self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")])
-
- def testMultiHeadersEmptyPayloadMT(self):
- self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])])
-
- def testMultiHeadersPayloadMT(self):
- self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")])
-
- def testContentTypeHeadersPayloadMT(self):
- self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")])
-
- def testMulti(self):
- self.enc_dec([MessageTransfer(),
- MessageTransfer(sync=True),
- MessageTransfer(destination="one"),
- MessageTransfer(destination="two", sync=True),
- MessageTransfer(destination="three", payload="test payload")])
-
- def testControl(self):
- self.enc_dec([SessionAttach(name="asdf")])
-
- def testMixed(self):
- self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")])
-
- def testChannel(self):
- self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)])
-
- def testCompound(self):
- self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])])
-
- def testListCompound(self):
- self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"),
- Xid(global_id="two"),
- Xid(global_id="three")]))])
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py
deleted file mode 100644
index 8f6680d5e3..0000000000
--- a/python/qpid/tests/messaging/__init__.py
+++ /dev/null
@@ -1,185 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time
-from math import ceil
-from qpid.harness import Skipped
-from qpid.messaging import *
-from qpid.tests import Test
-
-class Base(Test):
-
- def setup_connection(self):
- return None
-
- def setup_session(self):
- return None
-
- def setup_sender(self):
- return None
-
- def setup_receiver(self):
- return None
-
- def setup(self):
- self.test_id = uuid4()
- self.broker = self.config.broker
- try:
- self.conn = self.setup_connection()
- except ConnectError, e:
- raise Skipped(e)
- self.ssn = self.setup_session()
- self.snd = self.setup_sender()
- if self.snd is not None:
- self.snd.durable = self.durable()
- self.rcv = self.setup_receiver()
-
- def teardown(self):
- if self.conn is not None and self.conn.attached():
- self.teardown_connection(self.conn)
- self.conn = None
-
- def teardown_connection(self, conn):
- conn.close(timeout=self.timeout())
-
- def content(self, base, count = None):
- if count is None:
- return "%s[%s]" % (base, self.test_id)
- else:
- return "%s[%s, %s]" % (base, count, self.test_id)
-
- def message(self, base, count = None, **kwargs):
- return Message(content=self.content(base, count), **kwargs)
-
- def ping(self, ssn):
- PING_Q = 'ping-queue; {create: always, delete: always}'
- # send a message
- sender = ssn.sender(PING_Q, durable=self.durable())
- content = self.content("ping")
- sender.send(content)
- receiver = ssn.receiver(PING_Q)
- msg = receiver.fetch(0)
- ssn.acknowledge()
- assert msg.content == content, "expected %r, got %r" % (content, msg.content)
-
- def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
- messages = []
- try:
- while limit is None or len(messages) < limit:
- messages.append(rcv.fetch(timeout=timeout))
- except Empty:
- pass
- if expected is not None:
- self.assertEchos(expected, messages, redelivered)
- return messages
-
- def diff(self, m1, m2, excluded_properties=()):
- result = {}
- for attr in ("id", "subject", "user_id", "reply_to",
- "correlation_id", "durable", "priority", "ttl",
- "redelivered", "content_type", "content"):
- a1 = getattr(m1, attr)
- a2 = getattr(m2, attr)
- if a1 != a2:
- result[attr] = (a1, a2)
- p1 = dict(m1.properties)
- p2 = dict(m2.properties)
- for ep in excluded_properties:
- p1.pop(ep, None)
- p2.pop(ep, None)
- if p1 != p2:
- result["properties"] = (p1, p2)
- return result
-
- def assertEcho(self, msg, echo, redelivered=False):
- if not isinstance(msg, Message) or not isinstance(echo, Message):
- if isinstance(msg, Message):
- msg = msg.content
- if isinstance(echo, Message):
- echo = echo.content
- assert msg == echo, "expected %s, got %s" % (msg, echo)
- else:
- delta = self.diff(msg, echo, ("x-amqp-0-10.routing-key",))
- mttl, ettl = delta.pop("ttl", (0, 0))
- if redelivered:
- assert echo.redelivered, \
- "expected %s to be redelivered: %s" % (msg, echo)
- if delta.has_key("redelivered"):
- del delta["redelivered"]
- assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
- assert mttl >= ettl, "%s, %s" % (mttl, ettl)
- assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
-
- def assertEchos(self, msgs, echoes, redelivered=False):
- assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
- for m, e in zip(msgs, echoes):
- self.assertEcho(m, e, redelivered)
-
- def assertEmpty(self, rcv):
- contents = self.drain(rcv)
- assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
-
- def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
- if expected is not None:
- if lower is not None or upper is not None:
- raise ValueError("cannot specify lower or upper when specifying expected")
- lower = expected
- upper = expected
- else:
- if lower is None:
- lower = int(ceil(rcv.threshold*rcv.capacity))
- if upper is None:
- upper = rcv.capacity
-
- p = rcv.available()
- if upper == lower:
- assert p == lower, "expected %s, got %s" % (lower, p)
- else:
- assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)
-
- def sleep(self):
- time.sleep(self.delay())
-
- def delay(self):
- return float(self.config.defines.get("delay", "2"))
-
- def timeout(self):
- return float(self.config.defines.get("timeout", "60"))
-
- def get_bool(self, name):
- return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")
-
- def durable(self):
- return self.get_bool("durable")
-
- def reconnect(self):
- return self.get_bool("reconnect")
-
-
- def transport(self):
- if self.broker.scheme == self.broker.AMQPS:
- return "ssl"
- else:
- return "tcp"
-
- def connection_options(self):
- return {"reconnect": self.reconnect(),
- "transport": self.transport()}
-
-import address, endpoints, message
diff --git a/python/qpid/tests/messaging/address.py b/python/qpid/tests/messaging/address.py
deleted file mode 100644
index aa9562a717..0000000000
--- a/python/qpid/tests/messaging/address.py
+++ /dev/null
@@ -1,321 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-from qpid.tests import Test
-from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, \
- SYM, WSPACE, LEXER
-from qpid.lexer import Token
-from qpid.harness import Skipped
-from qpid.tests.parser import ParserBase
-
-def indent(st):
- return " " + st.replace("\n", "\n ")
-
-def pprint_address(name, subject, options):
- return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \
- (pprint(name), pprint(subject), pprint(options))
-
-def pprint(o):
- if isinstance(o, dict):
- return pprint_map(o)
- elif isinstance(o, list):
- return pprint_list(o)
- elif isinstance(o, basestring):
- return pprint_string(o)
- else:
- return repr(o)
-
-def pprint_map(m):
- items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()]
- items.sort()
- return pprint_items("{", items, "}")
-
-def pprint_list(l):
- return pprint_items("[", [pprint(x) for x in l], "]")
-
-def pprint_items(start, items, end):
- if items:
- return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end)
- else:
- return "%s%s" % (start, end)
-
-def pprint_string(s):
- result = "'"
- for c in s:
- if c == "'":
- result += "\\'"
- elif c == "\n":
- result += "\\n"
- elif ord(c) >= 0x80:
- result += "\\u%04x" % ord(c)
- else:
- result += c
- result += "'"
- return result
-
-class AddressTests(ParserBase, Test):
-
- EXCLUDE = (WSPACE, EOF)
-
- def fields(self, line, n):
- result = line.split(":", n - 1)
- result.extend([None]*(n - len(result)))
- return result
-
- def call(self, parser, mode, input):
- try:
- from subprocess import Popen, PIPE, STDOUT
- po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT)
- except ImportError, e:
- raise Skipped("%s" % e)
- except OSError, e:
- raise Skipped("%s: %s" % (e, parser))
- out, _ = po.communicate(input=input)
- return out
-
- def parser(self):
- return self.config.defines.get("address.parser")
-
- def do_lex(self, st):
- parser = self.parser()
- if parser:
- out = self.call(parser, "lex", st)
- lines = out.split("\n")
- toks = []
- for line in lines:
- if line.strip():
- name, position, value = self.fields(line, 3)
- toks.append(Token(LEXER.type(name), value, position, st))
- return toks
- else:
- return lex(st)
-
- def do_parse(self, st):
- return parse(st)
-
- def valid(self, addr, name=None, subject=None, options=None):
- parser = self.parser()
- if parser:
- got = self.call(parser, "parse", addr)
- expected = "%s\n" % pprint_address(name, subject, options)
- assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got)
- else:
- ParserBase.valid(self, addr, (name, subject, options))
-
- def invalid(self, addr, error=None):
- parser = self.parser()
- if parser:
- got = self.call(parser, "parse", addr)
- expected = "ERROR: %s\n" % error
- assert expected == got, "expected %r, got %r" % (expected, got)
- else:
- ParserBase.invalid(self, addr, error)
-
- def testDashInId1(self):
- self.lex("foo-bar", ID)
-
- def testDashInId2(self):
- self.lex("foo-3", ID)
-
- def testDashAlone1(self):
- self.lex("foo - bar", ID, SYM, ID)
-
- def testDashAlone2(self):
- self.lex("foo - 3", ID, SYM, NUMBER)
-
- def testLeadingDash(self):
- self.lex("-foo", SYM, ID)
-
- def testTrailingDash(self):
- self.lex("foo-", ID, SYM)
-
- def testNegativeNum(self):
- self.lex("-3", NUMBER)
-
- def testIdNum(self):
- self.lex("id1", ID)
-
- def testIdSpaceNum(self):
- self.lex("id 1", ID, NUMBER)
-
- def testHash(self):
- self.valid("foo/bar.#", "foo", "bar.#")
-
- def testStar(self):
- self.valid("foo/bar.*", "foo", "bar.*")
-
- def testColon(self):
- self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf")
-
- def testOptions(self):
- self.valid("foo.bar/baz.qux:moo:arf; {key: value}",
- "foo.bar", "baz.qux:moo:arf", {"key": "value"})
-
- def testOptionsTrailingComma(self):
- self.valid("name/subject; {key: value,}", "name", "subject",
- {"key": "value"})
-
- def testOptionsNone(self):
- self.valid("name/subject; {key: None}", "name", "subject",
- {"key": None})
-
- def testSemiSubject(self):
- self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}",
- "foo.bar", "baz.qux;moo:arf", {"key": "value"})
-
- def testCommaSubject(self):
- self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}")
-
- def testCommaSubjectOptions(self):
- self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar",
- "baz.qux.{moo,arf}", {"key": "value"})
-
- def testUnbalanced(self):
- self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar",
- "baz.qux.{moo,arf", {"key": "value"})
-
- def testSlashQuote(self):
- self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}",
- "foo.bar/baz.qux.{moo,arf",
- None, {"key": "value"})
-
- def testSlashHexEsc1(self):
- self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}",
- "foo.bar\x00baz.qux.{moo,arf",
- None, {"key": "value"})
-
- def testSlashHexEsc2(self):
- self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}",
- "foo.bar\xffbaz.qux.{moo,arf",
- None, {"key": "value"})
-
- def testSlashHexEsc3(self):
- self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}",
- "foo.bar\xFFbaz.qux.{moo,arf",
- None, {"key": "value"})
-
- def testSlashUnicode1(self):
- self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}",
- u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"})
-
- def testSlashUnicode2(self):
- self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}",
- u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"})
-
- def testSlashUnicode3(self):
- self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}",
- u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"})
-
- def testSlashUnicode4(self):
- self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}",
- u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"})
-
- def testNoName(self):
- self.invalid("; {key: value}",
- "unexpected token SEMI(;) line:1,0:; {key: value}")
-
- def testEmpty(self):
- self.invalid("", "unexpected token EOF line:1,0:")
-
- def testNoNameSlash(self):
- self.invalid("/asdf; {key: value}",
- "unexpected token SLASH(/) line:1,0:/asdf; {key: value}")
-
- def testBadOptions1(self):
- self.invalid("name/subject; {",
- "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), "
- "got EOF line:1,15:name/subject; {")
-
- def testBadOptions2(self):
- self.invalid("name/subject; { 3",
- "expecting COLON, got EOF "
- "line:1,17:name/subject; { 3")
-
- def testBadOptions3(self):
- self.invalid("name/subject; { key:",
- "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF "
- "line:1,20:name/subject; { key:")
-
- def testBadOptions4(self):
- self.invalid("name/subject; { key: value",
- "expecting (COMMA, RBRACE), got EOF "
- "line:1,26:name/subject; { key: value")
-
- def testBadOptions5(self):
- self.invalid("name/subject; { key: value asdf",
- "expecting (COMMA, RBRACE), got ID(asdf) "
- "line:1,27:name/subject; { key: value asdf")
-
- def testBadOptions6(self):
- self.invalid("name/subject; { key: value,",
- "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF "
- "line:1,27:name/subject; { key: value,")
-
- def testBadOptions7(self):
- self.invalid("name/subject; { key: value } asdf",
- "expecting EOF, got ID(asdf) "
- "line:1,29:name/subject; { key: value } asdf")
-
- def testList1(self):
- self.valid("name/subject; { key: [] }", "name", "subject", {"key": []})
-
- def testList2(self):
- self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']})
-
- def testList3(self):
- self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject",
- {"key": [1, 2, 3]})
-
- def testList4(self):
- self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject",
- {"key": [1, [2, 3], 4]})
-
- def testBadList1(self):
- self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), "
- "got RBRACE(}) line:1,23:name/subject; { key: [ }")
-
- def testBadList2(self):
- self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), "
- "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }")
-
- def testBadList3(self):
- self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), "
- "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }")
-
- def testBadList4(self):
- self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), "
- "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }")
-
- def testMap1(self):
- self.valid("name/subject; { 'key': value }",
- "name", "subject", {"key": "value"})
-
- def testMap2(self):
- self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"})
-
- def testMap3(self):
- self.valid('name/subject; { "foo.bar": value }',
- "name", "subject", {"foo.bar": "value"})
-
- def testBoolean(self):
- self.valid("name/subject; { true1: True, true2: true, "
- "false1: False, false2: false }",
- "name", "subject", {"true1": True, "true2": True,
- "false1": False, "false2": False})
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
deleted file mode 100644
index db5ec03df2..0000000000
--- a/python/qpid/tests/messaging/endpoints.py
+++ /dev/null
@@ -1,1335 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# setup, usage, teardown, errors(sync), errors(async), stress, soak,
-# boundary-conditions, config
-
-import errno, os, socket, sys, time
-from qpid import compat
-from qpid.compat import set
-from qpid.messaging import *
-from qpid.messaging.transports import TRANSPORTS
-from qpid.tests.messaging import Base
-from threading import Thread
-
-class SetupTests(Base):
-
- def testEstablish(self):
- self.conn = Connection.establish(self.broker, **self.connection_options())
- self.ping(self.conn.session())
-
- def testOpen(self):
- self.conn = Connection(self.broker, **self.connection_options())
- self.conn.open()
- self.ping(self.conn.session())
-
- def testOpenReconnectURLs(self):
- options = self.connection_options()
- options["reconnect_urls"] = [self.broker, self.broker]
- self.conn = Connection(self.broker, **options)
- self.conn.open()
- self.ping(self.conn.session())
-
- def testTcpNodelay(self):
- self.conn = Connection.establish(self.broker, tcp_nodelay=True)
- assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
-
- def testConnectError(self):
- try:
- # Specifying port 0 yields a bad address on Windows; port 4 is unassigned
- self.conn = Connection.establish("localhost:4")
- assert False, "connect succeeded"
- except ConnectError, e:
- assert "refused" in str(e)
-
- def testGetError(self):
- self.conn = Connection("localhost:0")
- try:
- self.conn.open()
- assert False, "connect succeeded"
- except ConnectError, e:
- assert self.conn.get_error() == e
-
- def use_fds(self):
- fds = []
- try:
- while True:
- fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY))
- except OSError, e:
- if e.errno != errno.EMFILE:
- raise e
- else:
- return fds
-
- def testOpenCloseResourceLeaks(self):
- fds = self.use_fds()
- try:
- for i in range(32):
- if fds: os.close(fds.pop())
- for i in xrange(64):
- conn = Connection.establish(self.broker, **self.connection_options())
- conn.close()
- finally:
- while fds:
- os.close(fds.pop())
-
- def testOpenFailResourceLeaks(self):
- fds = self.use_fds()
- try:
- for i in range(32):
- if fds: os.close(fds.pop())
- for i in xrange(64):
- conn = Connection("localhost:0", **self.connection_options())
- # XXX: we need to force a waiter to be created for this test
- # to work
- conn._lock.acquire()
- conn._wait(lambda: False, timeout=0.001)
- conn._lock.release()
- try:
- conn.open()
- except ConnectError, e:
- pass
- finally:
- while fds:
- os.close(fds.pop())
-
- def testReconnect(self):
- options = self.connection_options()
- real = TRANSPORTS["tcp"]
-
- class flaky:
-
- def __init__(self, conn, host, port):
- self.real = real(conn, host, port)
- self.sent_count = 0
- self.recv_count = 0
-
- def fileno(self):
- return self.real.fileno()
-
- def reading(self, reading):
- return self.real.reading(reading)
-
- def writing(self, writing):
- return self.real.writing(writing)
-
- def send(self, bytes):
- if self.sent_count > 2048:
- raise socket.error("fake error")
- n = self.real.send(bytes)
- self.sent_count += n
- return n
-
- def recv(self, n):
- if self.recv_count > 2048:
- return ""
- bytes = self.real.recv(n)
- self.recv_count += len(bytes)
- return bytes
-
- def close(self):
- self.real.close()
-
- TRANSPORTS["flaky"] = flaky
-
- options["reconnect"] = True
- options["reconnect_interval"] = 0
- options["reconnect_limit"] = 100
- options["reconnect_log"] = False
- options["transport"] = "flaky"
-
- self.conn = Connection.establish(self.broker, **options)
- ssn = self.conn.session()
- snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}")
- rcv = ssn.receiver(snd.target)
-
- msgs = [self.message("testReconnect", i) for i in range(20)]
- for m in msgs:
- snd.send(m)
-
- content = set()
- drained = []
- duplicates = []
- try:
- while True:
- m = rcv.fetch(timeout=0)
- if m.content not in content:
- content.add(m.content)
- drained.append(m)
- else:
- duplicates.append(m)
- ssn.acknowledge(m)
- except Empty:
- pass
- # XXX: apparently we don't always get duplicates, should figure out why
- #assert duplicates, "no duplicates"
- assert len(drained) == len(msgs)
- for m, d in zip(msgs, drained):
- # XXX: we should figure out how to provide proper end to end
- # redelivered
- self.assertEcho(m, d, d.redelivered)
-
-class ConnectionTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def testCheckClosed(self):
- assert not self.conn.check_closed()
-
- def testSessionAnon(self):
- ssn1 = self.conn.session()
- ssn2 = self.conn.session()
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
-
- def testSessionNamed(self):
- ssn1 = self.conn.session("one")
- ssn2 = self.conn.session("two")
- self.ping(ssn1)
- self.ping(ssn2)
- assert ssn1 is not ssn2
- assert ssn1 is self.conn.session("one")
- assert ssn2 is self.conn.session("two")
-
- def testDetach(self):
- ssn = self.conn.session()
- self.ping(ssn)
- self.conn.detach()
- try:
- self.ping(ssn)
- assert False, "ping succeeded"
- except Detached:
- # this is the expected failure when pinging on a detached
- # connection
- pass
- self.conn.attach()
- self.ping(ssn)
-
- def testClose(self):
- self.conn.close()
- assert not self.conn.attached()
-
- def testSimultaneousClose(self):
- ssns = [self.conn.session() for i in range(3)]
- for s in ssns:
- for i in range(3):
- s.receiver("amq.topic")
- s.sender("amq.topic")
-
- def closer(errors):
- try:
- self.conn.close()
- except:
- _, e, _ = sys.exc_info()
- errors.append(compat.format_exc(e))
-
- t1_errors = []
- t2_errors = []
- t1 = Thread(target=lambda: closer(t1_errors))
- t2 = Thread(target=lambda: closer(t2_errors))
- t1.start()
- t2.start()
- t1.join(self.delay())
- t2.join(self.delay())
-
- assert not t1_errors, t1_errors[0]
- assert not t2_errors, t2_errors[0]
-
-class hangable:
-
- def __init__(self, conn, host, port):
- self.tcp = TRANSPORTS["tcp"](conn, host, port)
- self.hung = False
-
- def hang(self):
- self.hung = True
-
- def fileno(self):
- return self.tcp.fileno()
-
- def reading(self, reading):
- if self.hung:
- return True
- else:
- return self.tcp.reading(reading)
-
- def writing(self, writing):
- if self.hung:
- return False
- else:
- return self.tcp.writing(writing)
-
- def send(self, bytes):
- if self.hung:
- return 0
- else:
- return self.tcp.send(bytes)
-
- def recv(self, n):
- if self.hung:
- return ""
- else:
- return self.tcp.recv(n)
-
- def close(self):
- self.tcp.close()
-
-TRANSPORTS["hangable"] = hangable
-
-class TimeoutTests(Base):
-
- def setup_connection(self):
- options = self.connection_options()
- options["transport"] = "hangable"
- return Connection.establish(self.broker, **options)
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender("amq.topic")
-
- def setup_receiver(self):
- return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}")
-
- def teardown_connection(self, conn):
- try:
- conn.detach(timeout=0)
- except Timeout:
- pass
-
- def hang(self):
- self.conn._driver._transport.hang()
-
- def timeoutTest(self, method):
- self.hang()
- try:
- method(timeout=self.delay())
- assert False, "did not time out"
- except Timeout:
- pass
-
- def testSenderSync(self):
- self.snd.send(self.content("testSenderSync"), sync=False)
- self.timeoutTest(self.snd.sync)
-
- def testSenderClose(self):
- self.snd.send(self.content("testSenderClose"), sync=False)
- self.timeoutTest(self.snd.close)
-
- def testReceiverClose(self):
- self.timeoutTest(self.rcv.close)
-
- def testSessionSync(self):
- self.snd.send(self.content("testSessionSync"), sync=False)
- self.timeoutTest(self.ssn.sync)
-
- def testSessionClose(self):
- self.timeoutTest(self.ssn.close)
-
- def testConnectionDetach(self):
- self.timeoutTest(self.conn.detach)
-
- def testConnectionClose(self):
- self.timeoutTest(self.conn.close)
-
-ACK_QC = 'test-ack-queue; {create: always}'
-ACK_QD = 'test-ack-queue; {delete: always}'
-
-class SessionTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def testSender(self):
- snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}',
- durable=self.durable())
- snd2 = self.ssn.sender(snd.target, durable=self.durable())
- assert snd is not snd2
- snd2.close()
-
- content = self.content("testSender")
- snd.send(content)
- rcv = self.ssn.receiver(snd.target)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
-
- def testReceiver(self):
- rcv = self.ssn.receiver('test-rcv-queue; {create: always}')
- rcv2 = self.ssn.receiver(rcv.source)
- assert rcv is not rcv2
- rcv2.close()
-
- content = self.content("testReceiver")
- snd = self.ssn.sender(rcv.source, durable=self.durable())
- snd.send(content)
- msg = rcv.fetch(0)
- assert msg.content == content
- self.ssn.acknowledge(msg)
- snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}')
-
- def testDetachedReceiver(self):
- self.conn.detach()
- rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}")
- m = self.content("testDetachedReceiver")
- self.conn.attach()
- snd = self.ssn.sender("test-dis-rcv-queue")
- snd.send(m)
- self.drain(rcv, expected=[m])
-
- def testNextReceiver(self):
- ADDR = 'test-next-rcv-queue; {create: always, delete: always}'
- rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
- rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
-
- snd = self.ssn.sender(ADDR)
-
- msgs = []
- for i in range(10):
- content = self.content("testNextReceiver", i)
- snd.send(content)
- msgs.append(content)
-
- fetched = []
- try:
- while True:
- rcv = self.ssn.next_receiver(timeout=self.delay())
- assert rcv in (rcv1, rcv2, rcv3)
- assert rcv.available() > 0
- fetched.append(rcv.fetch().content)
- except Empty:
- pass
- assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
- self.ssn.acknowledge()
- #we set the capacity to 0 to prevent the deletion of the queue -
- #triggered the deletion policy when the first receiver is closed -
- #resulting in session exceptions being issued for the remaining
- #active subscriptions:
- for r in [rcv1, rcv2, rcv3]:
- r.capacity = 0
-
- # XXX, we need a convenient way to assert that required queues are
- # empty on setup, and possibly also to drain queues on teardown
- def ackTest(self, acker, ack_capacity=None):
- # send a bunch of messages
- snd = self.ssn.sender(ACK_QC, durable=self.durable())
- contents = [self.content("ackTest", i) for i in range(15)]
- for c in contents:
- snd.send(c)
-
- # drain the queue, verify the messages are there and then close
- # without acking
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- self.ssn.close()
-
- # drain the queue again, verify that they are all the messages
- # were requeued, and ack this time before closing
- self.ssn = self.conn.session()
- if ack_capacity is not None:
- self.ssn.ack_capacity = ack_capacity
- rcv = self.ssn.receiver(ACK_QC)
- self.drain(rcv, expected=contents)
- acker(self.ssn)
- self.ssn.close()
-
- # drain the queue a final time and verify that the messages were
- # dequeued
- self.ssn = self.conn.session()
- rcv = self.ssn.receiver(ACK_QD)
- self.assertEmpty(rcv)
-
- def testAcknowledge(self):
- self.ackTest(lambda ssn: ssn.acknowledge())
-
- def testAcknowledgeAsync(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
-
- def testAcknowledgeAsyncAckCap0(self):
- try:
- try:
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
- assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
- except InsufficientCapacity:
- pass
- finally:
- self.ssn.ack_capacity = UNLIMITED
- self.drain(self.ssn.receiver(ACK_QD))
- self.ssn.acknowledge()
-
- def testAcknowledgeAsyncAckCap1(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
-
- def testAcknowledgeAsyncAckCap5(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
-
- def testAcknowledgeAsyncAckCapUNLIMITED(self):
- self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
-
- def testRelease(self):
- msgs = [self.message("testRelease", i) for i in range(3)]
- snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
- for m in msgs:
- snd.send(m)
- rcv = self.ssn.receiver(snd.target)
- echos = self.drain(rcv, expected=msgs)
- self.ssn.acknowledge(echos[0])
- self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
- self.ssn.acknowledge(echos[2], Disposition(RELEASED))
- self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
- self.drain(rcv, expected=msgs[2:3])
- self.ssn.acknowledge()
-
- def testReject(self):
- msgs = [self.message("testReject", i) for i in range(3)]
- snd = self.ssn.sender("""
- test-reject-queue; {
- create: always,
- delete: always,
- node: {
- x-declare: {
- alternate-exchange: 'amq.topic'
- }
- }
- }
-""")
- for m in msgs:
- snd.send(m)
- rcv = self.ssn.receiver(snd.target)
- rej = self.ssn.receiver("amq.topic")
- echos = self.drain(rcv, expected=msgs)
- self.ssn.acknowledge(echos[0])
- self.ssn.acknowledge(echos[1], Disposition(REJECTED))
- self.ssn.acknowledge(echos[2],
- Disposition(REJECTED, code=3, text="test-reject"))
- self.drain(rej, expected=msgs[1:])
- self.ssn.acknowledge()
-
- def send(self, ssn, target, base, count=1):
- snd = ssn.sender(target, durable=self.durable())
- messages = []
- for i in range(count):
- c = self.message(base, i)
- snd.send(c)
- messages.append(c)
- snd.close()
- return messages
-
- def txTest(self, commit):
- TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
- TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
- txssn = self.conn.session(transactional=True)
- messages = self.send(self.ssn, TX_Q, "txTest", 3)
- txrcv = txssn.receiver(TX_Q)
- txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
- rcv = self.ssn.receiver(txrcv.source)
- copy_rcv = self.ssn.receiver(txsnd.target)
- self.assertEmpty(copy_rcv)
- for i in range(3):
- m = txrcv.fetch(0)
- txsnd.send(m)
- self.assertEmpty(copy_rcv)
- txssn.acknowledge()
- if commit:
- txssn.commit()
- self.assertEmpty(rcv)
- self.drain(copy_rcv, expected=messages)
- else:
- txssn.rollback()
- self.drain(rcv, expected=messages, redelivered=True)
- self.assertEmpty(copy_rcv)
- self.ssn.acknowledge()
-
- def testCommit(self):
- self.txTest(True)
-
- def testRollback(self):
- self.txTest(False)
-
- def txTestSend(self, commit):
- TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
- txssn = self.conn.session(transactional=True)
- messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
- rcv = self.ssn.receiver(TX_SEND_Q)
- self.assertEmpty(rcv)
-
- if commit:
- txssn.commit()
- self.drain(rcv, expected=messages)
- self.ssn.acknowledge()
- else:
- txssn.rollback()
- self.assertEmpty(rcv)
- txssn.commit()
- self.assertEmpty(rcv)
-
- def testCommitSend(self):
- self.txTestSend(True)
-
- def testRollbackSend(self):
- self.txTestSend(False)
-
- def txTestAck(self, commit):
- TX_ACK_QC = 'test-tx-ack-queue; {create: always}'
- TX_ACK_QD = 'test-tx-ack-queue; {delete: always}'
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- self.assertEmpty(txrcv)
- messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- self.drain(txrcv, expected=messages)
-
- if commit:
- txssn.acknowledge()
- else:
- txssn.rollback()
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.acknowledge()
- txssn.rollback()
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.commit() # commit without ack
- self.assertEmpty(txrcv)
-
- txssn.close()
-
- txssn = self.conn.session(transactional=True)
- txrcv = txssn.receiver(TX_ACK_QC)
- self.drain(txrcv, expected=messages, redelivered=True)
- txssn.acknowledge()
- txssn.commit()
- rcv = self.ssn.receiver(TX_ACK_QD)
- self.assertEmpty(rcv)
- txssn.close()
- self.assertEmpty(rcv)
-
- def testCommitAck(self):
- self.txTestAck(True)
-
- def testRollbackAck(self):
- self.txTestAck(False)
-
- def testDoubleCommit(self):
- ssn = self.conn.session(transactional=True)
- snd = ssn.sender("amq.direct")
- rcv = ssn.receiver("amq.direct")
- msgs = [self.message("testDoubleCommit", i) for i in range(3)]
- for m in msgs:
- snd.send(m)
- ssn.commit()
- self.drain(rcv, expected=msgs)
- ssn.acknowledge()
- ssn.commit()
-
- def testClose(self):
- self.ssn.close()
- try:
- self.ping(self.ssn)
- assert False, "ping succeeded"
- except Detached:
- pass
-
-RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
-
-class ReceiverTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(RECEIVER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(RECEIVER_Q)
-
- def send(self, base, count = None, sync=True):
- content = self.content(base, count)
- self.snd.send(content, sync=sync)
- return content
-
- def testFetch(self):
- try:
- msg = self.rcv.fetch(0)
- assert False, "unexpected message: %s" % msg
- except Empty:
- pass
- try:
- start = time.time()
- msg = self.rcv.fetch(self.delay())
- assert False, "unexpected message: %s" % msg
- except Empty:
- elapsed = time.time() - start
- assert elapsed >= self.delay()
-
- one = self.send("testFetch", 1)
- two = self.send("testFetch", 2)
- three = self.send("testFetch", 3)
- msg = self.rcv.fetch(0)
- assert msg.content == one
- msg = self.rcv.fetch(self.delay())
- assert msg.content == two
- msg = self.rcv.fetch()
- assert msg.content == three
- self.ssn.acknowledge()
-
- def fetchFromClosedTest(self, entry):
- entry.close()
- try:
- msg = self.rcv.fetch(0)
- assert False, "unexpected result: %s" % msg
- except Empty, e:
- assert False, "unexpected exception: %s" % e
- except LinkClosed, e:
- pass
-
- def testFetchFromClosedReceiver(self):
- self.fetchFromClosedTest(self.rcv)
-
- def testFetchFromClosedSession(self):
- self.fetchFromClosedTest(self.ssn)
-
- def testFetchFromClosedConnection(self):
- self.fetchFromClosedTest(self.conn)
-
- def fetchFromConcurrentCloseTest(self, entry):
- def closer():
- self.sleep()
- entry.close()
- t = Thread(target=closer)
- t.start()
- try:
- msg = self.rcv.fetch()
- assert False, "unexpected result: %s" % msg
- except Empty, e:
- assert False, "unexpected exception: %s" % e
- except LinkClosed, e:
- pass
- t.join()
-
- def testFetchFromConcurrentCloseReceiver(self):
- self.fetchFromConcurrentCloseTest(self.rcv)
-
- def testFetchFromConcurrentCloseSession(self):
- self.fetchFromConcurrentCloseTest(self.ssn)
-
- def testFetchFromConcurrentCloseConnection(self):
- self.fetchFromConcurrentCloseTest(self.conn)
-
- def testCapacityIncrease(self):
- content = self.send("testCapacityIncrease")
- self.sleep()
- assert self.rcv.available() == 0
- self.rcv.capacity = UNLIMITED
- self.sleep()
- assert self.rcv.available() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == content
- assert self.rcv.available() == 0
- self.ssn.acknowledge()
-
- def testCapacityDecrease(self):
- self.rcv.capacity = UNLIMITED
- one = self.send("testCapacityDecrease", 1)
- self.sleep()
- assert self.rcv.available() == 1
- msg = self.rcv.fetch(0)
- assert msg.content == one
-
- self.rcv.capacity = 0
-
- two = self.send("testCapacityDecrease", 2)
- self.sleep()
- assert self.rcv.available() == 0
- msg = self.rcv.fetch(0)
- assert msg.content == two
-
- self.ssn.acknowledge()
-
- def capacityTest(self, capacity, threshold=None):
- if threshold is not None:
- self.rcv.threshold = threshold
- self.rcv.capacity = capacity
- self.assertAvailable(self.rcv, 0)
-
- for i in range(2*capacity):
- self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
- self.snd.sync()
- self.sleep()
- self.assertAvailable(self.rcv)
-
- first = capacity/2
- second = capacity - first
- self.drain(self.rcv, limit = first)
- self.sleep()
- self.assertAvailable(self.rcv)
- self.drain(self.rcv, limit = second)
- self.sleep()
- self.assertAvailable(self.rcv)
-
- drained = self.drain(self.rcv)
- assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
- self.assertAvailable(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testCapacity5(self):
- self.capacityTest(5)
-
- def testCapacity5Threshold1(self):
- self.capacityTest(5, 1)
-
- def testCapacity10(self):
- self.capacityTest(10)
-
- def testCapacity10Threshold1(self):
- self.capacityTest(10, 1)
-
- def testCapacity100(self):
- self.capacityTest(100)
-
- def testCapacity100Threshold1(self):
- self.capacityTest(100, 1)
-
- def testCapacityUNLIMITED(self):
- self.rcv.capacity = UNLIMITED
- self.assertAvailable(self.rcv, 0)
-
- for i in range(10):
- self.send("testCapacityUNLIMITED", i)
- self.sleep()
- self.assertAvailable(self.rcv, 10)
-
- self.drain(self.rcv)
- self.assertAvailable(self.rcv, 0)
-
- self.ssn.acknowledge()
-
- def testAvailable(self):
- self.rcv.capacity = UNLIMITED
- assert self.rcv.available() == 0
-
- for i in range(3):
- self.send("testAvailable", i)
- self.sleep()
- assert self.rcv.available() == 3
-
- for i in range(3, 10):
- self.send("testAvailable", i)
- self.sleep()
- assert self.rcv.available() == 10
-
- self.drain(self.rcv, limit=3)
- assert self.rcv.available() == 7
-
- self.drain(self.rcv)
- assert self.rcv.available() == 0
-
- self.ssn.acknowledge()
-
- def testDoubleClose(self):
- m1 = self.content("testDoubleClose", 1)
- m2 = self.content("testDoubleClose", 2)
-
- snd = self.ssn.sender("""test-double-close; {
- create: always,
- delete: sender,
- node: {
- type: topic
- }
-}
-""")
- r1 = self.ssn.receiver(snd.target)
- r2 = self.ssn.receiver(snd.target)
- snd.send(m1)
- self.drain(r1, expected=[m1])
- self.drain(r2, expected=[m1])
- r1.close()
- snd.send(m2)
- self.drain(r2, expected=[m2])
- r2.close()
-
- # XXX: need testClose
-
- def testMode(self):
- msgs = [self.content("testMode", 1),
- self.content("testMode", 2),
- self.content("testMode", 3)]
-
- for m in msgs:
- self.snd.send(m)
-
- rb = self.ssn.receiver('test-receiver-queue; {mode: browse}')
- rc = self.ssn.receiver('test-receiver-queue; {mode: consume}')
- self.drain(rb, expected=msgs)
- self.drain(rc, expected=msgs)
- rb2 = self.ssn.receiver(rb.source)
- self.assertEmpty(rb2)
- self.drain(self.rcv, expected=[])
-
- # XXX: need testUnsettled()
-
- def unreliabilityTest(self, mode="unreliable"):
- msgs = [self.message("testUnreliable", i) for i in range(3)]
- snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}")
- rcv = self.ssn.receiver(snd.target)
- for m in msgs:
- snd.send(m)
-
- # close without ack on reliable receiver, messages should be requeued
- ssn = self.conn.session()
- rrcv = ssn.receiver("test-unreliability-queue")
- self.drain(rrcv, expected=msgs)
- ssn.close()
-
- # close without ack on unreliable receiver, messages should not be requeued
- ssn = self.conn.session()
- urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode)
- self.drain(urcv, expected=msgs, redelivered=True)
- ssn.close()
-
- self.assertEmpty(rcv)
-
- def testUnreliable(self):
- self.unreliabilityTest(mode="unreliable")
-
- def testAtMostOnce(self):
- self.unreliabilityTest(mode="at-most-once")
-
-class AddressTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def badOption(self, options, error):
- try:
- self.ssn.sender("test-bad-options-snd; %s" % options)
- assert False
- except InvalidOption, e:
- assert "error in options: %s" % error == str(e), e
-
- try:
- self.ssn.receiver("test-bad-options-rcv; %s" % options)
- assert False
- except InvalidOption, e:
- assert "error in options: %s" % error == str(e), e
-
- def testIllegalKey(self):
- self.badOption("{create: always, node: "
- "{this-property-does-not-exist: 3}}",
- "node: this-property-does-not-exist: "
- "illegal key")
-
- def testWrongValue(self):
- self.badOption("{create: asdf}", "create: asdf not in "
- "('always', 'sender', 'receiver', 'never')")
-
- def testWrongType1(self):
- self.badOption("{node: asdf}",
- "node: asdf is not a map")
-
- def testWrongType2(self):
- self.badOption("{node: {durable: []}}",
- "node: durable: [] is not a bool")
-
- def testCreateQueue(self):
- snd = self.ssn.sender("test-create-queue; {create: always, delete: always, "
- "node: {type: queue, durable: False, "
- "x-declare: {auto_delete: true}}}")
- content = self.content("testCreateQueue")
- snd.send(content)
- rcv = self.ssn.receiver("test-create-queue")
- self.drain(rcv, expected=[content])
-
- def createExchangeTest(self, props=""):
- addr = """test-create-exchange; {
- create: always,
- delete: always,
- node: {
- type: topic,
- durable: False,
- x-declare: {auto_delete: true, %s}
- }
- }""" % props
- snd = self.ssn.sender(addr)
- snd.send("ping")
- rcv1 = self.ssn.receiver("test-create-exchange/first")
- rcv2 = self.ssn.receiver("test-create-exchange/first")
- rcv3 = self.ssn.receiver("test-create-exchange/second")
- for r in (rcv1, rcv2, rcv3):
- try:
- r.fetch(0)
- assert False
- except Empty:
- pass
- msg1 = Message(self.content("testCreateExchange", 1), subject="first")
- msg2 = Message(self.content("testCreateExchange", 2), subject="second")
- snd.send(msg1)
- snd.send(msg2)
- self.drain(rcv1, expected=[msg1.content])
- self.drain(rcv2, expected=[msg1.content])
- self.drain(rcv3, expected=[msg2.content])
-
- def testCreateExchange(self):
- self.createExchangeTest()
-
- def testCreateExchangeDirect(self):
- self.createExchangeTest("type: direct")
-
- def testCreateExchangeTopic(self):
- self.createExchangeTest("type: topic")
-
- def testDeleteBySender(self):
- snd = self.ssn.sender("test-delete; {create: always}")
- snd.send("ping")
- snd.close()
- snd = self.ssn.sender("test-delete; {delete: always}")
- snd.send("ping")
- snd.close()
- try:
- self.ssn.sender("test-delete")
- except NotFound, e:
- assert "no such queue" in str(e)
-
- def testDeleteByReceiver(self):
- rcv = self.ssn.receiver("test-delete; {create: always, delete: always}")
- try:
- rcv.fetch(0)
- except Empty:
- pass
- rcv.close()
-
- try:
- self.ssn.receiver("test-delete")
- assert False
- except NotFound, e:
- assert "no such queue" in str(e)
-
- def testDeleteSpecial(self):
- snd = self.ssn.sender("amq.topic; {delete: always}")
- snd.send("asdf")
- try:
- snd.close()
- assert False, "successfully deleted amq.topic"
- except SessionError, e:
- assert "Cannot delete default exchange" in str(e)
- # XXX: need to figure out close after error
- self.conn._remove_session(self.ssn)
-
- def testNodeBindingsQueue(self):
- snd = self.ssn.sender("""
-test-node-bindings-queue; {
- create: always,
- delete: always,
- node: {
- x-bindings: [{exchange: "amq.topic", key: "a.#"},
- {exchange: "amq.direct", key: "b"},
- {exchange: "amq.topic", key: "c.*"}]
- }
-}
-""")
- snd.send("one")
- snd_a = self.ssn.sender("amq.topic/a.foo")
- snd_b = self.ssn.sender("amq.direct/b")
- snd_c = self.ssn.sender("amq.topic/c.bar")
- snd_a.send("two")
- snd_b.send("three")
- snd_c.send("four")
- rcv = self.ssn.receiver("test-node-bindings-queue")
- self.drain(rcv, expected=["one", "two", "three", "four"])
-
- def testNodeBindingsTopic(self):
- rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}")
- rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}")
- rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}")
- rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}")
- snd = self.ssn.sender("""
-test-node-bindings-topic; {
- create: always,
- delete: always,
- node: {
- type: topic,
- x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"},
- {queue: test-node-bindings-topic-queue-a, key: "a.#"},
- {queue: test-node-bindings-topic-queue-b, key: "b"},
- {queue: test-node-bindings-topic-queue-c, key: "c.*"}]
- }
-}
-""")
- m1 = Message("one")
- m2 = Message(subject="a.foo", content="two")
- m3 = Message(subject="b", content="three")
- m4 = Message(subject="c.bar", content="four")
- snd.send(m1)
- snd.send(m2)
- snd.send(m3)
- snd.send(m4)
- self.drain(rcv, expected=[m1, m2, m3, m4])
- self.drain(rcv_a, expected=[m2])
- self.drain(rcv_b, expected=[m3])
- self.drain(rcv_c, expected=[m4])
-
- def testLinkBindings(self):
- m_a = self.message("testLinkBindings", 1, subject="a")
- m_b = self.message("testLinkBindings", 2, subject="b")
-
- self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}")
- snd = self.ssn.sender("amq.topic")
-
- snd.send(m_a)
- snd.send(m_b)
- snd.close()
-
- rcv = self.ssn.receiver("test-link-bindings-queue")
- self.assertEmpty(rcv)
-
- snd = self.ssn.sender("""
-amq.topic; {
- link: {
- x-bindings: [{queue: test-link-bindings-queue, key: a}]
- }
-}
-""")
-
- snd.send(m_a)
- snd.send(m_b)
-
- self.drain(rcv, expected=[m_a])
- rcv.close()
-
- rcv = self.ssn.receiver("""
-test-link-bindings-queue; {
- link: {
- x-bindings: [{exchange: "amq.topic", key: b}]
- }
-}
-""")
-
- snd.send(m_a)
- snd.send(m_b)
-
- self.drain(rcv, expected=[m_a, m_b])
-
- def testSubjectOverride(self):
- snd = self.ssn.sender("amq.topic/a")
- rcv_a = self.ssn.receiver("amq.topic/a")
- rcv_b = self.ssn.receiver("amq.topic/b")
- m1 = self.content("testSubjectOverride", 1)
- m2 = self.content("testSubjectOverride", 2)
- snd.send(m1)
- snd.send(Message(subject="b", content=m2))
- self.drain(rcv_a, expected=[m1])
- self.drain(rcv_b, expected=[m2])
-
- def testSubjectDefault(self):
- m1 = self.content("testSubjectDefault", 1)
- m2 = self.content("testSubjectDefault", 2)
- snd = self.ssn.sender("amq.topic/a")
- rcv = self.ssn.receiver("amq.topic")
- snd.send(m1)
- snd.send(Message(subject="b", content=m2))
- e1 = rcv.fetch(timeout=0)
- e2 = rcv.fetch(timeout=0)
- assert e1.subject == "a", "subject: %s" % e1.subject
- assert e2.subject == "b", "subject: %s" % e2.subject
- self.assertEmpty(rcv)
-
- def doReliabilityTest(self, reliability, messages, expected):
- snd = self.ssn.sender("amq.topic")
- rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability)
- for m in messages:
- snd.send(m)
- self.conn.detach()
- self.conn.attach()
- self.drain(rcv, expected=expected)
-
- def testReliabilityUnreliable(self):
- msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)]
- self.doReliabilityTest("unreliable", msgs, [])
-
- def testReliabilityAtLeastOnce(self):
- msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)]
- self.doReliabilityTest("at-least-once", msgs, msgs)
-
- def testLinkName(self):
- msgs = [self.message("testLinkName", i) for i in range(3)]
- snd = self.ssn.sender("amq.topic")
- trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}")
- qrcv = self.ssn.receiver("test-link-name")
- for m in msgs:
- snd.send(m)
- self.drain(qrcv, expected=msgs)
-
- def testAssert1(self):
- try:
- snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}")
- assert 0, "assertion failed to trigger"
- except AssertionFailed, e:
- pass
-
- def testAssert2(self):
- snd = self.ssn.sender("amq.topic; {assert: always}")
-
-NOSUCH_Q = "this-queue-should-not-exist"
-UNPARSEABLE_ADDR = "name/subject; {bad options"
-UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3"
-
-class AddressErrorTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def senderErrorTest(self, addr, exc, check=lambda e: True):
- try:
- self.ssn.sender(addr, durable=self.durable())
- assert False, "sender creation succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
-
- def receiverErrorTest(self, addr, exc, check=lambda e: True):
- try:
- self.ssn.receiver(addr)
- assert False, "receiver creation succeeded"
- except exc, e:
- assert check(e), "unexpected error: %s" % compat.format_exc(e)
-
- def testNoneTarget(self):
- self.senderErrorTest(None, MalformedAddress)
-
- def testNoneSource(self):
- self.receiverErrorTest(None, MalformedAddress)
-
- def testNoTarget(self):
- self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
-
- def testNoSource(self):
- self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e))
-
- def testUnparseableTarget(self):
- self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
- lambda e: "expecting COLON" in str(e))
-
- def testUnparseableSource(self):
- self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress,
- lambda e: "expecting COLON" in str(e))
-
- def testUnlexableTarget(self):
- self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress,
- lambda e: "unrecognized characters" in str(e))
-
- def testUnlexableSource(self):
- self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress,
- lambda e: "unrecognized characters" in str(e))
-
- def testInvalidMode(self):
- self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}',
- InvalidOption,
- lambda e: "not in ('browse', 'consume')" in str(e))
-
-SENDER_Q = 'test-sender-q; {create: always, delete: always}'
-
-class SenderTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(SENDER_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(SENDER_Q)
-
- def checkContent(self, content):
- self.snd.send(content)
- msg = self.rcv.fetch(0)
- assert msg.content == content
-
- out = Message(content)
- self.snd.send(out)
- echo = self.rcv.fetch(0)
- assert out.content == echo.content
- assert echo.content == msg.content
- self.ssn.acknowledge()
-
- def testSendString(self):
- self.checkContent(self.content("testSendString"))
-
- def testSendList(self):
- self.checkContent(["testSendList", 1, 3.14, self.test_id])
-
- def testSendMap(self):
- self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
-
- def asyncTest(self, capacity):
- self.snd.capacity = capacity
- msgs = [self.content("asyncTest", i) for i in range(15)]
- for m in msgs:
- self.snd.send(m, sync=False)
- self.drain(self.rcv, timeout=self.delay(), expected=msgs)
- self.ssn.acknowledge()
-
- def testSendAsyncCapacity0(self):
- try:
- self.asyncTest(0)
- assert False, "send shouldn't succeed with zero capacity"
- except InsufficientCapacity:
- # this is expected
- pass
-
- def testSendAsyncCapacity1(self):
- self.asyncTest(1)
-
- def testSendAsyncCapacity5(self):
- self.asyncTest(5)
-
- def testSendAsyncCapacityUNLIMITED(self):
- self.asyncTest(UNLIMITED)
-
- def testCapacityTimeout(self):
- self.snd.capacity = 1
- msgs = []
- caught = False
- while len(msgs) < 100:
- m = self.content("testCapacity", len(msgs))
- try:
- self.snd.send(m, sync=False, timeout=0)
- msgs.append(m)
- except InsufficientCapacity:
- caught = True
- break
- self.snd.sync()
- self.drain(self.rcv, expected=msgs)
- self.ssn.acknowledge()
- assert caught, "did not exceed capacity"
diff --git a/python/qpid/tests/messaging/message.py b/python/qpid/tests/messaging/message.py
deleted file mode 100644
index 297374b82b..0000000000
--- a/python/qpid/tests/messaging/message.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.messaging import *
-from qpid.tests.messaging import Base
-
-class MessageTests(Base):
-
- def testCreateString(self):
- m = Message("string")
- assert m.content == "string"
- assert m.content_type is None
-
- def testCreateUnicode(self):
- m = Message(u"unicode")
- assert m.content == u"unicode"
- assert m.content_type == "text/plain"
-
- def testCreateMap(self):
- m = Message({})
- assert m.content == {}
- assert m.content_type == "amqp/map"
-
- def testCreateList(self):
- m = Message([])
- assert m.content == []
- assert m.content_type == "amqp/list"
-
- def testContentTypeOverride(self):
- m = Message()
- m.content_type = "text/html; charset=utf8"
- m.content = u"<html/>"
- assert m.content_type == "text/html; charset=utf8"
-
-ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}'
-
-class MessageEchoTests(Base):
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def setup_sender(self):
- return self.ssn.sender(ECHO_Q)
-
- def setup_receiver(self):
- return self.ssn.receiver(ECHO_Q)
-
- def check(self, msg):
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
- self.assertEcho(msg, echo)
- self.ssn.acknowledge(echo)
-
- def testStringContent(self):
- self.check(Message("string"))
-
- def testUnicodeContent(self):
- self.check(Message(u"unicode"))
-
-
- TEST_MAP = {"key1": "string",
- "key2": u"unicode",
- "key3": 3,
- "key4": -3,
- "key5": 3.14,
- "key6": -3.14,
- "key7": ["one", 2, 3.14],
- "key8": [],
- "key9": {"sub-key0": 3},
- "key10": True,
- "key11": False,
- "x-amqp-0-10.app-id": "test-app-id",
- "x-amqp-0-10.content-encoding": "test-content-encoding"}
-
- def testMapContent(self):
- self.check(Message(MessageEchoTests.TEST_MAP))
-
- def testListContent(self):
- self.check(Message([]))
- self.check(Message([1, 2, 3]))
- self.check(Message(["one", 2, 3.14, {"four": 4}]))
-
- def testProperties(self):
- msg = Message()
- msg.subject = "subject"
- msg.correlation_id = str(self.test_id)
- msg.durable = True
- msg.priority = 7
- msg.ttl = 60
- msg.properties = MessageEchoTests.TEST_MAP
- msg.reply_to = "reply-address"
- self.check(msg)
-
- def testContentTypeUnknown(self):
- msg = Message(content_type = "this-content-type-does-not-exist")
- self.check(msg)
-
- def testTextPlain(self):
- self.check(Message(content_type="text/plain", content="asdf"))
-
- def testTextPlainEmpty(self):
- self.check(Message(content_type="text/plain"))
-
- def check_rt(self, addr, expected=None):
- if expected is None:
- expected = addr
- msg = Message(reply_to=addr)
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
- assert echo.reply_to == expected, echo.reply_to
- self.ssn.acknowledge(echo)
-
- def testReplyTo(self):
- self.check_rt("name")
-
- def testReplyToQueue(self):
- self.check_rt("name; {node: {type: queue}}", "name")
-
- def testReplyToQueueSubject(self):
- self.check_rt("name/subject; {node: {type: queue}}", "name")
-
- def testReplyToTopic(self):
- self.check_rt("name; {node: {type: topic}}")
-
- def testReplyToTopicSubject(self):
- self.check_rt("name/subject; {node: {type: topic}}")
-
- def testBooleanEncoding(self):
- msg = Message({"true": True, "false": False})
- self.snd.send(msg)
- echo = self.rcv.fetch(0)
- self.assertEcho(msg, echo)
- t = echo.content["true"]
- f = echo.content["false"]
- assert isinstance(t, bool), t
- assert isinstance(f, bool), f
diff --git a/python/qpid/tests/mimetype.py b/python/qpid/tests/mimetype.py
deleted file mode 100644
index 22760316f0..0000000000
--- a/python/qpid/tests/mimetype.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.tests import Test
-from qpid.mimetype import lex, parse, ParseError, EOF, WSPACE
-from parser import ParserBase
-
-class MimeTypeTests(ParserBase, Test):
-
- EXCLUDE = (WSPACE, EOF)
-
- def do_lex(self, st):
- return lex(st)
-
- def do_parse(self, st):
- return parse(st)
-
- def valid(self, addr, type=None, subtype=None, parameters=None):
- ParserBase.valid(self, addr, (type, subtype, parameters))
-
- def testTypeOnly(self):
- self.invalid("type", "expecting SLASH, got EOF line:1,4:type")
-
- def testTypeSubtype(self):
- self.valid("type/subtype", "type", "subtype", [])
-
- def testTypeSubtypeParam(self):
- self.valid("type/subtype ; name=value",
- "type", "subtype", [("name", "value")])
-
- def testTypeSubtypeParamComment(self):
- self.valid("type/subtype ; name(This is a comment.)=value",
- "type", "subtype", [("name", "value")])
-
- def testMultipleParams(self):
- self.valid("type/subtype ; name1=value1 ; name2=value2",
- "type", "subtype", [("name1", "value1"), ("name2", "value2")])
-
- def testCaseInsensitivity(self):
- self.valid("Type/Subtype", "type", "subtype", [])
diff --git a/python/qpid/tests/parser.py b/python/qpid/tests/parser.py
deleted file mode 100644
index a4865cc9fe..0000000000
--- a/python/qpid/tests/parser.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.parser import ParseError
-
-class ParserBase:
-
- def lex(self, addr, *types):
- toks = [t.type for t in self.do_lex(addr) if t.type not in self.EXCLUDE]
- assert list(types) == toks, "expected %s, got %s" % (types, toks)
-
- def valid(self, addr, expected):
- got = self.do_parse(addr)
- assert expected == got, "expected %s, got %s" % (expected, got)
-
- def invalid(self, addr, error=None):
- try:
- p = self.do_parse(addr)
- assert False, "invalid address parsed: %s" % p
- except ParseError, e:
- assert error == str(e), "expected %r, got %r" % (error, str(e))
diff --git a/python/qpid/tests/queue.py b/python/qpid/tests/queue.py
deleted file mode 100644
index e12354eb43..0000000000
--- a/python/qpid/tests/queue.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import threading, time
-from unittest import TestCase
-from qpid.queue import Queue, Empty, Closed
-
-
-class QueueTest (TestCase):
-
- # The qpid queue class just provides sime simple extensions to
- # python's standard queue data structure, so we don't need to test
- # all the queue functionality.
-
- def test_listen(self):
- values = []
- heard = threading.Event()
- def listener(x):
- values.append(x)
- heard.set()
-
- q = Queue(0)
- q.listen(listener)
- heard.clear()
- q.put(1)
- heard.wait()
- assert values[-1] == 1
- heard.clear()
- q.put(2)
- heard.wait()
- assert values[-1] == 2
-
- q.listen(None)
- q.put(3)
- assert q.get(3) == 3
- q.listen(listener)
-
- heard.clear()
- q.put(4)
- heard.wait()
- assert values[-1] == 4
-
- def test_close(self):
- q = Queue(0)
- q.put(1); q.put(2); q.put(3); q.close()
- assert q.get() == 1
- assert q.get() == 2
- assert q.get() == 3
- for i in range(10):
- try:
- q.get()
- raise AssertionError("expected Closed")
- except Closed:
- pass
diff --git a/python/qpid/tests/spec010.py b/python/qpid/tests/spec010.py
deleted file mode 100644
index ac04e1ee02..0000000000
--- a/python/qpid/tests/spec010.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, tempfile, shutil, stat
-from unittest import TestCase
-from qpid.codec010 import Codec, StringCodec
-from qpid.ops import *
-
-class SpecTest(TestCase):
-
- def testSessionHeader(self):
- sc = StringCodec()
- sc.write_compound(Header(sync=True))
- assert sc.encoded == "\x01\x01"
-
- sc = StringCodec()
- sc.write_compound(Header(sync=False))
- assert sc.encoded == "\x01\x00"
-
- def encdec(self, value):
- sc = StringCodec()
- sc.write_compound(value)
- decoded = sc.read_compound(value.__class__)
- return decoded
-
- def testMessageProperties(self):
- props = MessageProperties(content_length=3735928559L,
- reply_to=ReplyTo(exchange="the exchange name",
- routing_key="the routing key"))
- dec = self.encdec(props)
- assert props.content_length == dec.content_length
- assert props.reply_to.exchange == dec.reply_to.exchange
- assert props.reply_to.routing_key == dec.reply_to.routing_key
-
- def testMessageSubscribe(self):
- cmd = MessageSubscribe(exclusive=True, destination="this is a test")
- dec = self.encdec(cmd)
- assert cmd.exclusive == dec.exclusive
- assert cmd.destination == dec.destination
-
- def testXid(self):
- sc = StringCodec()
- xid = Xid(format=0, global_id="gid", branch_id="bid")
- sc.write_compound(xid)
- assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
- dec = sc.read_compound(Xid)
- assert xid.__dict__ == dec.__dict__
-
-# def testLoadReadOnly(self):
-# spec = "amqp.0-10-qpid-errata.xml"
-# f = testrunner.get_spec_file(spec)
-# dest = tempfile.mkdtemp()
-# shutil.copy(f, dest)
-# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
-# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
-# fname = os.path.join(dest, spec)
-# load(fname)
-# assert not os.path.exists("%s.pcl" % fname)