diff options
Diffstat (limited to 'python/qpid/tests')
-rw-r--r-- | python/qpid/tests/__init__.py | 60 | ||||
-rw-r--r-- | python/qpid/tests/codec.py | 601 | ||||
-rw-r--r-- | python/qpid/tests/codec010.py | 133 | ||||
-rw-r--r-- | python/qpid/tests/connection.py | 227 | ||||
-rw-r--r-- | python/qpid/tests/datatypes.py | 296 | ||||
-rw-r--r-- | python/qpid/tests/framing.py | 289 | ||||
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 185 | ||||
-rw-r--r-- | python/qpid/tests/messaging/address.py | 321 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 1335 | ||||
-rw-r--r-- | python/qpid/tests/messaging/message.py | 155 | ||||
-rw-r--r-- | python/qpid/tests/mimetype.py | 56 | ||||
-rw-r--r-- | python/qpid/tests/parser.py | 37 | ||||
-rw-r--r-- | python/qpid/tests/queue.py | 71 | ||||
-rw-r--r-- | python/qpid/tests/spec010.py | 74 |
14 files changed, 0 insertions, 3840 deletions
diff --git a/python/qpid/tests/__init__.py b/python/qpid/tests/__init__.py deleted file mode 100644 index 101a0c3759..0000000000 --- a/python/qpid/tests/__init__.py +++ /dev/null @@ -1,60 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class Test: - - def __init__(self, name): - self.name = name - - def configure(self, config): - self.config = config - -# API Tests -import qpid.tests.framing -import qpid.tests.mimetype -import qpid.tests.messaging - -# Legacy Tests -import qpid.tests.codec -import qpid.tests.queue -import qpid.tests.datatypes -import qpid.tests.connection -import qpid.tests.spec010 -import qpid.tests.codec010 - -class TestTestsXXX(Test): - - def testFoo(self): - print "this test has output" - - def testBar(self): - print "this test "*8 - print "has"*10 - print "a"*75 - print "lot of"*10 - print "output"*10 - - def testQux(self): - import sys - sys.stdout.write("this test has output with no newline") - - def testQuxFail(self): - import sys - sys.stdout.write("this test has output with no newline") - fdsa diff --git a/python/qpid/tests/codec.py b/python/qpid/tests/codec.py deleted file mode 100644 index 8fd0528636..0000000000 --- a/python/qpid/tests/codec.py +++ /dev/null @@ -1,601 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from qpid.codec import Codec -from qpid.spec08 import load -from cStringIO import StringIO -from qpid.reference import ReferenceId - -__doc__ = """ - - This is a unit test script for qpid/codec.py - - It can be run standalone or as part of the existing test framework. - - To run standalone: - ------------------- - - Place in the qpid/python/tests/ directory and type... - - python codec.py - - A brief output will be printed on screen. The verbose output will be placed inn a file called - codec_unit_test_output.txt. [TODO: make this filename configurable] - - To run as part of the existing test framework: - ----------------------------------------------- - - python run-tests tests.codec - - Change History: - ----------------- - Jimmy John 05/19/2007 Initial draft - Jimmy John 05/22/2007 Implemented comments by Rafael Schloming - - -""" - -from qpid.specs_config import amqp_spec_0_8 -SPEC = load(amqp_spec_0_8) - -# -------------------------------------- -# -------------------------------------- -class BaseDataTypes(unittest.TestCase): - - - """ - Base class containing common functions - """ - - # --------------- - def setUp(self): - """ - standard setUp for unitetest (refer unittest documentation for details) - """ - self.codec = Codec(StringIO(), SPEC) - - # ------------------ - def tearDown(self): - """ - standard tearDown for unitetest (refer unittest documentation for details) - """ - self.codec.stream.flush() - self.codec.stream.close() - - # ---------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - given a function name and arguments, calls the function with the args and - returns the contents of the stream - """ - getattr(self.codec, functionName)(args[0]) - return self.codec.stream.getvalue() - - # ---------------------------------------- - def readFunc(self, functionName, *args): - """ - helper function - creates a input stream and then calls the function with arguments as have been - supplied - """ - self.codec.stream = StringIO(args[0]) - return getattr(self.codec, functionName)() - - -# ---------------------------------------- -# ---------------------------------------- -class IntegerTestCase(BaseDataTypes): - - """ - Handles octet, short, long, long long - - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_integer = 2 - self.const_integer_octet_encoded = '\x02' - self.const_integer_short_encoded = '\x00\x02' - self.const_integer_long_encoded = '\x00\x00\x00\x02' - self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' - - # -------------------------- # - # Unsigned Octect - 8 bits # - # -------------------------- # - - # -------------------------- - def test_unsigned_octet(self): - """ - ubyte format requires 0<=number<=255 - """ - self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') - - # ------------------------------------------- - def test_octet_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, 256) - - # ------------------------------------------- - def test_uoctet_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, -1) - - # --------------------------------- - def test_uoctet_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') - - # ------------------------------------ - def test_unsigned_octet_decode(self): - """ - octet decoding - """ - self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') - - # ----------------------------------- # - # Unsigned Short Integers - 16 bits # - # ----------------------------------- # - - # ----------------------- - def test_ushort_int(self): - """ - testing unsigned short integer - """ - self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') - - # ------------------------------------------- - def test_ushort_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, 65536) - - # ------------------------------------------- - def test_ushort_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, -1) - - # --------------------------------- - def test_ushort_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') - - # ------------------------------------ - def test_ushort_int_decode(self): - """ - unsigned short decoding - """ - self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') - - - # ---------------------------------- # - # Unsigned Long Integers - 32 bits # - # ---------------------------------- # - - # ----------------------- - def test_ulong_int(self): - """ - testing unsigned long iteger - """ - self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) - - # ------------------------------------------- - def test_ulong_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, -1) - - # --------------------------------- - def test_ulong_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') - - # ------------------------------- - def test_ulong_int_decode(self): - """ - unsigned long decoding - """ - self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') - - - # --------------------------------------- # - # Unsigned Long Long Integers - 64 bits # - # --------------------------------------- # - - # ----------------------- - def test_ulong_long_int(self): - """ - testing unsinged long long integer - """ - self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_long_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) - - # ------------------------------------------- - def test_ulong_long_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) - - # --------------------------------- - def test_ulong_long_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') - - # ------------------------------------ - def test_ulong_long_int_decode(self): - """ - unsigned long long decoding - """ - self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') - -# ----------------------------------- -# ----------------------------------- -class BitTestCase(BaseDataTypes): - - """ - Handles bits - """ - - # ---------------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - """ - for ele in args: - getattr(self.codec, functionName)(ele) - - self.codec.flush() - return self.codec.stream.getvalue() - - # ------------------- - def test_bit1(self): - """ - sends in 11 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') - - # ------------------- - def test_bit2(self): - """ - sends in 10011 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') - - # ------------------- - def test_bit3(self): - """ - sends in 1110100111 [10 bits(right to left), should be compressed into two octets] - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') - - # ------------------------------------ - def test_bit_decode_1(self): - """ - decode bit 1 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') - - # ------------------------------------ - def test_bit_decode_0(self): - """ - decode bit 0 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') - -# ----------------------------------- -# ----------------------------------- -class StringTestCase(BaseDataTypes): - - """ - Handles short strings, long strings - """ - - # ------------------------------------------------------------- # - # Short Strings - 8 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_short_string_zero_length(self): - """ - 0 length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_positive_length(self): - """ - positive length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_out_of_upper_range(self): - """ - string length > 255 - """ - self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) - - # ------------------------------------ - def test_short_string_decode(self): - """ - short string decode - """ - self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') - - - # ------------------------------------------------------------- # - # Long Strings - 32 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_long_string_zero_length(self): - """ - 0 length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') - - # ------------------------------------------- - def test_long_string_positive_length(self): - """ - positive length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') - - # ------------------------------------ - def test_long_string_decode(self): - """ - long string decode - """ - self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') - - -# -------------------------------------- -# -------------------------------------- -class TimestampTestCase(BaseDataTypes): - - """ - No need of any test cases here as timestamps are implemented as long long which is tested above - """ - pass - -# --------------------------------------- -# --------------------------------------- -class FieldTableTestCase(BaseDataTypes): - - """ - Handles Field Tables - - Only S/I type messages seem to be implemented currently - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} - self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' - - # ------------------------------------------- - def test_field_table_name_value_pair(self): - """ - valid name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') - - # --------------------------------------------------- - def test_field_table_multiple_name_value_pair(self): - """ - multiple name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') - - # ------------------------------------ - def test_field_table_decode(self): - """ - field table decode - """ - self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') - - -# ------------------------------------ -# ------------------------------------ -class ContentTestCase(BaseDataTypes): - - """ - Handles Content data types - """ - - # ----------------------------- - def test_content_inline(self): - """ - inline content - """ - self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') - - # -------------------------------- - def test_content_reference(self): - """ - reference content - """ - self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') - - # ------------------------------------ - def test_content_inline_decode(self): - """ - inline content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') - - # ------------------------------------ - def test_content_reference_decode(self): - """ - reference content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') - -# ------------------------ # -# Pre - existing test code # -# ------------------------ # - -# --------------------- -def test(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - if isinstance(value, (list, tuple)): - values = value - else: - values = [value] - stream = StringIO() - codec = Codec(stream, SPEC) - for v in values: - codec.encode(type, v) - codec.flush() - enc = stream.getvalue() - stream.reset() - dup = [] - for i in xrange(len(values)): - dup.append(codec.decode(type)) - if values != dup: - raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) - -# ----------------------- -def dotest(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - args = (type, value) - test(*args) - -# ------------- -def oldtests(): - """ - old test function cut/copy/paste from qpid/codec.py - """ - for value in ("1", "0", "110", "011", "11001", "10101", "10011"): - for i in range(10): - dotest("bit", map(lambda x: x == "1", value*i)) - - for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): - dotest("table", value) - - for type in ("octet", "short", "long", "longlong"): - for value in range(0, 256): - dotest(type, value) - - for type in ("shortstr", "longstr"): - for value in ("", "a", "asdf"): - dotest(type, value) - -# ----------------------------------------- -class oldTests(unittest.TestCase): - - """ - class to handle pre-existing test cases - """ - - # --------------------------- - def test_oldtestcases(self): - """ - call the old tests - """ - return oldtests() - -# --------------------------- -# --------------------------- -if __name__ == '__main__': - - codec_test_suite = unittest.TestSuite() - - #adding all the test suites... - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) - - #loading pre-existing test case from qpid/codec.py - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) - - run_output_stream = StringIO() - test_runner = unittest.TextTestRunner(run_output_stream, '', '') - test_result = test_runner.run(codec_test_suite) - - print '\n%d test run...' % (test_result.testsRun) - - if test_result.wasSuccessful(): - print '\nAll tests successful\n' - - if test_result.failures: - print '\n----------' - print '%d FAILURES:' % (len(test_result.failures)) - print '----------\n' - for failure in test_result.failures: - print str(failure[0]) + ' ... FAIL' - - if test_result.errors: - print '\n---------' - print '%d ERRORS:' % (len(test_result.errors)) - print '---------\n' - - for error in test_result.errors: - print str(error[0]) + ' ... ERROR' - - f = open('codec_unit_test_output.txt', 'w') - f.write(str(run_output_stream.getvalue())) - f.close() diff --git a/python/qpid/tests/codec010.py b/python/qpid/tests/codec010.py deleted file mode 100644 index 787ebc146f..0000000000 --- a/python/qpid/tests/codec010.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time - -from unittest import TestCase -from qpid.codec010 import StringCodec -from qpid.datatypes import timestamp, uuid4 -from qpid.ops import PRIMITIVE - -class CodecTest(TestCase): - - def check(self, type, value, compare=True): - t = PRIMITIVE[type] - sc = StringCodec() - sc.write_primitive(t, value) - decoded = sc.read_primitive(t) - if compare: - assert decoded == value, "%s, %s" % (decoded, value) - return decoded - - def testMapString(self): - self.check("map", {"string": "this is a test"}) - - def testMapUnicode(self): - self.check("map", {"unicode": u"this is a unicode test"}) - - def testMapBinary(self): - self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) - - def testMapBuffer(self): - s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" - dec = self.check("map", {"buffer": buffer(s)}, False) - assert dec["buffer"] == s - - def testMapInt(self): - self.check("map", {"int": 3}) - - def testMapLong(self): - self.check("map", {"long": 2**32}) - self.check("map", {"long": 1 << 34}) - self.check("map", {"long": -(1 << 34)}) - - def testMapTimestamp(self): - decoded = self.check("map", {"timestamp": timestamp(0)}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapDatetime(self): - decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) - assert isinstance(decoded["datetime"], timestamp) - assert decoded["datetime"] == 0.0 - - def testMapNone(self): - self.check("map", {"none": None}) - - def testMapNested(self): - self.check("map", {"map": {"string": "nested test"}}) - - def testMapList(self): - self.check("map", {"list": [1, "two", 3.0, -4]}) - - def testMapUUID(self): - self.check("map", {"uuid": uuid4()}) - - def testMapAll(self): - decoded = self.check("map", {"string": "this is a test", - "unicode": u"this is a unicode test", - "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", - "int": 3, - "long": 2**32, - "timestamp": timestamp(0), - "none": None, - "map": {"string": "nested map"}, - "list": [1, "two", 3.0, -4], - "uuid": uuid4()}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapEmpty(self): - self.check("map", {}) - - def testMapNone(self): - self.check("map", None) - - def testList(self): - self.check("list", [1, "two", 3.0, -4]) - - def testListEmpty(self): - self.check("list", []) - - def testListNone(self): - self.check("list", None) - - def testArrayInt(self): - self.check("array", [1, 2, 3, 4]) - - def testArrayString(self): - self.check("array", ["one", "two", "three", "four"]) - - def testArrayEmpty(self): - self.check("array", []) - - def testArrayNone(self): - self.check("array", None) - - def testInt16(self): - self.check("int16", 3) - self.check("int16", -3) - - def testInt64(self): - self.check("int64", 3) - self.check("int64", -3) - self.check("int64", 1<<34) - self.check("int64", -(1<<34)) - - def testDatetime(self): - self.check("datetime", timestamp(0)) - self.check("datetime", timestamp(long(time.time()))) diff --git a/python/qpid/tests/connection.py b/python/qpid/tests/connection.py deleted file mode 100644 index 6847285f69..0000000000 --- a/python/qpid/tests/connection.py +++ /dev/null @@ -1,227 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.connection import * -from qpid.datatypes import Message -from qpid.delegates import Server -from qpid.queue import Queue -from qpid.session import Delegate -from qpid.ops import QueueQueryResult - -PORT = 1234 - -class TestServer: - - def __init__(self, queue): - self.queue = queue - - def connection(self, connection): - return Server(connection, delegate=self.session) - - def session(self, session): - session.auto_sync = False - return TestSession(session, self.queue) - -class TestSession(Delegate): - - def __init__(self, session, queue): - self.session = session - self.queue = queue - - def execution_sync(self, es): - pass - - def queue_query(self, qq): - return QueueQueryResult(qq.queue) - - def message_transfer(self, cmd): - if cmd.destination == "echo": - m = Message(cmd.payload) - m.headers = cmd.headers - self.session.message_transfer(cmd.destination, cmd.accept_mode, - cmd.acquire_mode, m) - elif cmd.destination == "abort": - self.session.channel.connection.sock.close() - elif cmd.destination == "heartbeat": - self.session.channel.connection_heartbeat() - else: - self.queue.put(cmd) - -class ConnectionTest(TestCase): - - def setUp(self): - self.queue = Queue() - self.running = True - started = Event() - - def run(): - ts = TestServer(self.queue) - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Connection(s, delegate=ts.connection) - try: - conn.start(5) - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - assert started.isSet() - - def tearDown(self): - self.running = False - connect("127.0.0.1", PORT).close() - self.server.join(3) - - def connect(self, **kwargs): - return Connection(connect("127.0.0.1", PORT), **kwargs) - - def test(self): - c = self.connect() - c.start(10) - - ssn1 = c.session("test1", timeout=10) - ssn2 = c.session("test2", timeout=10) - - assert ssn1 == c.sessions["test1"] - assert ssn2 == c.sessions["test2"] - assert ssn1.channel != None - assert ssn2.channel != None - assert ssn1 in c.attached.values() - assert ssn2 in c.attached.values() - - ssn1.close(5) - - assert ssn1.channel == None - assert ssn1 not in c.attached.values() - assert ssn2 in c.sessions.values() - - ssn2.close(5) - - assert ssn2.channel == None - assert ssn2 not in c.attached.values() - assert ssn2 not in c.sessions.values() - - ssn = c.session("session", timeout=10) - - assert ssn.channel != None - assert ssn in c.sessions.values() - - destinations = ("one", "two", "three") - - for d in destinations: - ssn.message_transfer(d) - - for d in destinations: - cmd = self.queue.get(10) - assert cmd.destination == d - assert cmd.headers == None - assert cmd.payload == None - - msg = Message("this is a test") - ssn.message_transfer("four", message=msg) - cmd = self.queue.get(10) - assert cmd.destination == "four" - assert cmd.headers == None - assert cmd.payload == msg.body - - qq = ssn.queue_query("asdf") - assert qq.queue == "asdf" - c.close(5) - - def testCloseGet(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - for i in range(10): - m = echos.get(timeout=10) - assert m.body == "test%d" % i - - try: - m = echos.get(timeout=10) - assert False - except Closed, e: - pass - - def testCloseListen(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - messages = [] - exceptions = [] - condition = Condition() - def listener(m): messages.append(m) - def exc_listener(e): - condition.acquire() - exceptions.append(e) - condition.notify() - condition.release() - - echos.listen(listener, exc_listener) - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - condition.acquire() - start = time.time() - elapsed = 0 - while not exceptions and elapsed < 10: - condition.wait(10 - elapsed) - elapsed = time.time() - start - condition.release() - - for i in range(10): - m = messages.pop(0) - assert m.body == "test%d" % i - - assert len(exceptions) == 1 - - def testSync(self): - c = self.connect() - c.start(10) - s = c.session("test") - s.auto_sync = False - s.message_transfer("echo", message=Message("test")) - s.sync(10) - - def testHeartbeat(self): - c = self.connect(heartbeat=10) - c.start(10) - s = c.session("test") - s.channel.connection_heartbeat() - s.message_transfer("heartbeat") diff --git a/python/qpid/tests/datatypes.py b/python/qpid/tests/datatypes.py deleted file mode 100644 index 00e649d6cf..0000000000 --- a/python/qpid/tests/datatypes.py +++ /dev/null @@ -1,296 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from unittest import TestCase -from qpid.datatypes import * -from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties - -class SerialTest(TestCase): - - def test(self): - for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): - assert s + 1 > s - assert s - 1 < s - assert s < s + 1 - assert s > s - 1 - - assert serial(0xFFFFFFFFL) + 1 == serial(0) - - assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) - assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) - - def testIncr(self): - s = serial(0) - s += 1 - assert s == serial(1) - - def testIn(self): - l = [serial(1), serial(2), serial(3), serial(4)] - assert serial(1) in l - assert serial(0xFFFFFFFFL + 2) in l - assert 4 in l - - def testNone(self): - assert serial(0) != None - - def testHash(self): - d = {} - d[serial(0)] = "zero" - assert d[0] == "zero" - - def testAdd(self): - assert serial(2) + 2 == serial(4) - assert serial(2) + 2 == 4 - - def testSub(self): - delta = serial(4) - serial(2) - assert isinstance(delta, int) or isinstance(delta, long) - assert delta == 2 - - delta = serial(4) - 2 - assert isinstance(delta, Serial) - assert delta == serial(2) - -class RangedSetTest(TestCase): - - def check(self, ranges): - posts = [] - for range in ranges: - posts.append(range.lower) - posts.append(range.upper) - - sorted = posts[:] - sorted.sort() - - assert posts == sorted - - idx = 1 - while idx + 1 < len(posts): - assert posts[idx] + 1 != posts[idx+1] - idx += 2 - - def test(self): - rs = RangedSet() - - self.check(rs.ranges) - - rs.add(1) - - assert 1 in rs - assert 2 not in rs - assert 0 not in rs - self.check(rs.ranges) - - rs.add(2) - - assert 0 not in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(0) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(37) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - assert 36 not in rs - assert 37 in rs - assert 38 not in rs - self.check(rs.ranges) - - rs.add(-1) - self.check(rs.ranges) - - rs.add(-3) - self.check(rs.ranges) - - rs.add(1, 20) - assert 21 not in rs - assert 20 in rs - self.check(rs.ranges) - - def testAddSelf(self): - a = RangedSet() - a.add(0, 8) - self.check(a.ranges) - a.add(0, 8) - self.check(a.ranges) - assert len(a.ranges) == 1 - range = a.ranges[0] - assert range.lower == 0 - assert range.upper == 8 - - def testEmpty(self): - s = RangedSet() - assert s.empty() - s.add(0, -1) - assert s.empty() - s.add(0, 0) - assert not s.empty() - - def testMinMax(self): - s = RangedSet() - assert s.max() is None - assert s.min() is None - s.add(0, 10) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 5) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 11) - assert s.max() == 11 - assert s.min() == 0 - s.add(15, 20) - assert s.max() == 20 - assert s.min() == 0 - s.add(-10, -5) - assert s.max() == 20 - assert s.min() == -10 - -class RangeTest(TestCase): - - def testIntersect1(self): - a = Range(0, 10) - b = Range(9, 20) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 10 - assert i2.upper == 10 - assert i1.lower == 9 - assert i2.lower == 9 - - def testIntersect2(self): - a = Range(0, 10) - b = Range(11, 20) - assert a.intersect(b) == None - assert b.intersect(a) == None - - def testIntersect3(self): - a = Range(0, 10) - b = Range(3, 5) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 5 - assert i2.upper == 5 - assert i1.lower == 3 - assert i2.lower == 3 - -class UUIDTest(TestCase): - - def test(self): - # this test is kind of lame, but it does excercise the basic - # functionality of the class - u = uuid4() - for i in xrange(1024): - assert u != uuid4() - -class MessageTest(TestCase): - - def setUp(self): - self.mp = MessageProperties() - self.dp = DeliveryProperties() - self.fp = FragmentProperties() - - def testHas(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.has("message_properties") - assert m.has("delivery_properties") - assert m.has("fragment_properties") - - def testGet(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - - def testSet(self): - m = Message(self.mp, self.dp, "body") - assert m.get("fragment_properties") is None - m.set(self.fp) - assert m.get("fragment_properties") == self.fp - - def testSetOnEmpty(self): - m = Message("body") - assert m.get("delivery_properties") is None - m.set(self.dp) - assert m.get("delivery_properties") == self.dp - - def testSetReplace(self): - m = Message(self.mp, self.dp, self.fp, "body") - dp = DeliveryProperties() - assert m.get("delivery_properties") == self.dp - assert m.get("delivery_properties") != dp - m.set(dp) - assert m.get("delivery_properties") != self.dp - assert m.get("delivery_properties") == dp - - def testClear(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - m.clear("fragment_properties") - assert m.get("fragment_properties") is None - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - -class TimestampTest(TestCase): - - def check(self, expected, *values): - for v in values: - assert isinstance(v, timestamp) - assert v == expected - assert v == timestamp(expected) - - def testAdd(self): - self.check(4.0, - timestamp(2.0) + 2.0, - 2.0 + timestamp(2.0)) - - def testSub(self): - self.check(2.0, - timestamp(4.0) - 2.0, - 4.0 - timestamp(2.0)) - - def testNeg(self): - self.check(-4.0, -timestamp(4.0)) - - def testPos(self): - self.check(+4.0, +timestamp(4.0)) - - def testAbs(self): - self.check(4.0, abs(timestamp(-4.0))) - - def testConversion(self): - dt = timestamp(0).datetime() - t = timestamp(dt) - assert t == 0 diff --git a/python/qpid/tests/framing.py b/python/qpid/tests/framing.py deleted file mode 100644 index 0b33df8b9a..0000000000 --- a/python/qpid/tests/framing.py +++ /dev/null @@ -1,289 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# setup, usage, teardown, errors(sync), errors(async), stress, soak, -# boundary-conditions, config - -from qpid.tests import Test -from qpid.framing import * - -class Base(Test): - - def cmp_frames(self, frm1, frm2): - assert frm1.flags == frm2.flags, "expected: %r, got %r" % (frm1, frm2) - assert frm1.type == frm2.type, "expected: %r, got %r" % (frm1, frm2) - assert frm1.track == frm2.track, "expected: %r, got %r" % (frm1, frm2) - assert frm1.channel == frm2.channel, "expected: %r, got %r" % (frm1, frm2) - assert frm1.payload == frm2.payload, "expected: %r, got %r" % (frm1, frm2) - - def cmp_segments(self, seg1, seg2): - assert seg1.first == seg2.first, "expected: %r, got %r" % (seg1, seg2) - assert seg1.last == seg2.last, "expected: %r, got %r" % (seg1, seg2) - assert seg1.type == seg2.type, "expected: %r, got %r" % (seg1, seg2) - assert seg1.track == seg2.track, "expected: %r, got %r" % (seg1, seg2) - assert seg1.channel == seg2.channel, "expected: %r, got %r" % (seg1, seg2) - assert seg1.payload == seg2.payload, "expected: %r, got %r" % (seg1, seg2) - - def cmp_list(self, l1, l2): - if l1 is None: - assert l2 is None - return - - assert len(l1) == len(l2) - for v1, v2 in zip(l1, l2): - if isinstance(v1, Compound): - self.cmp_ops(v1, v2) - else: - assert v1 == v2 - - def cmp_ops(self, op1, op2): - if op1 is None: - assert op2 is None - return - - assert op1.__class__ == op2.__class__ - cls = op1.__class__ - assert op1.NAME == op2.NAME - assert op1.CODE == op2.CODE - assert op1.FIELDS == op2.FIELDS - for f in cls.FIELDS: - v1 = getattr(op1, f.name) - v2 = getattr(op2, f.name) - if COMPOUND.has_key(f.type) or f.type == "struct32": - self.cmp_ops(v1, v2) - elif f.type in ("list", "array"): - self.cmp_list(v1, v2) - else: - assert v1 == v2, "expected: %r, got %r" % (v1, v2) - - if issubclass(cls, Command) or issubclass(cls, Control): - assert op1.channel == op2.channel - - if issubclass(cls, Command): - assert op1.sync == op2.sync, "expected: %r, got %r" % (op1.sync, op2.sync) - assert (op1.headers is None and op2.headers is None) or \ - (op1.headers is not None and op2.headers is not None) - if op1.headers is not None: - assert len(op1.headers) == len(op2.headers) - for h1, h2 in zip(op1.headers, op2.headers): - self.cmp_ops(h1, h2) - -class FrameTest(Base): - - def enc_dec(self, frames, encoded=None): - enc = FrameEncoder() - dec = FrameDecoder() - - enc.write(*frames) - bytes = enc.read() - if encoded is not None: - assert bytes == encoded, "expected %r, got %r" % (encoded, bytes) - dec.write(bytes) - dframes = dec.read() - - assert len(frames) == len(dframes) - for f, df, in zip(frames, dframes): - self.cmp_frames(f, df) - - def testEmpty(self): - self.enc_dec([Frame(0, 0, 0, 0, "")], - "\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x00") - - def testSingle(self): - self.enc_dec([Frame(0, 0, 0, 1, "payload")], - "\x00\x00\x00\x13\x00\x00\x00\x01\x00\x00\x00\x00payload") - - def testMaxChannel(self): - self.enc_dec([Frame(0, 0, 0, 65535, "max-channel")], - "\x00\x00\x00\x17\x00\x00\xff\xff\x00\x00\x00\x00max-channel") - - def testMaxType(self): - self.enc_dec([Frame(0, 255, 0, 0, "max-type")], - "\x00\xff\x00\x14\x00\x00\x00\x00\x00\x00\x00\x00max-type") - - def testMaxTrack(self): - self.enc_dec([Frame(0, 0, 15, 0, "max-track")], - "\x00\x00\x00\x15\x00\x0f\x00\x00\x00\x00\x00\x00max-track") - - def testSequence(self): - self.enc_dec([Frame(0, 0, 0, 0, "zero"), - Frame(0, 0, 0, 1, "one"), - Frame(0, 0, 1, 0, "two"), - Frame(0, 0, 1, 1, "three"), - Frame(0, 1, 0, 0, "four"), - Frame(0, 1, 0, 1, "five"), - Frame(0, 1, 1, 0, "six"), - Frame(0, 1, 1, 1, "seven"), - Frame(1, 0, 0, 0, "eight"), - Frame(1, 0, 0, 1, "nine"), - Frame(1, 0, 1, 0, "ten"), - Frame(1, 0, 1, 1, "eleven"), - Frame(1, 1, 0, 0, "twelve"), - Frame(1, 1, 0, 1, "thirteen"), - Frame(1, 1, 1, 0, "fourteen"), - Frame(1, 1, 1, 1, "fifteen")]) - -class SegmentTest(Base): - - def enc_dec(self, segments, frames=None, interleave=None, max_payload=Frame.MAX_PAYLOAD): - enc = SegmentEncoder(max_payload) - dec = SegmentDecoder() - - enc.write(*segments) - frms = enc.read() - if frames is not None: - assert len(frames) == len(frms), "expected %s, got %s" % (frames, frms) - for f1, f2 in zip(frames, frms): - self.cmp_frames(f1, f2) - if interleave is not None: - ilvd = [] - for f in frms: - ilvd.append(f) - if interleave: - ilvd.append(interleave.pop(0)) - ilvd.extend(interleave) - dec.write(*ilvd) - else: - dec.write(*frms) - segs = dec.read() - assert len(segments) == len(segs) - for s1, s2 in zip(segments, segs): - self.cmp_segments(s1, s2) - - def testEmpty(self): - self.enc_dec([Segment(True, True, 0, 0, 0, "")], - [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, - "")]) - - def testSingle(self): - self.enc_dec([Segment(True, True, 0, 0, 0, "payload")], - [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG | LAST_SEG, 0, 0, 0, - "payload")]) - - def testMaxChannel(self): - self.enc_dec([Segment(False, False, 0, 0, 65535, "max-channel")], - [Frame(FIRST_FRM | LAST_FRM, 0, 0, 65535, "max-channel")]) - - def testMaxType(self): - self.enc_dec([Segment(False, False, 255, 0, 0, "max-type")], - [Frame(FIRST_FRM | LAST_FRM, 255, 0, 0, "max-type")]) - - def testMaxTrack(self): - self.enc_dec([Segment(False, False, 0, 15, 0, "max-track")], - [Frame(FIRST_FRM | LAST_FRM, 0, 15, 0, "max-track")]) - - def testSequence(self): - self.enc_dec([Segment(True, False, 0, 0, 0, "one"), - Segment(False, False, 0, 0, 0, "two"), - Segment(False, True, 0, 0, 0, "three")], - [Frame(FIRST_FRM | LAST_FRM | FIRST_SEG, 0, 0, 0, "one"), - Frame(FIRST_FRM | LAST_FRM, 0, 0, 0, "two"), - Frame(FIRST_FRM | LAST_FRM | LAST_SEG, 0, 0, 0, "three")]) - - def testInterleaveChannel(self): - frames = [Frame(0, 0, 0, 0, chr(ord("a") + i)) for i in range(7)] - frames[0].flags |= FIRST_FRM - frames[-1].flags |= LAST_FRM - - ilvd = [Frame(0, 0, 0, 1, chr(ord("a") + i)) for i in range(7)] - - self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefg")], frames, ilvd, max_payload=1) - - def testInterleaveTrack(self): - frames = [Frame(0, 0, 0, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) - for i in range(0, 8, 2)] - frames[0].flags |= FIRST_FRM - frames[-1].flags |= LAST_FRM - - ilvd = [Frame(0, 0, 1, 0, "%c%c" % (ord("a") + i, ord("a") + i + 1)) - for i in range(0, 8, 2)] - - self.enc_dec([Segment(False, False, 0, 0, 0, "abcdefgh")], frames, ilvd, max_payload=2) - -from qpid.ops import * - -class OpTest(Base): - - def enc_dec(self, ops): - enc = OpEncoder() - dec = OpDecoder() - enc.write(*ops) - segs = enc.read() - dec.write(*segs) - dops = dec.read() - assert len(ops) == len(dops) - for op1, op2 in zip(ops, dops): - self.cmp_ops(op1, op2) - - def testEmtpyMT(self): - self.enc_dec([MessageTransfer()]) - - def testEmptyMTSync(self): - self.enc_dec([MessageTransfer(sync=True)]) - - def testMT(self): - self.enc_dec([MessageTransfer(destination="asdf")]) - - def testSyncMT(self): - self.enc_dec([MessageTransfer(destination="asdf", sync=True)]) - - def testEmptyPayloadMT(self): - self.enc_dec([MessageTransfer(payload="")]) - - def testPayloadMT(self): - self.enc_dec([MessageTransfer(payload="test payload")]) - - def testHeadersEmptyPayloadMT(self): - self.enc_dec([MessageTransfer(headers=[DeliveryProperties()])]) - - def testHeadersPayloadMT(self): - self.enc_dec([MessageTransfer(headers=[DeliveryProperties()], payload="test payload")]) - - def testMultiHeadersEmptyPayloadMT(self): - self.enc_dec([MessageTransfer(headers=[DeliveryProperties(), MessageProperties()])]) - - def testMultiHeadersPayloadMT(self): - self.enc_dec([MessageTransfer(headers=[MessageProperties(), DeliveryProperties()], payload="test payload")]) - - def testContentTypeHeadersPayloadMT(self): - self.enc_dec([MessageTransfer(headers=[MessageProperties(content_type="text/plain")], payload="test payload")]) - - def testMulti(self): - self.enc_dec([MessageTransfer(), - MessageTransfer(sync=True), - MessageTransfer(destination="one"), - MessageTransfer(destination="two", sync=True), - MessageTransfer(destination="three", payload="test payload")]) - - def testControl(self): - self.enc_dec([SessionAttach(name="asdf")]) - - def testMixed(self): - self.enc_dec([SessionAttach(name="fdsa"), MessageTransfer(destination="test")]) - - def testChannel(self): - self.enc_dec([SessionAttach(name="asdf", channel=3), MessageTransfer(destination="test", channel=1)]) - - def testCompound(self): - self.enc_dec([MessageTransfer(headers=[MessageProperties(reply_to=ReplyTo(exchange="exch", routing_key="rk"))])]) - - def testListCompound(self): - self.enc_dec([ExecutionResult(value=RecoverResult(in_doubt=[Xid(global_id="one"), - Xid(global_id="two"), - Xid(global_id="three")]))]) diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py deleted file mode 100644 index 8f6680d5e3..0000000000 --- a/python/qpid/tests/messaging/__init__.py +++ /dev/null @@ -1,185 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time -from math import ceil -from qpid.harness import Skipped -from qpid.messaging import * -from qpid.tests import Test - -class Base(Test): - - def setup_connection(self): - return None - - def setup_session(self): - return None - - def setup_sender(self): - return None - - def setup_receiver(self): - return None - - def setup(self): - self.test_id = uuid4() - self.broker = self.config.broker - try: - self.conn = self.setup_connection() - except ConnectError, e: - raise Skipped(e) - self.ssn = self.setup_session() - self.snd = self.setup_sender() - if self.snd is not None: - self.snd.durable = self.durable() - self.rcv = self.setup_receiver() - - def teardown(self): - if self.conn is not None and self.conn.attached(): - self.teardown_connection(self.conn) - self.conn = None - - def teardown_connection(self, conn): - conn.close(timeout=self.timeout()) - - def content(self, base, count = None): - if count is None: - return "%s[%s]" % (base, self.test_id) - else: - return "%s[%s, %s]" % (base, count, self.test_id) - - def message(self, base, count = None, **kwargs): - return Message(content=self.content(base, count), **kwargs) - - def ping(self, ssn): - PING_Q = 'ping-queue; {create: always, delete: always}' - # send a message - sender = ssn.sender(PING_Q, durable=self.durable()) - content = self.content("ping") - sender.send(content) - receiver = ssn.receiver(PING_Q) - msg = receiver.fetch(0) - ssn.acknowledge() - assert msg.content == content, "expected %r, got %r" % (content, msg.content) - - def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False): - messages = [] - try: - while limit is None or len(messages) < limit: - messages.append(rcv.fetch(timeout=timeout)) - except Empty: - pass - if expected is not None: - self.assertEchos(expected, messages, redelivered) - return messages - - def diff(self, m1, m2, excluded_properties=()): - result = {} - for attr in ("id", "subject", "user_id", "reply_to", - "correlation_id", "durable", "priority", "ttl", - "redelivered", "content_type", "content"): - a1 = getattr(m1, attr) - a2 = getattr(m2, attr) - if a1 != a2: - result[attr] = (a1, a2) - p1 = dict(m1.properties) - p2 = dict(m2.properties) - for ep in excluded_properties: - p1.pop(ep, None) - p2.pop(ep, None) - if p1 != p2: - result["properties"] = (p1, p2) - return result - - def assertEcho(self, msg, echo, redelivered=False): - if not isinstance(msg, Message) or not isinstance(echo, Message): - if isinstance(msg, Message): - msg = msg.content - if isinstance(echo, Message): - echo = echo.content - assert msg == echo, "expected %s, got %s" % (msg, echo) - else: - delta = self.diff(msg, echo, ("x-amqp-0-10.routing-key",)) - mttl, ettl = delta.pop("ttl", (0, 0)) - if redelivered: - assert echo.redelivered, \ - "expected %s to be redelivered: %s" % (msg, echo) - if delta.has_key("redelivered"): - del delta["redelivered"] - assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl) - assert mttl >= ettl, "%s, %s" % (mttl, ettl) - assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta) - - def assertEchos(self, msgs, echoes, redelivered=False): - assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes) - for m, e in zip(msgs, echoes): - self.assertEcho(m, e, redelivered) - - def assertEmpty(self, rcv): - contents = self.drain(rcv) - assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - - def assertAvailable(self, rcv, expected=None, lower=None, upper=None): - if expected is not None: - if lower is not None or upper is not None: - raise ValueError("cannot specify lower or upper when specifying expected") - lower = expected - upper = expected - else: - if lower is None: - lower = int(ceil(rcv.threshold*rcv.capacity)) - if upper is None: - upper = rcv.capacity - - p = rcv.available() - if upper == lower: - assert p == lower, "expected %s, got %s" % (lower, p) - else: - assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper) - - def sleep(self): - time.sleep(self.delay()) - - def delay(self): - return float(self.config.defines.get("delay", "2")) - - def timeout(self): - return float(self.config.defines.get("timeout", "60")) - - def get_bool(self, name): - return self.config.defines.get(name, "false").lower() in ("true", "yes", "1") - - def durable(self): - return self.get_bool("durable") - - def reconnect(self): - return self.get_bool("reconnect") - - - def transport(self): - if self.broker.scheme == self.broker.AMQPS: - return "ssl" - else: - return "tcp" - - def connection_options(self): - return {"reconnect": self.reconnect(), - "transport": self.transport()} - -import address, endpoints, message diff --git a/python/qpid/tests/messaging/address.py b/python/qpid/tests/messaging/address.py deleted file mode 100644 index aa9562a717..0000000000 --- a/python/qpid/tests/messaging/address.py +++ /dev/null @@ -1,321 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -from qpid.tests import Test -from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, \ - SYM, WSPACE, LEXER -from qpid.lexer import Token -from qpid.harness import Skipped -from qpid.tests.parser import ParserBase - -def indent(st): - return " " + st.replace("\n", "\n ") - -def pprint_address(name, subject, options): - return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \ - (pprint(name), pprint(subject), pprint(options)) - -def pprint(o): - if isinstance(o, dict): - return pprint_map(o) - elif isinstance(o, list): - return pprint_list(o) - elif isinstance(o, basestring): - return pprint_string(o) - else: - return repr(o) - -def pprint_map(m): - items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()] - items.sort() - return pprint_items("{", items, "}") - -def pprint_list(l): - return pprint_items("[", [pprint(x) for x in l], "]") - -def pprint_items(start, items, end): - if items: - return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end) - else: - return "%s%s" % (start, end) - -def pprint_string(s): - result = "'" - for c in s: - if c == "'": - result += "\\'" - elif c == "\n": - result += "\\n" - elif ord(c) >= 0x80: - result += "\\u%04x" % ord(c) - else: - result += c - result += "'" - return result - -class AddressTests(ParserBase, Test): - - EXCLUDE = (WSPACE, EOF) - - def fields(self, line, n): - result = line.split(":", n - 1) - result.extend([None]*(n - len(result))) - return result - - def call(self, parser, mode, input): - try: - from subprocess import Popen, PIPE, STDOUT - po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT) - except ImportError, e: - raise Skipped("%s" % e) - except OSError, e: - raise Skipped("%s: %s" % (e, parser)) - out, _ = po.communicate(input=input) - return out - - def parser(self): - return self.config.defines.get("address.parser") - - def do_lex(self, st): - parser = self.parser() - if parser: - out = self.call(parser, "lex", st) - lines = out.split("\n") - toks = [] - for line in lines: - if line.strip(): - name, position, value = self.fields(line, 3) - toks.append(Token(LEXER.type(name), value, position, st)) - return toks - else: - return lex(st) - - def do_parse(self, st): - return parse(st) - - def valid(self, addr, name=None, subject=None, options=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "%s\n" % pprint_address(name, subject, options) - assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got) - else: - ParserBase.valid(self, addr, (name, subject, options)) - - def invalid(self, addr, error=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "ERROR: %s\n" % error - assert expected == got, "expected %r, got %r" % (expected, got) - else: - ParserBase.invalid(self, addr, error) - - def testDashInId1(self): - self.lex("foo-bar", ID) - - def testDashInId2(self): - self.lex("foo-3", ID) - - def testDashAlone1(self): - self.lex("foo - bar", ID, SYM, ID) - - def testDashAlone2(self): - self.lex("foo - 3", ID, SYM, NUMBER) - - def testLeadingDash(self): - self.lex("-foo", SYM, ID) - - def testTrailingDash(self): - self.lex("foo-", ID, SYM) - - def testNegativeNum(self): - self.lex("-3", NUMBER) - - def testIdNum(self): - self.lex("id1", ID) - - def testIdSpaceNum(self): - self.lex("id 1", ID, NUMBER) - - def testHash(self): - self.valid("foo/bar.#", "foo", "bar.#") - - def testStar(self): - self.valid("foo/bar.*", "foo", "bar.*") - - def testColon(self): - self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf") - - def testOptions(self): - self.valid("foo.bar/baz.qux:moo:arf; {key: value}", - "foo.bar", "baz.qux:moo:arf", {"key": "value"}) - - def testOptionsTrailingComma(self): - self.valid("name/subject; {key: value,}", "name", "subject", - {"key": "value"}) - - def testOptionsNone(self): - self.valid("name/subject; {key: None}", "name", "subject", - {"key": None}) - - def testSemiSubject(self): - self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}", - "foo.bar", "baz.qux;moo:arf", {"key": "value"}) - - def testCommaSubject(self): - self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}") - - def testCommaSubjectOptions(self): - self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar", - "baz.qux.{moo,arf}", {"key": "value"}) - - def testUnbalanced(self): - self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar", - "baz.qux.{moo,arf", {"key": "value"}) - - def testSlashQuote(self): - self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}", - "foo.bar/baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc1(self): - self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}", - "foo.bar\x00baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc2(self): - self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}", - "foo.bar\xffbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc3(self): - self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}", - "foo.bar\xFFbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashUnicode1(self): - self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}", - u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode2(self): - self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}", - u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode3(self): - self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode4(self): - self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"}) - - def testNoName(self): - self.invalid("; {key: value}", - "unexpected token SEMI(;) line:1,0:; {key: value}") - - def testEmpty(self): - self.invalid("", "unexpected token EOF line:1,0:") - - def testNoNameSlash(self): - self.invalid("/asdf; {key: value}", - "unexpected token SLASH(/) line:1,0:/asdf; {key: value}") - - def testBadOptions1(self): - self.invalid("name/subject; {", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), " - "got EOF line:1,15:name/subject; {") - - def testBadOptions2(self): - self.invalid("name/subject; { 3", - "expecting COLON, got EOF " - "line:1,17:name/subject; { 3") - - def testBadOptions3(self): - self.invalid("name/subject; { key:", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF " - "line:1,20:name/subject; { key:") - - def testBadOptions4(self): - self.invalid("name/subject; { key: value", - "expecting (COMMA, RBRACE), got EOF " - "line:1,26:name/subject; { key: value") - - def testBadOptions5(self): - self.invalid("name/subject; { key: value asdf", - "expecting (COMMA, RBRACE), got ID(asdf) " - "line:1,27:name/subject; { key: value asdf") - - def testBadOptions6(self): - self.invalid("name/subject; { key: value,", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF " - "line:1,27:name/subject; { key: value,") - - def testBadOptions7(self): - self.invalid("name/subject; { key: value } asdf", - "expecting EOF, got ID(asdf) " - "line:1,29:name/subject; { key: value } asdf") - - def testList1(self): - self.valid("name/subject; { key: [] }", "name", "subject", {"key": []}) - - def testList2(self): - self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']}) - - def testList3(self): - self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject", - {"key": [1, 2, 3]}) - - def testList4(self): - self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject", - {"key": [1, [2, 3], 4]}) - - def testBadList1(self): - self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), " - "got RBRACE(}) line:1,23:name/subject; { key: [ }") - - def testBadList2(self): - self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), " - "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }") - - def testBadList3(self): - self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }") - - def testBadList4(self): - self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }") - - def testMap1(self): - self.valid("name/subject; { 'key': value }", - "name", "subject", {"key": "value"}) - - def testMap2(self): - self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"}) - - def testMap3(self): - self.valid('name/subject; { "foo.bar": value }', - "name", "subject", {"foo.bar": "value"}) - - def testBoolean(self): - self.valid("name/subject; { true1: True, true2: true, " - "false1: False, false2: false }", - "name", "subject", {"true1": True, "true2": True, - "false1": False, "false2": False}) diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py deleted file mode 100644 index db5ec03df2..0000000000 --- a/python/qpid/tests/messaging/endpoints.py +++ /dev/null @@ -1,1335 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# setup, usage, teardown, errors(sync), errors(async), stress, soak, -# boundary-conditions, config - -import errno, os, socket, sys, time -from qpid import compat -from qpid.compat import set -from qpid.messaging import * -from qpid.messaging.transports import TRANSPORTS -from qpid.tests.messaging import Base -from threading import Thread - -class SetupTests(Base): - - def testEstablish(self): - self.conn = Connection.establish(self.broker, **self.connection_options()) - self.ping(self.conn.session()) - - def testOpen(self): - self.conn = Connection(self.broker, **self.connection_options()) - self.conn.open() - self.ping(self.conn.session()) - - def testOpenReconnectURLs(self): - options = self.connection_options() - options["reconnect_urls"] = [self.broker, self.broker] - self.conn = Connection(self.broker, **options) - self.conn.open() - self.ping(self.conn.session()) - - def testTcpNodelay(self): - self.conn = Connection.establish(self.broker, tcp_nodelay=True) - assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) - - def testConnectError(self): - try: - # Specifying port 0 yields a bad address on Windows; port 4 is unassigned - self.conn = Connection.establish("localhost:4") - assert False, "connect succeeded" - except ConnectError, e: - assert "refused" in str(e) - - def testGetError(self): - self.conn = Connection("localhost:0") - try: - self.conn.open() - assert False, "connect succeeded" - except ConnectError, e: - assert self.conn.get_error() == e - - def use_fds(self): - fds = [] - try: - while True: - fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY)) - except OSError, e: - if e.errno != errno.EMFILE: - raise e - else: - return fds - - def testOpenCloseResourceLeaks(self): - fds = self.use_fds() - try: - for i in range(32): - if fds: os.close(fds.pop()) - for i in xrange(64): - conn = Connection.establish(self.broker, **self.connection_options()) - conn.close() - finally: - while fds: - os.close(fds.pop()) - - def testOpenFailResourceLeaks(self): - fds = self.use_fds() - try: - for i in range(32): - if fds: os.close(fds.pop()) - for i in xrange(64): - conn = Connection("localhost:0", **self.connection_options()) - # XXX: we need to force a waiter to be created for this test - # to work - conn._lock.acquire() - conn._wait(lambda: False, timeout=0.001) - conn._lock.release() - try: - conn.open() - except ConnectError, e: - pass - finally: - while fds: - os.close(fds.pop()) - - def testReconnect(self): - options = self.connection_options() - real = TRANSPORTS["tcp"] - - class flaky: - - def __init__(self, conn, host, port): - self.real = real(conn, host, port) - self.sent_count = 0 - self.recv_count = 0 - - def fileno(self): - return self.real.fileno() - - def reading(self, reading): - return self.real.reading(reading) - - def writing(self, writing): - return self.real.writing(writing) - - def send(self, bytes): - if self.sent_count > 2048: - raise socket.error("fake error") - n = self.real.send(bytes) - self.sent_count += n - return n - - def recv(self, n): - if self.recv_count > 2048: - return "" - bytes = self.real.recv(n) - self.recv_count += len(bytes) - return bytes - - def close(self): - self.real.close() - - TRANSPORTS["flaky"] = flaky - - options["reconnect"] = True - options["reconnect_interval"] = 0 - options["reconnect_limit"] = 100 - options["reconnect_log"] = False - options["transport"] = "flaky" - - self.conn = Connection.establish(self.broker, **options) - ssn = self.conn.session() - snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") - rcv = ssn.receiver(snd.target) - - msgs = [self.message("testReconnect", i) for i in range(20)] - for m in msgs: - snd.send(m) - - content = set() - drained = [] - duplicates = [] - try: - while True: - m = rcv.fetch(timeout=0) - if m.content not in content: - content.add(m.content) - drained.append(m) - else: - duplicates.append(m) - ssn.acknowledge(m) - except Empty: - pass - # XXX: apparently we don't always get duplicates, should figure out why - #assert duplicates, "no duplicates" - assert len(drained) == len(msgs) - for m, d in zip(msgs, drained): - # XXX: we should figure out how to provide proper end to end - # redelivered - self.assertEcho(m, d, d.redelivered) - -class ConnectionTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def testCheckClosed(self): - assert not self.conn.check_closed() - - def testSessionAnon(self): - ssn1 = self.conn.session() - ssn2 = self.conn.session() - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - - def testSessionNamed(self): - ssn1 = self.conn.session("one") - ssn2 = self.conn.session("two") - self.ping(ssn1) - self.ping(ssn2) - assert ssn1 is not ssn2 - assert ssn1 is self.conn.session("one") - assert ssn2 is self.conn.session("two") - - def testDetach(self): - ssn = self.conn.session() - self.ping(ssn) - self.conn.detach() - try: - self.ping(ssn) - assert False, "ping succeeded" - except Detached: - # this is the expected failure when pinging on a detached - # connection - pass - self.conn.attach() - self.ping(ssn) - - def testClose(self): - self.conn.close() - assert not self.conn.attached() - - def testSimultaneousClose(self): - ssns = [self.conn.session() for i in range(3)] - for s in ssns: - for i in range(3): - s.receiver("amq.topic") - s.sender("amq.topic") - - def closer(errors): - try: - self.conn.close() - except: - _, e, _ = sys.exc_info() - errors.append(compat.format_exc(e)) - - t1_errors = [] - t2_errors = [] - t1 = Thread(target=lambda: closer(t1_errors)) - t2 = Thread(target=lambda: closer(t2_errors)) - t1.start() - t2.start() - t1.join(self.delay()) - t2.join(self.delay()) - - assert not t1_errors, t1_errors[0] - assert not t2_errors, t2_errors[0] - -class hangable: - - def __init__(self, conn, host, port): - self.tcp = TRANSPORTS["tcp"](conn, host, port) - self.hung = False - - def hang(self): - self.hung = True - - def fileno(self): - return self.tcp.fileno() - - def reading(self, reading): - if self.hung: - return True - else: - return self.tcp.reading(reading) - - def writing(self, writing): - if self.hung: - return False - else: - return self.tcp.writing(writing) - - def send(self, bytes): - if self.hung: - return 0 - else: - return self.tcp.send(bytes) - - def recv(self, n): - if self.hung: - return "" - else: - return self.tcp.recv(n) - - def close(self): - self.tcp.close() - -TRANSPORTS["hangable"] = hangable - -class TimeoutTests(Base): - - def setup_connection(self): - options = self.connection_options() - options["transport"] = "hangable" - return Connection.establish(self.broker, **options) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender("amq.topic") - - def setup_receiver(self): - return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}") - - def teardown_connection(self, conn): - try: - conn.detach(timeout=0) - except Timeout: - pass - - def hang(self): - self.conn._driver._transport.hang() - - def timeoutTest(self, method): - self.hang() - try: - method(timeout=self.delay()) - assert False, "did not time out" - except Timeout: - pass - - def testSenderSync(self): - self.snd.send(self.content("testSenderSync"), sync=False) - self.timeoutTest(self.snd.sync) - - def testSenderClose(self): - self.snd.send(self.content("testSenderClose"), sync=False) - self.timeoutTest(self.snd.close) - - def testReceiverClose(self): - self.timeoutTest(self.rcv.close) - - def testSessionSync(self): - self.snd.send(self.content("testSessionSync"), sync=False) - self.timeoutTest(self.ssn.sync) - - def testSessionClose(self): - self.timeoutTest(self.ssn.close) - - def testConnectionDetach(self): - self.timeoutTest(self.conn.detach) - - def testConnectionClose(self): - self.timeoutTest(self.conn.close) - -ACK_QC = 'test-ack-queue; {create: always}' -ACK_QD = 'test-ack-queue; {delete: always}' - -class SessionTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def testSender(self): - snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', - durable=self.durable()) - snd2 = self.ssn.sender(snd.target, durable=self.durable()) - assert snd is not snd2 - snd2.close() - - content = self.content("testSender") - snd.send(content) - rcv = self.ssn.receiver(snd.target) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - - def testReceiver(self): - rcv = self.ssn.receiver('test-rcv-queue; {create: always}') - rcv2 = self.ssn.receiver(rcv.source) - assert rcv is not rcv2 - rcv2.close() - - content = self.content("testReceiver") - snd = self.ssn.sender(rcv.source, durable=self.durable()) - snd.send(content) - msg = rcv.fetch(0) - assert msg.content == content - self.ssn.acknowledge(msg) - snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') - - def testDetachedReceiver(self): - self.conn.detach() - rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") - m = self.content("testDetachedReceiver") - self.conn.attach() - snd = self.ssn.sender("test-dis-rcv-queue") - snd.send(m) - self.drain(rcv, expected=[m]) - - def testNextReceiver(self): - ADDR = 'test-next-rcv-queue; {create: always, delete: always}' - rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - - snd = self.ssn.sender(ADDR) - - msgs = [] - for i in range(10): - content = self.content("testNextReceiver", i) - snd.send(content) - msgs.append(content) - - fetched = [] - try: - while True: - rcv = self.ssn.next_receiver(timeout=self.delay()) - assert rcv in (rcv1, rcv2, rcv3) - assert rcv.available() > 0 - fetched.append(rcv.fetch().content) - except Empty: - pass - assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) - self.ssn.acknowledge() - #we set the capacity to 0 to prevent the deletion of the queue - - #triggered the deletion policy when the first receiver is closed - - #resulting in session exceptions being issued for the remaining - #active subscriptions: - for r in [rcv1, rcv2, rcv3]: - r.capacity = 0 - - # XXX, we need a convenient way to assert that required queues are - # empty on setup, and possibly also to drain queues on teardown - def ackTest(self, acker, ack_capacity=None): - # send a bunch of messages - snd = self.ssn.sender(ACK_QC, durable=self.durable()) - contents = [self.content("ackTest", i) for i in range(15)] - for c in contents: - snd.send(c) - - # drain the queue, verify the messages are there and then close - # without acking - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - self.ssn.close() - - # drain the queue again, verify that they are all the messages - # were requeued, and ack this time before closing - self.ssn = self.conn.session() - if ack_capacity is not None: - self.ssn.ack_capacity = ack_capacity - rcv = self.ssn.receiver(ACK_QC) - self.drain(rcv, expected=contents) - acker(self.ssn) - self.ssn.close() - - # drain the queue a final time and verify that the messages were - # dequeued - self.ssn = self.conn.session() - rcv = self.ssn.receiver(ACK_QD) - self.assertEmpty(rcv) - - def testAcknowledge(self): - self.ackTest(lambda ssn: ssn.acknowledge()) - - def testAcknowledgeAsync(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) - - def testAcknowledgeAsyncAckCap0(self): - try: - try: - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) - assert False, "acknowledge shouldn't succeed with ack_capacity of zero" - except InsufficientCapacity: - pass - finally: - self.ssn.ack_capacity = UNLIMITED - self.drain(self.ssn.receiver(ACK_QD)) - self.ssn.acknowledge() - - def testAcknowledgeAsyncAckCap1(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) - - def testAcknowledgeAsyncAckCap5(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) - - def testAcknowledgeAsyncAckCapUNLIMITED(self): - self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - - def testRelease(self): - msgs = [self.message("testRelease", i) for i in range(3)] - snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") - for m in msgs: - snd.send(m) - rcv = self.ssn.receiver(snd.target) - echos = self.drain(rcv, expected=msgs) - self.ssn.acknowledge(echos[0]) - self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) - self.ssn.acknowledge(echos[2], Disposition(RELEASED)) - self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) - self.drain(rcv, expected=msgs[2:3]) - self.ssn.acknowledge() - - def testReject(self): - msgs = [self.message("testReject", i) for i in range(3)] - snd = self.ssn.sender(""" - test-reject-queue; { - create: always, - delete: always, - node: { - x-declare: { - alternate-exchange: 'amq.topic' - } - } - } -""") - for m in msgs: - snd.send(m) - rcv = self.ssn.receiver(snd.target) - rej = self.ssn.receiver("amq.topic") - echos = self.drain(rcv, expected=msgs) - self.ssn.acknowledge(echos[0]) - self.ssn.acknowledge(echos[1], Disposition(REJECTED)) - self.ssn.acknowledge(echos[2], - Disposition(REJECTED, code=3, text="test-reject")) - self.drain(rej, expected=msgs[1:]) - self.ssn.acknowledge() - - def send(self, ssn, target, base, count=1): - snd = ssn.sender(target, durable=self.durable()) - messages = [] - for i in range(count): - c = self.message(base, i) - snd.send(c) - messages.append(c) - snd.close() - return messages - - def txTest(self, commit): - TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' - TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' - txssn = self.conn.session(transactional=True) - messages = self.send(self.ssn, TX_Q, "txTest", 3) - txrcv = txssn.receiver(TX_Q) - txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) - rcv = self.ssn.receiver(txrcv.source) - copy_rcv = self.ssn.receiver(txsnd.target) - self.assertEmpty(copy_rcv) - for i in range(3): - m = txrcv.fetch(0) - txsnd.send(m) - self.assertEmpty(copy_rcv) - txssn.acknowledge() - if commit: - txssn.commit() - self.assertEmpty(rcv) - self.drain(copy_rcv, expected=messages) - else: - txssn.rollback() - self.drain(rcv, expected=messages, redelivered=True) - self.assertEmpty(copy_rcv) - self.ssn.acknowledge() - - def testCommit(self): - self.txTest(True) - - def testRollback(self): - self.txTest(False) - - def txTestSend(self, commit): - TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' - txssn = self.conn.session(transactional=True) - messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) - rcv = self.ssn.receiver(TX_SEND_Q) - self.assertEmpty(rcv) - - if commit: - txssn.commit() - self.drain(rcv, expected=messages) - self.ssn.acknowledge() - else: - txssn.rollback() - self.assertEmpty(rcv) - txssn.commit() - self.assertEmpty(rcv) - - def testCommitSend(self): - self.txTestSend(True) - - def testRollbackSend(self): - self.txTestSend(False) - - def txTestAck(self, commit): - TX_ACK_QC = 'test-tx-ack-queue; {create: always}' - TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - self.assertEmpty(txrcv) - messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - self.drain(txrcv, expected=messages) - - if commit: - txssn.acknowledge() - else: - txssn.rollback() - self.drain(txrcv, expected=messages, redelivered=True) - txssn.acknowledge() - txssn.rollback() - self.drain(txrcv, expected=messages, redelivered=True) - txssn.commit() # commit without ack - self.assertEmpty(txrcv) - - txssn.close() - - txssn = self.conn.session(transactional=True) - txrcv = txssn.receiver(TX_ACK_QC) - self.drain(txrcv, expected=messages, redelivered=True) - txssn.acknowledge() - txssn.commit() - rcv = self.ssn.receiver(TX_ACK_QD) - self.assertEmpty(rcv) - txssn.close() - self.assertEmpty(rcv) - - def testCommitAck(self): - self.txTestAck(True) - - def testRollbackAck(self): - self.txTestAck(False) - - def testDoubleCommit(self): - ssn = self.conn.session(transactional=True) - snd = ssn.sender("amq.direct") - rcv = ssn.receiver("amq.direct") - msgs = [self.message("testDoubleCommit", i) for i in range(3)] - for m in msgs: - snd.send(m) - ssn.commit() - self.drain(rcv, expected=msgs) - ssn.acknowledge() - ssn.commit() - - def testClose(self): - self.ssn.close() - try: - self.ping(self.ssn) - assert False, "ping succeeded" - except Detached: - pass - -RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' - -class ReceiverTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(RECEIVER_Q) - - def setup_receiver(self): - return self.ssn.receiver(RECEIVER_Q) - - def send(self, base, count = None, sync=True): - content = self.content(base, count) - self.snd.send(content, sync=sync) - return content - - def testFetch(self): - try: - msg = self.rcv.fetch(0) - assert False, "unexpected message: %s" % msg - except Empty: - pass - try: - start = time.time() - msg = self.rcv.fetch(self.delay()) - assert False, "unexpected message: %s" % msg - except Empty: - elapsed = time.time() - start - assert elapsed >= self.delay() - - one = self.send("testFetch", 1) - two = self.send("testFetch", 2) - three = self.send("testFetch", 3) - msg = self.rcv.fetch(0) - assert msg.content == one - msg = self.rcv.fetch(self.delay()) - assert msg.content == two - msg = self.rcv.fetch() - assert msg.content == three - self.ssn.acknowledge() - - def fetchFromClosedTest(self, entry): - entry.close() - try: - msg = self.rcv.fetch(0) - assert False, "unexpected result: %s" % msg - except Empty, e: - assert False, "unexpected exception: %s" % e - except LinkClosed, e: - pass - - def testFetchFromClosedReceiver(self): - self.fetchFromClosedTest(self.rcv) - - def testFetchFromClosedSession(self): - self.fetchFromClosedTest(self.ssn) - - def testFetchFromClosedConnection(self): - self.fetchFromClosedTest(self.conn) - - def fetchFromConcurrentCloseTest(self, entry): - def closer(): - self.sleep() - entry.close() - t = Thread(target=closer) - t.start() - try: - msg = self.rcv.fetch() - assert False, "unexpected result: %s" % msg - except Empty, e: - assert False, "unexpected exception: %s" % e - except LinkClosed, e: - pass - t.join() - - def testFetchFromConcurrentCloseReceiver(self): - self.fetchFromConcurrentCloseTest(self.rcv) - - def testFetchFromConcurrentCloseSession(self): - self.fetchFromConcurrentCloseTest(self.ssn) - - def testFetchFromConcurrentCloseConnection(self): - self.fetchFromConcurrentCloseTest(self.conn) - - def testCapacityIncrease(self): - content = self.send("testCapacityIncrease") - self.sleep() - assert self.rcv.available() == 0 - self.rcv.capacity = UNLIMITED - self.sleep() - assert self.rcv.available() == 1 - msg = self.rcv.fetch(0) - assert msg.content == content - assert self.rcv.available() == 0 - self.ssn.acknowledge() - - def testCapacityDecrease(self): - self.rcv.capacity = UNLIMITED - one = self.send("testCapacityDecrease", 1) - self.sleep() - assert self.rcv.available() == 1 - msg = self.rcv.fetch(0) - assert msg.content == one - - self.rcv.capacity = 0 - - two = self.send("testCapacityDecrease", 2) - self.sleep() - assert self.rcv.available() == 0 - msg = self.rcv.fetch(0) - assert msg.content == two - - self.ssn.acknowledge() - - def capacityTest(self, capacity, threshold=None): - if threshold is not None: - self.rcv.threshold = threshold - self.rcv.capacity = capacity - self.assertAvailable(self.rcv, 0) - - for i in range(2*capacity): - self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False) - self.snd.sync() - self.sleep() - self.assertAvailable(self.rcv) - - first = capacity/2 - second = capacity - first - self.drain(self.rcv, limit = first) - self.sleep() - self.assertAvailable(self.rcv) - self.drain(self.rcv, limit = second) - self.sleep() - self.assertAvailable(self.rcv) - - drained = self.drain(self.rcv) - assert len(drained) == capacity, "%s, %s" % (len(drained), drained) - self.assertAvailable(self.rcv, 0) - - self.ssn.acknowledge() - - def testCapacity5(self): - self.capacityTest(5) - - def testCapacity5Threshold1(self): - self.capacityTest(5, 1) - - def testCapacity10(self): - self.capacityTest(10) - - def testCapacity10Threshold1(self): - self.capacityTest(10, 1) - - def testCapacity100(self): - self.capacityTest(100) - - def testCapacity100Threshold1(self): - self.capacityTest(100, 1) - - def testCapacityUNLIMITED(self): - self.rcv.capacity = UNLIMITED - self.assertAvailable(self.rcv, 0) - - for i in range(10): - self.send("testCapacityUNLIMITED", i) - self.sleep() - self.assertAvailable(self.rcv, 10) - - self.drain(self.rcv) - self.assertAvailable(self.rcv, 0) - - self.ssn.acknowledge() - - def testAvailable(self): - self.rcv.capacity = UNLIMITED - assert self.rcv.available() == 0 - - for i in range(3): - self.send("testAvailable", i) - self.sleep() - assert self.rcv.available() == 3 - - for i in range(3, 10): - self.send("testAvailable", i) - self.sleep() - assert self.rcv.available() == 10 - - self.drain(self.rcv, limit=3) - assert self.rcv.available() == 7 - - self.drain(self.rcv) - assert self.rcv.available() == 0 - - self.ssn.acknowledge() - - def testDoubleClose(self): - m1 = self.content("testDoubleClose", 1) - m2 = self.content("testDoubleClose", 2) - - snd = self.ssn.sender("""test-double-close; { - create: always, - delete: sender, - node: { - type: topic - } -} -""") - r1 = self.ssn.receiver(snd.target) - r2 = self.ssn.receiver(snd.target) - snd.send(m1) - self.drain(r1, expected=[m1]) - self.drain(r2, expected=[m1]) - r1.close() - snd.send(m2) - self.drain(r2, expected=[m2]) - r2.close() - - # XXX: need testClose - - def testMode(self): - msgs = [self.content("testMode", 1), - self.content("testMode", 2), - self.content("testMode", 3)] - - for m in msgs: - self.snd.send(m) - - rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') - rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') - self.drain(rb, expected=msgs) - self.drain(rc, expected=msgs) - rb2 = self.ssn.receiver(rb.source) - self.assertEmpty(rb2) - self.drain(self.rcv, expected=[]) - - # XXX: need testUnsettled() - - def unreliabilityTest(self, mode="unreliable"): - msgs = [self.message("testUnreliable", i) for i in range(3)] - snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}") - rcv = self.ssn.receiver(snd.target) - for m in msgs: - snd.send(m) - - # close without ack on reliable receiver, messages should be requeued - ssn = self.conn.session() - rrcv = ssn.receiver("test-unreliability-queue") - self.drain(rrcv, expected=msgs) - ssn.close() - - # close without ack on unreliable receiver, messages should not be requeued - ssn = self.conn.session() - urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode) - self.drain(urcv, expected=msgs, redelivered=True) - ssn.close() - - self.assertEmpty(rcv) - - def testUnreliable(self): - self.unreliabilityTest(mode="unreliable") - - def testAtMostOnce(self): - self.unreliabilityTest(mode="at-most-once") - -class AddressTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def badOption(self, options, error): - try: - self.ssn.sender("test-bad-options-snd; %s" % options) - assert False - except InvalidOption, e: - assert "error in options: %s" % error == str(e), e - - try: - self.ssn.receiver("test-bad-options-rcv; %s" % options) - assert False - except InvalidOption, e: - assert "error in options: %s" % error == str(e), e - - def testIllegalKey(self): - self.badOption("{create: always, node: " - "{this-property-does-not-exist: 3}}", - "node: this-property-does-not-exist: " - "illegal key") - - def testWrongValue(self): - self.badOption("{create: asdf}", "create: asdf not in " - "('always', 'sender', 'receiver', 'never')") - - def testWrongType1(self): - self.badOption("{node: asdf}", - "node: asdf is not a map") - - def testWrongType2(self): - self.badOption("{node: {durable: []}}", - "node: durable: [] is not a bool") - - def testCreateQueue(self): - snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " - "node: {type: queue, durable: False, " - "x-declare: {auto_delete: true}}}") - content = self.content("testCreateQueue") - snd.send(content) - rcv = self.ssn.receiver("test-create-queue") - self.drain(rcv, expected=[content]) - - def createExchangeTest(self, props=""): - addr = """test-create-exchange; { - create: always, - delete: always, - node: { - type: topic, - durable: False, - x-declare: {auto_delete: true, %s} - } - }""" % props - snd = self.ssn.sender(addr) - snd.send("ping") - rcv1 = self.ssn.receiver("test-create-exchange/first") - rcv2 = self.ssn.receiver("test-create-exchange/first") - rcv3 = self.ssn.receiver("test-create-exchange/second") - for r in (rcv1, rcv2, rcv3): - try: - r.fetch(0) - assert False - except Empty: - pass - msg1 = Message(self.content("testCreateExchange", 1), subject="first") - msg2 = Message(self.content("testCreateExchange", 2), subject="second") - snd.send(msg1) - snd.send(msg2) - self.drain(rcv1, expected=[msg1.content]) - self.drain(rcv2, expected=[msg1.content]) - self.drain(rcv3, expected=[msg2.content]) - - def testCreateExchange(self): - self.createExchangeTest() - - def testCreateExchangeDirect(self): - self.createExchangeTest("type: direct") - - def testCreateExchangeTopic(self): - self.createExchangeTest("type: topic") - - def testDeleteBySender(self): - snd = self.ssn.sender("test-delete; {create: always}") - snd.send("ping") - snd.close() - snd = self.ssn.sender("test-delete; {delete: always}") - snd.send("ping") - snd.close() - try: - self.ssn.sender("test-delete") - except NotFound, e: - assert "no such queue" in str(e) - - def testDeleteByReceiver(self): - rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") - try: - rcv.fetch(0) - except Empty: - pass - rcv.close() - - try: - self.ssn.receiver("test-delete") - assert False - except NotFound, e: - assert "no such queue" in str(e) - - def testDeleteSpecial(self): - snd = self.ssn.sender("amq.topic; {delete: always}") - snd.send("asdf") - try: - snd.close() - assert False, "successfully deleted amq.topic" - except SessionError, e: - assert "Cannot delete default exchange" in str(e) - # XXX: need to figure out close after error - self.conn._remove_session(self.ssn) - - def testNodeBindingsQueue(self): - snd = self.ssn.sender(""" -test-node-bindings-queue; { - create: always, - delete: always, - node: { - x-bindings: [{exchange: "amq.topic", key: "a.#"}, - {exchange: "amq.direct", key: "b"}, - {exchange: "amq.topic", key: "c.*"}] - } -} -""") - snd.send("one") - snd_a = self.ssn.sender("amq.topic/a.foo") - snd_b = self.ssn.sender("amq.direct/b") - snd_c = self.ssn.sender("amq.topic/c.bar") - snd_a.send("two") - snd_b.send("three") - snd_c.send("four") - rcv = self.ssn.receiver("test-node-bindings-queue") - self.drain(rcv, expected=["one", "two", "three", "four"]) - - def testNodeBindingsTopic(self): - rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") - rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") - rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") - rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") - snd = self.ssn.sender(""" -test-node-bindings-topic; { - create: always, - delete: always, - node: { - type: topic, - x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, - {queue: test-node-bindings-topic-queue-a, key: "a.#"}, - {queue: test-node-bindings-topic-queue-b, key: "b"}, - {queue: test-node-bindings-topic-queue-c, key: "c.*"}] - } -} -""") - m1 = Message("one") - m2 = Message(subject="a.foo", content="two") - m3 = Message(subject="b", content="three") - m4 = Message(subject="c.bar", content="four") - snd.send(m1) - snd.send(m2) - snd.send(m3) - snd.send(m4) - self.drain(rcv, expected=[m1, m2, m3, m4]) - self.drain(rcv_a, expected=[m2]) - self.drain(rcv_b, expected=[m3]) - self.drain(rcv_c, expected=[m4]) - - def testLinkBindings(self): - m_a = self.message("testLinkBindings", 1, subject="a") - m_b = self.message("testLinkBindings", 2, subject="b") - - self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") - snd = self.ssn.sender("amq.topic") - - snd.send(m_a) - snd.send(m_b) - snd.close() - - rcv = self.ssn.receiver("test-link-bindings-queue") - self.assertEmpty(rcv) - - snd = self.ssn.sender(""" -amq.topic; { - link: { - x-bindings: [{queue: test-link-bindings-queue, key: a}] - } -} -""") - - snd.send(m_a) - snd.send(m_b) - - self.drain(rcv, expected=[m_a]) - rcv.close() - - rcv = self.ssn.receiver(""" -test-link-bindings-queue; { - link: { - x-bindings: [{exchange: "amq.topic", key: b}] - } -} -""") - - snd.send(m_a) - snd.send(m_b) - - self.drain(rcv, expected=[m_a, m_b]) - - def testSubjectOverride(self): - snd = self.ssn.sender("amq.topic/a") - rcv_a = self.ssn.receiver("amq.topic/a") - rcv_b = self.ssn.receiver("amq.topic/b") - m1 = self.content("testSubjectOverride", 1) - m2 = self.content("testSubjectOverride", 2) - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - self.drain(rcv_a, expected=[m1]) - self.drain(rcv_b, expected=[m2]) - - def testSubjectDefault(self): - m1 = self.content("testSubjectDefault", 1) - m2 = self.content("testSubjectDefault", 2) - snd = self.ssn.sender("amq.topic/a") - rcv = self.ssn.receiver("amq.topic") - snd.send(m1) - snd.send(Message(subject="b", content=m2)) - e1 = rcv.fetch(timeout=0) - e2 = rcv.fetch(timeout=0) - assert e1.subject == "a", "subject: %s" % e1.subject - assert e2.subject == "b", "subject: %s" % e2.subject - self.assertEmpty(rcv) - - def doReliabilityTest(self, reliability, messages, expected): - snd = self.ssn.sender("amq.topic") - rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) - for m in messages: - snd.send(m) - self.conn.detach() - self.conn.attach() - self.drain(rcv, expected=expected) - - def testReliabilityUnreliable(self): - msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] - self.doReliabilityTest("unreliable", msgs, []) - - def testReliabilityAtLeastOnce(self): - msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] - self.doReliabilityTest("at-least-once", msgs, msgs) - - def testLinkName(self): - msgs = [self.message("testLinkName", i) for i in range(3)] - snd = self.ssn.sender("amq.topic") - trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") - qrcv = self.ssn.receiver("test-link-name") - for m in msgs: - snd.send(m) - self.drain(qrcv, expected=msgs) - - def testAssert1(self): - try: - snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") - assert 0, "assertion failed to trigger" - except AssertionFailed, e: - pass - - def testAssert2(self): - snd = self.ssn.sender("amq.topic; {assert: always}") - -NOSUCH_Q = "this-queue-should-not-exist" -UNPARSEABLE_ADDR = "name/subject; {bad options" -UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" - -class AddressErrorTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def senderErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.sender(addr, durable=self.durable()) - assert False, "sender creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def receiverErrorTest(self, addr, exc, check=lambda e: True): - try: - self.ssn.receiver(addr) - assert False, "receiver creation succeeded" - except exc, e: - assert check(e), "unexpected error: %s" % compat.format_exc(e) - - def testNoneTarget(self): - self.senderErrorTest(None, MalformedAddress) - - def testNoneSource(self): - self.receiverErrorTest(None, MalformedAddress) - - def testNoTarget(self): - self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) - - def testNoSource(self): - self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) - - def testUnparseableTarget(self): - self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress, - lambda e: "expecting COLON" in str(e)) - - def testUnparseableSource(self): - self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress, - lambda e: "expecting COLON" in str(e)) - - def testUnlexableTarget(self): - self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress, - lambda e: "unrecognized characters" in str(e)) - - def testUnlexableSource(self): - self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress, - lambda e: "unrecognized characters" in str(e)) - - def testInvalidMode(self): - self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', - InvalidOption, - lambda e: "not in ('browse', 'consume')" in str(e)) - -SENDER_Q = 'test-sender-q; {create: always, delete: always}' - -class SenderTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(SENDER_Q) - - def setup_receiver(self): - return self.ssn.receiver(SENDER_Q) - - def checkContent(self, content): - self.snd.send(content) - msg = self.rcv.fetch(0) - assert msg.content == content - - out = Message(content) - self.snd.send(out) - echo = self.rcv.fetch(0) - assert out.content == echo.content - assert echo.content == msg.content - self.ssn.acknowledge() - - def testSendString(self): - self.checkContent(self.content("testSendString")) - - def testSendList(self): - self.checkContent(["testSendList", 1, 3.14, self.test_id]) - - def testSendMap(self): - self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) - - def asyncTest(self, capacity): - self.snd.capacity = capacity - msgs = [self.content("asyncTest", i) for i in range(15)] - for m in msgs: - self.snd.send(m, sync=False) - self.drain(self.rcv, timeout=self.delay(), expected=msgs) - self.ssn.acknowledge() - - def testSendAsyncCapacity0(self): - try: - self.asyncTest(0) - assert False, "send shouldn't succeed with zero capacity" - except InsufficientCapacity: - # this is expected - pass - - def testSendAsyncCapacity1(self): - self.asyncTest(1) - - def testSendAsyncCapacity5(self): - self.asyncTest(5) - - def testSendAsyncCapacityUNLIMITED(self): - self.asyncTest(UNLIMITED) - - def testCapacityTimeout(self): - self.snd.capacity = 1 - msgs = [] - caught = False - while len(msgs) < 100: - m = self.content("testCapacity", len(msgs)) - try: - self.snd.send(m, sync=False, timeout=0) - msgs.append(m) - except InsufficientCapacity: - caught = True - break - self.snd.sync() - self.drain(self.rcv, expected=msgs) - self.ssn.acknowledge() - assert caught, "did not exceed capacity" diff --git a/python/qpid/tests/messaging/message.py b/python/qpid/tests/messaging/message.py deleted file mode 100644 index 297374b82b..0000000000 --- a/python/qpid/tests/messaging/message.py +++ /dev/null @@ -1,155 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.messaging import * -from qpid.tests.messaging import Base - -class MessageTests(Base): - - def testCreateString(self): - m = Message("string") - assert m.content == "string" - assert m.content_type is None - - def testCreateUnicode(self): - m = Message(u"unicode") - assert m.content == u"unicode" - assert m.content_type == "text/plain" - - def testCreateMap(self): - m = Message({}) - assert m.content == {} - assert m.content_type == "amqp/map" - - def testCreateList(self): - m = Message([]) - assert m.content == [] - assert m.content_type == "amqp/list" - - def testContentTypeOverride(self): - m = Message() - m.content_type = "text/html; charset=utf8" - m.content = u"<html/>" - assert m.content_type == "text/html; charset=utf8" - -ECHO_Q = 'test-message-echo-queue; {create: always, delete: always}' - -class MessageEchoTests(Base): - - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - - def setup_sender(self): - return self.ssn.sender(ECHO_Q) - - def setup_receiver(self): - return self.ssn.receiver(ECHO_Q) - - def check(self, msg): - self.snd.send(msg) - echo = self.rcv.fetch(0) - self.assertEcho(msg, echo) - self.ssn.acknowledge(echo) - - def testStringContent(self): - self.check(Message("string")) - - def testUnicodeContent(self): - self.check(Message(u"unicode")) - - - TEST_MAP = {"key1": "string", - "key2": u"unicode", - "key3": 3, - "key4": -3, - "key5": 3.14, - "key6": -3.14, - "key7": ["one", 2, 3.14], - "key8": [], - "key9": {"sub-key0": 3}, - "key10": True, - "key11": False, - "x-amqp-0-10.app-id": "test-app-id", - "x-amqp-0-10.content-encoding": "test-content-encoding"} - - def testMapContent(self): - self.check(Message(MessageEchoTests.TEST_MAP)) - - def testListContent(self): - self.check(Message([])) - self.check(Message([1, 2, 3])) - self.check(Message(["one", 2, 3.14, {"four": 4}])) - - def testProperties(self): - msg = Message() - msg.subject = "subject" - msg.correlation_id = str(self.test_id) - msg.durable = True - msg.priority = 7 - msg.ttl = 60 - msg.properties = MessageEchoTests.TEST_MAP - msg.reply_to = "reply-address" - self.check(msg) - - def testContentTypeUnknown(self): - msg = Message(content_type = "this-content-type-does-not-exist") - self.check(msg) - - def testTextPlain(self): - self.check(Message(content_type="text/plain", content="asdf")) - - def testTextPlainEmpty(self): - self.check(Message(content_type="text/plain")) - - def check_rt(self, addr, expected=None): - if expected is None: - expected = addr - msg = Message(reply_to=addr) - self.snd.send(msg) - echo = self.rcv.fetch(0) - assert echo.reply_to == expected, echo.reply_to - self.ssn.acknowledge(echo) - - def testReplyTo(self): - self.check_rt("name") - - def testReplyToQueue(self): - self.check_rt("name; {node: {type: queue}}", "name") - - def testReplyToQueueSubject(self): - self.check_rt("name/subject; {node: {type: queue}}", "name") - - def testReplyToTopic(self): - self.check_rt("name; {node: {type: topic}}") - - def testReplyToTopicSubject(self): - self.check_rt("name/subject; {node: {type: topic}}") - - def testBooleanEncoding(self): - msg = Message({"true": True, "false": False}) - self.snd.send(msg) - echo = self.rcv.fetch(0) - self.assertEcho(msg, echo) - t = echo.content["true"] - f = echo.content["false"] - assert isinstance(t, bool), t - assert isinstance(f, bool), f diff --git a/python/qpid/tests/mimetype.py b/python/qpid/tests/mimetype.py deleted file mode 100644 index 22760316f0..0000000000 --- a/python/qpid/tests/mimetype.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.tests import Test -from qpid.mimetype import lex, parse, ParseError, EOF, WSPACE -from parser import ParserBase - -class MimeTypeTests(ParserBase, Test): - - EXCLUDE = (WSPACE, EOF) - - def do_lex(self, st): - return lex(st) - - def do_parse(self, st): - return parse(st) - - def valid(self, addr, type=None, subtype=None, parameters=None): - ParserBase.valid(self, addr, (type, subtype, parameters)) - - def testTypeOnly(self): - self.invalid("type", "expecting SLASH, got EOF line:1,4:type") - - def testTypeSubtype(self): - self.valid("type/subtype", "type", "subtype", []) - - def testTypeSubtypeParam(self): - self.valid("type/subtype ; name=value", - "type", "subtype", [("name", "value")]) - - def testTypeSubtypeParamComment(self): - self.valid("type/subtype ; name(This is a comment.)=value", - "type", "subtype", [("name", "value")]) - - def testMultipleParams(self): - self.valid("type/subtype ; name1=value1 ; name2=value2", - "type", "subtype", [("name1", "value1"), ("name2", "value2")]) - - def testCaseInsensitivity(self): - self.valid("Type/Subtype", "type", "subtype", []) diff --git a/python/qpid/tests/parser.py b/python/qpid/tests/parser.py deleted file mode 100644 index a4865cc9fe..0000000000 --- a/python/qpid/tests/parser.py +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.parser import ParseError - -class ParserBase: - - def lex(self, addr, *types): - toks = [t.type for t in self.do_lex(addr) if t.type not in self.EXCLUDE] - assert list(types) == toks, "expected %s, got %s" % (types, toks) - - def valid(self, addr, expected): - got = self.do_parse(addr) - assert expected == got, "expected %s, got %s" % (expected, got) - - def invalid(self, addr, error=None): - try: - p = self.do_parse(addr) - assert False, "invalid address parsed: %s" % p - except ParseError, e: - assert error == str(e), "expected %r, got %r" % (error, str(e)) diff --git a/python/qpid/tests/queue.py b/python/qpid/tests/queue.py deleted file mode 100644 index e12354eb43..0000000000 --- a/python/qpid/tests/queue.py +++ /dev/null @@ -1,71 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import threading, time -from unittest import TestCase -from qpid.queue import Queue, Empty, Closed - - -class QueueTest (TestCase): - - # The qpid queue class just provides sime simple extensions to - # python's standard queue data structure, so we don't need to test - # all the queue functionality. - - def test_listen(self): - values = [] - heard = threading.Event() - def listener(x): - values.append(x) - heard.set() - - q = Queue(0) - q.listen(listener) - heard.clear() - q.put(1) - heard.wait() - assert values[-1] == 1 - heard.clear() - q.put(2) - heard.wait() - assert values[-1] == 2 - - q.listen(None) - q.put(3) - assert q.get(3) == 3 - q.listen(listener) - - heard.clear() - q.put(4) - heard.wait() - assert values[-1] == 4 - - def test_close(self): - q = Queue(0) - q.put(1); q.put(2); q.put(3); q.close() - assert q.get() == 1 - assert q.get() == 2 - assert q.get() == 3 - for i in range(10): - try: - q.get() - raise AssertionError("expected Closed") - except Closed: - pass diff --git a/python/qpid/tests/spec010.py b/python/qpid/tests/spec010.py deleted file mode 100644 index ac04e1ee02..0000000000 --- a/python/qpid/tests/spec010.py +++ /dev/null @@ -1,74 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, tempfile, shutil, stat -from unittest import TestCase -from qpid.codec010 import Codec, StringCodec -from qpid.ops import * - -class SpecTest(TestCase): - - def testSessionHeader(self): - sc = StringCodec() - sc.write_compound(Header(sync=True)) - assert sc.encoded == "\x01\x01" - - sc = StringCodec() - sc.write_compound(Header(sync=False)) - assert sc.encoded == "\x01\x00" - - def encdec(self, value): - sc = StringCodec() - sc.write_compound(value) - decoded = sc.read_compound(value.__class__) - return decoded - - def testMessageProperties(self): - props = MessageProperties(content_length=3735928559L, - reply_to=ReplyTo(exchange="the exchange name", - routing_key="the routing key")) - dec = self.encdec(props) - assert props.content_length == dec.content_length - assert props.reply_to.exchange == dec.reply_to.exchange - assert props.reply_to.routing_key == dec.reply_to.routing_key - - def testMessageSubscribe(self): - cmd = MessageSubscribe(exclusive=True, destination="this is a test") - dec = self.encdec(cmd) - assert cmd.exclusive == dec.exclusive - assert cmd.destination == dec.destination - - def testXid(self): - sc = StringCodec() - xid = Xid(format=0, global_id="gid", branch_id="bid") - sc.write_compound(xid) - assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' - dec = sc.read_compound(Xid) - assert xid.__dict__ == dec.__dict__ - -# def testLoadReadOnly(self): -# spec = "amqp.0-10-qpid-errata.xml" -# f = testrunner.get_spec_file(spec) -# dest = tempfile.mkdtemp() -# shutil.copy(f, dest) -# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) -# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) -# fname = os.path.join(dest, spec) -# load(fname) -# assert not os.path.exists("%s.pcl" % fname) |