diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/tests/federation.py | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/federation.py')
-rwxr-xr-x | cpp/src/tests/federation.py | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index dcd074eda9..6477c6effd 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -2604,3 +2604,109 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_bounce_unbinds_named_queue(self): + """ Verify that a propagated binding is removed when the connection is + bounced + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # on the destination broker, create a binding for propagation + self._brokers[0].client_session.queue_declare(queue="fedDstQ") + self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") + + # on the source broker, create a bridge queue + self._brokers[1].client_session.queue_declare(queue="fedSrcQ") + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.create( "link", + "Link-dynamic", + {"host":self._brokers[1].host, + "port":self._brokers[1].port}, False) + self.assertEqual(result.status, 0) + + # bridge the "fedX" exchange: + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-dynamic", + {"link":"Link-dynamic", + "src":"fedX", + "dest":"fedX", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # wait until the binding key has propagated to the src broker + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount < 1 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 1) + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount != 0 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 0) + + self._brokers[1].client_session.queue_delete(queue="fedSrcQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() |