diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-08 14:28:43 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-04-08 14:28:43 +0000 |
| commit | c296c9bd6d6090ad9321615d6fffa5372b5e0eb4 (patch) | |
| tree | b1f1d1257e6b87ed04e4b6f94211573d46389566 /qpid/cpp/src/tests/federation.py | |
| parent | fa799ec00a6b04444093710209143b9f6f58e303 (diff) | |
| download | qpid-python-c296c9bd6d6090ad9321615d6fffa5372b5e0eb4.tar.gz | |
QPID-3170: correct deletion of federation routes when keys match.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1090266 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
| -rwxr-xr-x | qpid/cpp/src/tests/federation.py | 300 |
1 files changed, 299 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 973a1d366c..201b06a4a2 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -23,7 +23,7 @@ from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty from qpid.util import URL -from time import sleep +from time import sleep, time class _FedBroker(object): @@ -1791,3 +1791,301 @@ class FederationTests(TestBase010): if headers: return headers[name] return None + + def test_dynamic_topic_bounce(self): + """ Bounce the connection between federated Topic Exchanges. + """ + class Params: + def exchange_type(self): return "topic" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud.*") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud.*") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud.boy") + + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_direct_bounce(self): + """ Bounce the connection between federated Direct Exchanges. + """ + class Params: + def exchange_type(self): return "direct" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, binding_key="spud") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_fanout_bounce(self): + """ Bounce the connection between federated Fanout Exchanges. + """ + class Params: + def exchange_type(self): return "fanout" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename) + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_headers_bounce(self): + """ Bounce the connection between federated Headers Exchanges. + """ + class Params: + def exchange_type(self): return "headers" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud", arguments={'x-match':'any', 'class':'first'}) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.message_properties(application_headers={'class':'first'}) + ## @todo KAG - re-enable once federation bugs with headers exchanges + ## are fixed. + #self.generic_dynamic_bounce_test(Params()) + return + + + def generic_dynamic_bounce_test(self, params): + """ Verify that a federated broker can maintain a binding to a local + queue using the same key as a remote binding. Destroy and reconnect + the federation link, and verify routes are restored correctly. + See QPID-3170. + Topology: + + Queue1 <---"Key"---B0<==[Federated Exchange]==>B1---"Key"--->Queue2 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type=params.exchange_type()) + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + params.exchange_type(), "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # + # on each broker, create a local queue bound to the exchange with the + # same key value. + # + + self._brokers[0].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[0].client_session, queue="fedX1", destination="f1") + queue_0 = self._brokers[0].client_session.incoming("f1") + + self._brokers[1].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[1].client_session, queue="fedX1", destination="f1") + queue_1 = self._brokers[1].client_session.incoming("f1") + + # now federate the two brokers + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for all the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # @todo - There is no way to determine when the bridge objects become + # active. + + # wait until the binding key has propagated to each broker - each + # broker should see 2 bindings (1 local, 1 remote) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount < binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # send 10 msgs to B0 + for i in range(1, 11): + # dp = self._brokers[0].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[0].client_session) + self._brokers[0].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's local queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + binding_counts = [1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # restore the bridges between the two exchanges, and wait for the + # bindings to propagate. + # + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # verify traffic flows correctly + # + + for i in range(1, 11): + #dp = self._brokers[1].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[1].client_session) + self._brokers[1].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + + # + # cleanup + # + params.unbind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="fedX1") + + params.unbind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self._brokers[1].client_session.message_cancel(destination="f1") + self._brokers[1].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() + + |
