diff options
Diffstat (limited to 'python/qpid/tests/codec.py')
-rw-r--r-- | python/qpid/tests/codec.py | 601 |
1 files changed, 0 insertions, 601 deletions
diff --git a/python/qpid/tests/codec.py b/python/qpid/tests/codec.py deleted file mode 100644 index 8fd0528636..0000000000 --- a/python/qpid/tests/codec.py +++ /dev/null @@ -1,601 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from qpid.codec import Codec -from qpid.spec08 import load -from cStringIO import StringIO -from qpid.reference import ReferenceId - -__doc__ = """ - - This is a unit test script for qpid/codec.py - - It can be run standalone or as part of the existing test framework. - - To run standalone: - ------------------- - - Place in the qpid/python/tests/ directory and type... - - python codec.py - - A brief output will be printed on screen. The verbose output will be placed inn a file called - codec_unit_test_output.txt. [TODO: make this filename configurable] - - To run as part of the existing test framework: - ----------------------------------------------- - - python run-tests tests.codec - - Change History: - ----------------- - Jimmy John 05/19/2007 Initial draft - Jimmy John 05/22/2007 Implemented comments by Rafael Schloming - - -""" - -from qpid.specs_config import amqp_spec_0_8 -SPEC = load(amqp_spec_0_8) - -# -------------------------------------- -# -------------------------------------- -class BaseDataTypes(unittest.TestCase): - - - """ - Base class containing common functions - """ - - # --------------- - def setUp(self): - """ - standard setUp for unitetest (refer unittest documentation for details) - """ - self.codec = Codec(StringIO(), SPEC) - - # ------------------ - def tearDown(self): - """ - standard tearDown for unitetest (refer unittest documentation for details) - """ - self.codec.stream.flush() - self.codec.stream.close() - - # ---------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - given a function name and arguments, calls the function with the args and - returns the contents of the stream - """ - getattr(self.codec, functionName)(args[0]) - return self.codec.stream.getvalue() - - # ---------------------------------------- - def readFunc(self, functionName, *args): - """ - helper function - creates a input stream and then calls the function with arguments as have been - supplied - """ - self.codec.stream = StringIO(args[0]) - return getattr(self.codec, functionName)() - - -# ---------------------------------------- -# ---------------------------------------- -class IntegerTestCase(BaseDataTypes): - - """ - Handles octet, short, long, long long - - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_integer = 2 - self.const_integer_octet_encoded = '\x02' - self.const_integer_short_encoded = '\x00\x02' - self.const_integer_long_encoded = '\x00\x00\x00\x02' - self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' - - # -------------------------- # - # Unsigned Octect - 8 bits # - # -------------------------- # - - # -------------------------- - def test_unsigned_octet(self): - """ - ubyte format requires 0<=number<=255 - """ - self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') - - # ------------------------------------------- - def test_octet_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, 256) - - # ------------------------------------------- - def test_uoctet_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, -1) - - # --------------------------------- - def test_uoctet_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') - - # ------------------------------------ - def test_unsigned_octet_decode(self): - """ - octet decoding - """ - self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') - - # ----------------------------------- # - # Unsigned Short Integers - 16 bits # - # ----------------------------------- # - - # ----------------------- - def test_ushort_int(self): - """ - testing unsigned short integer - """ - self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') - - # ------------------------------------------- - def test_ushort_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, 65536) - - # ------------------------------------------- - def test_ushort_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, -1) - - # --------------------------------- - def test_ushort_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') - - # ------------------------------------ - def test_ushort_int_decode(self): - """ - unsigned short decoding - """ - self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') - - - # ---------------------------------- # - # Unsigned Long Integers - 32 bits # - # ---------------------------------- # - - # ----------------------- - def test_ulong_int(self): - """ - testing unsigned long iteger - """ - self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) - - # ------------------------------------------- - def test_ulong_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, -1) - - # --------------------------------- - def test_ulong_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') - - # ------------------------------- - def test_ulong_int_decode(self): - """ - unsigned long decoding - """ - self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') - - - # --------------------------------------- # - # Unsigned Long Long Integers - 64 bits # - # --------------------------------------- # - - # ----------------------- - def test_ulong_long_int(self): - """ - testing unsinged long long integer - """ - self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_long_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) - - # ------------------------------------------- - def test_ulong_long_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) - - # --------------------------------- - def test_ulong_long_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') - - # ------------------------------------ - def test_ulong_long_int_decode(self): - """ - unsigned long long decoding - """ - self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') - -# ----------------------------------- -# ----------------------------------- -class BitTestCase(BaseDataTypes): - - """ - Handles bits - """ - - # ---------------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - """ - for ele in args: - getattr(self.codec, functionName)(ele) - - self.codec.flush() - return self.codec.stream.getvalue() - - # ------------------- - def test_bit1(self): - """ - sends in 11 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') - - # ------------------- - def test_bit2(self): - """ - sends in 10011 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') - - # ------------------- - def test_bit3(self): - """ - sends in 1110100111 [10 bits(right to left), should be compressed into two octets] - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') - - # ------------------------------------ - def test_bit_decode_1(self): - """ - decode bit 1 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') - - # ------------------------------------ - def test_bit_decode_0(self): - """ - decode bit 0 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') - -# ----------------------------------- -# ----------------------------------- -class StringTestCase(BaseDataTypes): - - """ - Handles short strings, long strings - """ - - # ------------------------------------------------------------- # - # Short Strings - 8 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_short_string_zero_length(self): - """ - 0 length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_positive_length(self): - """ - positive length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_out_of_upper_range(self): - """ - string length > 255 - """ - self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) - - # ------------------------------------ - def test_short_string_decode(self): - """ - short string decode - """ - self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') - - - # ------------------------------------------------------------- # - # Long Strings - 32 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_long_string_zero_length(self): - """ - 0 length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') - - # ------------------------------------------- - def test_long_string_positive_length(self): - """ - positive length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') - - # ------------------------------------ - def test_long_string_decode(self): - """ - long string decode - """ - self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') - - -# -------------------------------------- -# -------------------------------------- -class TimestampTestCase(BaseDataTypes): - - """ - No need of any test cases here as timestamps are implemented as long long which is tested above - """ - pass - -# --------------------------------------- -# --------------------------------------- -class FieldTableTestCase(BaseDataTypes): - - """ - Handles Field Tables - - Only S/I type messages seem to be implemented currently - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} - self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' - - # ------------------------------------------- - def test_field_table_name_value_pair(self): - """ - valid name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') - - # --------------------------------------------------- - def test_field_table_multiple_name_value_pair(self): - """ - multiple name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') - - # ------------------------------------ - def test_field_table_decode(self): - """ - field table decode - """ - self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') - - -# ------------------------------------ -# ------------------------------------ -class ContentTestCase(BaseDataTypes): - - """ - Handles Content data types - """ - - # ----------------------------- - def test_content_inline(self): - """ - inline content - """ - self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') - - # -------------------------------- - def test_content_reference(self): - """ - reference content - """ - self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') - - # ------------------------------------ - def test_content_inline_decode(self): - """ - inline content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') - - # ------------------------------------ - def test_content_reference_decode(self): - """ - reference content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') - -# ------------------------ # -# Pre - existing test code # -# ------------------------ # - -# --------------------- -def test(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - if isinstance(value, (list, tuple)): - values = value - else: - values = [value] - stream = StringIO() - codec = Codec(stream, SPEC) - for v in values: - codec.encode(type, v) - codec.flush() - enc = stream.getvalue() - stream.reset() - dup = [] - for i in xrange(len(values)): - dup.append(codec.decode(type)) - if values != dup: - raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) - -# ----------------------- -def dotest(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - args = (type, value) - test(*args) - -# ------------- -def oldtests(): - """ - old test function cut/copy/paste from qpid/codec.py - """ - for value in ("1", "0", "110", "011", "11001", "10101", "10011"): - for i in range(10): - dotest("bit", map(lambda x: x == "1", value*i)) - - for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): - dotest("table", value) - - for type in ("octet", "short", "long", "longlong"): - for value in range(0, 256): - dotest(type, value) - - for type in ("shortstr", "longstr"): - for value in ("", "a", "asdf"): - dotest(type, value) - -# ----------------------------------------- -class oldTests(unittest.TestCase): - - """ - class to handle pre-existing test cases - """ - - # --------------------------- - def test_oldtestcases(self): - """ - call the old tests - """ - return oldtests() - -# --------------------------- -# --------------------------- -if __name__ == '__main__': - - codec_test_suite = unittest.TestSuite() - - #adding all the test suites... - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) - - #loading pre-existing test case from qpid/codec.py - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) - - run_output_stream = StringIO() - test_runner = unittest.TextTestRunner(run_output_stream, '', '') - test_result = test_runner.run(codec_test_suite) - - print '\n%d test run...' % (test_result.testsRun) - - if test_result.wasSuccessful(): - print '\nAll tests successful\n' - - if test_result.failures: - print '\n----------' - print '%d FAILURES:' % (len(test_result.failures)) - print '----------\n' - for failure in test_result.failures: - print str(failure[0]) + ' ... FAIL' - - if test_result.errors: - print '\n---------' - print '%d ERRORS:' % (len(test_result.errors)) - print '---------\n' - - for error in test_result.errors: - print str(error[0]) + ' ... ERROR' - - f = open('codec_unit_test_output.txt', 'w') - f.write(str(run_output_stream.getvalue())) - f.close() |