diff options
| author | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:55 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-04-07 21:22:55 +0000 |
| commit | a6f044f40d70b73c320cc909169e3518909365e2 (patch) | |
| tree | a23df30ce4b52f1d23e55a0afa0e90c1fa20ffd6 /qpid/cpp/src/tests/legacystore | |
| parent | 8ea0f79d78edd0a0825547ecc618e3fa63a2b93f (diff) | |
| download | qpid-python-a6f044f40d70b73c320cc909169e3518909365e2.tar.gz | |
QPID-5560: HA tests do not use AMQP 1.0
The HA tests were using only AMQP 0-10.
Modified the tests to use AMQP 1.0 if available (still use 0-10 if 1.0 is not available)
Fixed bugs uncovered both in the tests and in the AMQP 1.0 implementation.
Summary of changes:
- brokertest.py: configurable support for of swig vs. native and amqp0-10 vs. 1.0
- default to swig+amqp1.0 if swig is available, native+amqp0-10 otherwise
- qpidtoollibs/broker.py: enable use of swig client with BrokerAgent
- Swig python client:
- support for passing client_properties/properties.
- expose AddressHelper pn_data read/write as PnData helper class
- set sender/receiver capacity on creation
- limited disposition support - rejected messages.
- support for additional timeout parameters
- expose messaging::Logger, allow log configuration to be set from python.
- ha_tests.py:
- bind, delete policies not supported by AMQP 1.0, switched to using BrokerAgent QMF.
- pass protocol:amqp1.0 connection-option to c++ test clients (qpid-send, qpid-receive)
- TX tests forsce use of 0-10 protocol (but still with Swig client if enabled.)
- Broker fixes:
- Queue::Settings::isTemporary was set in the 0-10 SessionAdapter, moved to Broker::createQueue.
- broker::amqp::Session was always setting an exclusive owner in createQueue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1585588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/legacystore')
3 files changed, 89 insertions, 80 deletions
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py index 3c62740a62..37c12601be 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py @@ -17,15 +17,20 @@ # under the License. # +import os + from brokertest import EXPECT_EXIT_OK from store_test import StoreTest, Qmf, store_args from qpid.messaging import * +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client. + class ExchangeQueueTests(StoreTest): """ Simple tests of the broker exchange and queue types """ - + def test_direct_exchange(self): """Test Direct exchange.""" broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) @@ -34,11 +39,11 @@ class ExchangeQueueTests(StoreTest): broker.send_message("a", msg1) broker.send_message("b", msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_direct_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, "b", msg2, True) - + def test_topic_exchange(self): """Test Topic exchange.""" broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) @@ -56,17 +61,17 @@ class ExchangeQueueTests(StoreTest): msg2 = Message("Message2", durable=True, correlation_id="Msg0004") snd2.send(msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_topic_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, "b", msg1, True) self.check_messages(broker, "c", [msg1, msg2], True) self.check_message(broker, "d", msg2, True) self.check_message(broker, "e", msg2, True) - - + + def test_legacy_lvq(self): - """Test legacy LVQ.""" + """Test legacy LVQ.""" broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) @@ -77,7 +82,7 @@ class ExchangeQueueTests(StoreTest): broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], xprops="arguments:{\"qpid.last_value_queue\":True}") broker.terminate() - + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) # Add more messages while subscriber is active (no replacement): @@ -89,11 +94,11 @@ class ExchangeQueueTests(StoreTest): broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn) ssn.acknowledge() broker.terminate() - + broker = self.broker(store_args(), name="test_lvq") self.check_messages(broker, "lvq-test", [ma4, mc4], True) - - + + def test_fanout_exchange(self): """Test Fanout Exchange""" broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) @@ -107,7 +112,7 @@ class ExchangeQueueTests(StoreTest): msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") snd.send(msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_fanout_exchange") self.check_messages(broker, "q1", [msg1, msg2], True) self.check_messages(broker, "q2", [msg1, msg2], True) @@ -124,18 +129,18 @@ class ExchangeQueueTests(StoreTest): m2 = rcv.fetch() ssn.acknowledge(message=m2, disposition=Disposition(REJECTED)) broker.terminate() - + broker = self.broker(store_args(), name="test_message_reject") qmf = Qmf(broker) assert qmf.queue_message_count("tmr") == 0 - + def test_route(self): """ Test the recovery of a route (link and bridge objects.""" broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK) qmf = Qmf(broker) qmf_broker_obj = qmf.get_objects("broker")[0] - + # create a "link" link_args = {"host":"a.fake.host.com", "port":9999, "durable":True, "authMechanism":"PLAIN", "username":"guest", "password":"guest", @@ -143,16 +148,16 @@ class ExchangeQueueTests(StoreTest): result = qmf_broker_obj.create("link", "test-link", link_args, False) self.assertEqual(result.status, 0, result) link = qmf.get_objects("link")[0] - + # create bridge bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", "durable":True} result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False); self.assertEqual(result.status, 0, result) bridge = qmf.get_objects("bridge")[0] - + broker.terminate() - + # recover the link and bridge broker = self.broker(store_args(), name="test_route") qmf = Qmf(broker) @@ -189,7 +194,7 @@ class AlternateExchangePropertyTests(StoreTest): self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"), "Alternate exchange property not found or is incorrect on exchange \"testExch\".") qmf.close() - + def test_queue(self): """Queue alternate exchange property persistexchangeNamece test""" broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK) @@ -226,7 +231,7 @@ class RedeliveredTests(StoreTest): msg = Message(msg_content, durable=True) broker.send_message("testQueue", msg) broker.terminate() - + broker = self.broker(store_args(), name="test_broker_recovery") rcv_msg = broker.get_message("testQueue") self.assertEqual(msg_content, rcv_msg.content) diff --git a/qpid/cpp/src/tests/legacystore/python_tests/resize.py b/qpid/cpp/src/tests/legacystore/python_tests/resize.py index 469e0f6730..e719b755da 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/resize.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/resize.py @@ -26,8 +26,11 @@ from qpid.datatypes import uuid4 from store_test import StoreTest, store_args from qpid.messaging import Message +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + class ResizeTest(StoreTest): - + resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize") print resize_tool def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail): @@ -47,21 +50,21 @@ class ResizeTest(StoreTest): finally: p.stdout.close() return res - + def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8, init_file_size = 24, exp_fail = False, wait_time = None): # Using a sender will force the creation of an empty persistent queue which is needed for some tests broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time) ssn = broker.connect().session() snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name) - + msgs = [] for index in range(0, num_msgs): msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index) msgs.append(msg) snd.send(msg) broker.terminate() - + res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files, resize_file_size, exp_fail) if res != 0: @@ -70,95 +73,95 @@ class ResizeTest(StoreTest): self.fail("ERROR: Resize operation failed with return code %d" % res) elif exp_fail: self.fail("ERROR: Resize operation succeeded, but a failure was expected") - + broker = self.broker(store_args(), name="broker") self.check_messages(broker, queue_name, msgs, True) - - # TODO: Check the physical files to check number and size are as expected. + + # TODO: Check the physical files to check number and size are as expected. class SimpleTest(ResizeTest): """ Simple tests of the resize utility for resizing a journal to larger and smaller sizes. """ - + def test_empty_store_same(self): self._resize_test(queue_name = "empty_store_same", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 8, resize_file_size = 24) - + def test_empty_store_up(self): self._resize_test(queue_name = "empty_store_up", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_empty_store_down(self): self._resize_test(queue_name = "empty_store_down", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 6, resize_file_size = 12) - -# TODO: Put into long tests, make sure there is > 128GB free disk space + +# TODO: Put into long tests, make sure there is > 128GB free disk space # def test_empty_store_max(self): # self._resize_test(queue_name = "empty_store_max", # num_msgs = 0, msg_size = 0, # init_num_files = 8, init_file_size = 24, # resize_num_files = 64, resize_file_size = 32768, # wait_time = 120) - + def test_empty_store_min(self): self._resize_test(queue_name = "empty_store_min", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 1) - + def test_basic_up(self): self._resize_test(queue_name = "basic_up", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_basic_down(self): self._resize_test(queue_name = "basic_down", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 15) - + def test_basic_low(self): self._resize_test(queue_name = "basic_low", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 4, exp_fail = True) - + def test_basic_under(self): self._resize_test(queue_name = "basic_under", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 3, exp_fail = True) - + def test_very_large_msg_up(self): self._resize_test(queue_name = "very_large_msg_up", num_msgs = 4, msg_size = 2000000, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_very_large_msg_down(self): self._resize_test(queue_name = "very_large_msg_down", num_msgs = 4, msg_size = 2000000, init_num_files = 16, init_file_size = 64, resize_num_files = 16, resize_file_size = 48) - + def test_very_large_msg_low(self): self._resize_test(queue_name = "very_large_msg_low", num_msgs = 4, msg_size = 2000000, init_num_files = 8, init_file_size = 24, resize_num_files = 7, resize_file_size = 20, exp_fail = True) - + def test_very_large_msg_under(self): self._resize_test(queue_name = "very_large_msg_under", num_msgs = 4, msg_size = 2000000, diff --git a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py index 2fcab4e38e..cc846aefd4 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py @@ -22,10 +22,13 @@ from brokertest import BrokerTest from qpid.messaging import Empty from qmf.console import Session - +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + + def store_args(store_dir = None): """Return the broker args necessary to load the async store""" - assert BrokerTest.store_lib + assert BrokerTest.store_lib if store_dir == None: return [] return ["--store-dir", store_dir] @@ -51,7 +54,7 @@ class Qmf: else: amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable, arguments=arguments) - + def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None): """Add a new queue""" amqp_session = self.__broker.getAmqpSession() @@ -62,7 +65,7 @@ class Qmf: durable=durable, arguments=arguments) else: amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments) - + def delete_queue(self, queue_name): """Delete an existing queue""" amqp_session = self.__broker.getAmqpSession() @@ -84,24 +87,24 @@ class Qmf: return found except Exception: return False - + def query_exchange(self, exchange_name, alt_exchange_name=None): """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value.""" return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name) - + def query_queue(self, queue_name, alt_exchange_name=None): """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value.""" return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name) - + def queue_message_count(self, queue_name): """Query the number of messages on a queue""" queue_list = self.__session.getObjects(_class="queue", _name=queue_name) if len(queue_list): return queue_list[0].msgDepth - + def queue_empty(self, queue_name): """Check if a queue is empty (has no messages waiting)""" return self.queue_message_count(queue_name) == 0 @@ -109,7 +112,7 @@ class Qmf: def get_objects(self, target_class, target_package="org.apache.qpid.broker"): return self.__session.getObjects(_class=target_class, _package=target_package) - + def close(self): self.__session.delBroker(self.__broker) self.__session = None @@ -119,7 +122,7 @@ class StoreTest(BrokerTest): """ This subclass of BrokerTest adds some convenience test/check functions """ - + def _chk_empty(self, queue, receiver): """Check if a queue is empty (has no more messages)""" try: @@ -141,9 +144,9 @@ class StoreTest(BrokerTest): else: buff += chr(ord('a') + (index % 26)) return buff + msg - + # Functions for formatting address strings - + @staticmethod def _fmt_csv(string_list, list_braces = None): """Format a list using comma-separation. Braces are optionally added.""" @@ -163,16 +166,16 @@ class StoreTest(BrokerTest): if list_braces != None: str_ += list_braces[1] return str_ - + def _fmt_map(self, string_list): """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map element('key:val').""" - return self._fmt_csv(string_list, list_braces="{}") - + return self._fmt_csv(string_list, list_braces="{}") + def _fmt_list(self, string_list): """Format a list [l1, l2, l3, ...] from a string list.""" - return self._fmt_csv(string_list, list_braces="[]") - + return self._fmt_csv(string_list, list_braces="[]") + def addr_fmt(self, node_name, **kwargs): """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address string.""" @@ -190,12 +193,12 @@ class StoreTest(BrokerTest): x_declare_list = kwargs.get("x_declare_list", []) x_bindings_list = kwargs.get("x_bindings_list", []) x_subscribe_list = kwargs.get("x_subscribe_list", []) - + node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0) link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or len(x_bindings_list) > 0 or len(x_subscribe_list) > 0) assert not (node_flag and link_flag) - + opt_str_list = [] if create_policy != None: opt_str_list.append("create: %s" % create_policy) @@ -231,7 +234,7 @@ class StoreTest(BrokerTest): if len(opt_str_list) > 0: addr_str += "; %s" % self._fmt_map(opt_str_list) return addr_str - + def snd_addr(self, node_name, **kwargs): """ Create a send (node) address""" # Get keyword args @@ -245,7 +248,7 @@ class StoreTest(BrokerTest): ftd_size = kwargs.get("ftd_size") policy = kwargs.get("policy", "flow-to-disk") exchage_type = kwargs.get("exchage_type") - + create_policy = None if auto_create: create_policy = "always" @@ -265,10 +268,10 @@ class StoreTest(BrokerTest): x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) if exchage_type != None: x_declare_list.append("type: %s" % exchage_type) - + return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy, node_type=node_type, durable=durable, x_declare_list=x_declare_list) - + def rcv_addr(self, node_name, **kwargs): """ Create a receive (link) address""" # Get keyword args @@ -282,7 +285,7 @@ class StoreTest(BrokerTest): ftd_count = kwargs.get("ftd_count") ftd_size = kwargs.get("ftd_size") policy = kwargs.get("policy", "flow-to-disk") - + create_policy = None if auto_create: create_policy = "always" @@ -291,7 +294,7 @@ class StoreTest(BrokerTest): delete_policy = "always" mode = None if browse: - mode = "browse" + mode = "browse" x_declare_list = ["\"exclusive\": %s" % exclusive] if ftd_count != None or ftd_size != None: queue_policy = ["\'qpid.policy_type\': %s" % policy] @@ -308,11 +311,11 @@ class StoreTest(BrokerTest): return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, link_name=link_name, durable=durable, x_declare_list=x_declare_list, x_bindings_list=x_bindings_list, link_reliability=reliability) - + def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse) - + def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False, emtpy_flag=False): """Check that messages is on a queue by dequeuing them and comparing them to the expected messages""" @@ -341,8 +344,8 @@ class StoreTest(BrokerTest): if transactional: ssn.commit() return ssn - - + + # Functions for finding strings in the broker log file (or other files) @staticmethod @@ -353,7 +356,7 @@ class StoreTest(BrokerTest): return file_handle.read() finally: file_handle.close() - + def _get_hits(self, broker, search): """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple queues)""" @@ -361,9 +364,9 @@ class StoreTest(BrokerTest): hits = [] for hit in search.findall(self._read_file(broker.log)): if hit not in hits: - hits.append(hit) + hits.append(hit) return hits - + def _reconsile_hits(self, broker, ftd_msgs, release_hits): """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining release_hits.""" @@ -382,35 +385,33 @@ class StoreTest(BrokerTest): for hit in release_hits: err += " %s\n" % hit self.assert_(False, err) - + def check_msg_release(self, broker, ftd_msgs): """ Check for 'Content released' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_release_on_commit(self, broker, ftd_msgs): """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released on commit$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_release_on_recover(self, broker, ftd_msgs): """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released after recovery$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_block(self, broker, ftd_msgs): """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content release blocked$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_block_on_commit(self, broker, ftd_msgs): """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content release blocked on commit$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - - |
