diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/tests/federation.py | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 398 |
1 files changed, 380 insertions, 18 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 973a1d366c..7d613b98ce 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -23,7 +23,7 @@ from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty from qpid.util import URL -from time import sleep +from time import sleep, time class _FedBroker(object): @@ -111,18 +111,18 @@ class FederationTests(TestBase010): broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -133,11 +133,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -165,9 +165,9 @@ class FederationTests(TestBase010): except Empty: None result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -178,11 +178,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] @@ -209,9 +209,9 @@ class FederationTests(TestBase010): except Empty: None result = bridge.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) result = link.close() - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -236,11 +236,11 @@ class FederationTests(TestBase010): qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) - self.assertEqual(result.status, 0) + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] sleep(3) @@ -262,6 +262,63 @@ class FederationTests(TestBase010): except Empty: None result = bridge.close() + self.assertEqual(result.status, 0, result) + result = link.close() + self.assertEqual(result.status, 0, result) + + self.verify_cleanup() + + def test_pull_from_queue_recovery(self): + session = self.session + + #setup queue on remote broker and add some messages + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_pull_from_queue_recovery") + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + for i in range(1, 6): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0, result) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) + self.assertEqual(result.status, 0, result) + + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + #recreate the remote bridge queue to invalidate the bridge session + r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False) + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + + #add some more messages (i.e. after bridge was created) + for i in range(6, 11): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() self.assertEqual(result.status, 0) result = link.close() self.assertEqual(result.status, 0) @@ -649,10 +706,17 @@ class FederationTests(TestBase010): self.verify_cleanup() - def test_dynamic_headers(self): + def test_dynamic_headers_any(self): + self.do_test_dynamic_headers('any') + + def test_dynamic_headers_all(self): + self.do_test_dynamic_headers('all') + + + def do_test_dynamic_headers(self, match_mode): session = self.session r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) - r_session = r_conn.session("test_dynamic_headers") + r_session = r_conn.session("test_dynamic_headers_%s" % match_mode) session.exchange_declare(exchange="fed.headers", type="headers") r_session.exchange_declare(exchange="fed.headers", type="headers") @@ -671,7 +735,7 @@ class FederationTests(TestBase010): sleep(5) session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) - session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'}) self.subscribe(queue="fed1", destination="f1") queue = session.incoming("f1") @@ -1791,3 +1855,301 @@ class FederationTests(TestBase010): if headers: return headers[name] return None + + def test_dynamic_topic_bounce(self): + """ Bounce the connection between federated Topic Exchanges. + """ + class Params: + def exchange_type(self): return "topic" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud.*") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud.*") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud.boy") + + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_direct_bounce(self): + """ Bounce the connection between federated Direct Exchanges. + """ + class Params: + def exchange_type(self): return "direct" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, binding_key="spud") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_fanout_bounce(self): + """ Bounce the connection between federated Fanout Exchanges. + """ + class Params: + def exchange_type(self): return "fanout" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename) + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_headers_bounce(self): + """ Bounce the connection between federated Headers Exchanges. + """ + class Params: + def exchange_type(self): return "headers" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud", arguments={'x-match':'any', 'class':'first'}) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.message_properties(application_headers={'class':'first'}) + ## @todo KAG - re-enable once federation bugs with headers exchanges + ## are fixed. + #self.generic_dynamic_bounce_test(Params()) + return + + + def generic_dynamic_bounce_test(self, params): + """ Verify that a federated broker can maintain a binding to a local + queue using the same key as a remote binding. Destroy and reconnect + the federation link, and verify routes are restored correctly. + See QPID-3170. + Topology: + + Queue1 <---"Key"---B0<==[Federated Exchange]==>B1---"Key"--->Queue2 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type=params.exchange_type()) + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + params.exchange_type(), "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # + # on each broker, create a local queue bound to the exchange with the + # same key value. + # + + self._brokers[0].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[0].client_session, queue="fedX1", destination="f1") + queue_0 = self._brokers[0].client_session.incoming("f1") + + self._brokers[1].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[1].client_session, queue="fedX1", destination="f1") + queue_1 = self._brokers[1].client_session.incoming("f1") + + # now federate the two brokers + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for all the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # @todo - There is no way to determine when the bridge objects become + # active. + + # wait until the binding key has propagated to each broker - each + # broker should see 2 bindings (1 local, 1 remote) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount < binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # send 10 msgs to B0 + for i in range(1, 11): + # dp = self._brokers[0].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[0].client_session) + self._brokers[0].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's local queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + binding_counts = [1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # restore the bridges between the two exchanges, and wait for the + # bindings to propagate. + # + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # verify traffic flows correctly + # + + for i in range(1, 11): + #dp = self._brokers[1].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[1].client_session) + self._brokers[1].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + + # + # cleanup + # + params.unbind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="fedX1") + + params.unbind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self._brokers[1].client_session.message_cancel(destination="f1") + self._brokers[1].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() + + |