diff options
Diffstat (limited to 'qpid/cpp')
| -rwxr-xr-x | qpid/cpp/src/tests/federation.py | 550 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/run_federation_tests | 13 |
2 files changed, 560 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index 5f269c8363..c5e379b829 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -22,8 +22,32 @@ import sys from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty +from qpid.util import URL from time import sleep + +class _FedBroker(object): + """ + A proxy object for a remote broker. Contains connection and management + state. + """ + def __init__(self, host, port, + conn=None, session=None, qmf_broker=None): + self.host = host + self.port = port + self.url = "%s:%d" % (host, port) + self.client_conn = None + self.client_session = None + self.qmf_broker = None + self.qmf_object = None + if conn is not None: + self.client_conn = conn + if session is not None: + self.client_session = session + if qmf_broker is not None: + self.qmf_broker = qmf_broker + + class FederationTests(TestBase010): def remote_host(self): @@ -43,6 +67,44 @@ class FederationTests(TestBase010): sleep(1) total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link")) + def _setup_brokers(self): + ports = [self.remote_port()] + extra = self.defines.get("extra-brokers") + if extra: + for p in extra.split(): + ports.append(int(p)) + + # broker[0] has already been set up. + self._brokers = [_FedBroker(self.broker.host, + self.broker.port, + self.conn, + self.session, + self.qmf_broker)] + self._brokers[0].qmf_object = self.qmf.getObjects(_class="broker")[0] + + # setup remaining brokers + for _p in ports: + _b = _FedBroker(self.remote_host(), _p) + _b.client_conn = self.connect(host=self.remote_host(), port=_p) + _b.client_session = _b.client_conn.session("Fed_client_session_" + str(_p)) + _b.qmf_broker = self.qmf.addBroker(_b.url) + for _bo in self.qmf.getObjects(_class="broker"): + if _bo.getBroker().getUrl() == _b.qmf_broker.getUrl(): + _b.qmf_object = _bo + break + self._brokers.append(_b) + + def _teardown_brokers(self): + """ Un-does _setup_brokers() + """ + # broker[0] is configured at test setup, so it must remain configured + for _b in self._brokers[1:]: + self.qmf.delBroker(_b.qmf_broker) + if not _b.client_session.error(): + _b.client_session.close(timeout=10) + _b.client_conn.close(timeout=10) + + def test_bridge_create_and_close(self): self.startQmf(); qmf = self.qmf @@ -788,6 +850,494 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_direct_route_prop(self): + """ Set up a tree of uni-directional routes across the direct exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spudboy" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spudboy" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create direct exchange on each broker + + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.direct" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.direct", # src + "fedX.direct", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # create a queue on B2, bound to "spudboy" + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + + # create a queue on B3, bound to "spudboy" + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker (twice at + # broker 2) + + retries = 0 + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.bindingKey == "spudboy": + count += 1 + while count < 5: + retries += 1 + if retries >= 10: + self.fail("binding did not propagate to all brokers!") + return + sleep(1) + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.bindingKey == "spudboy": + count += 1 + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") + self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # @todo - find a proper way to check the propagation here! + sleep(6) + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") + self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B3 only + for i in range(1, 11): + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + + def test_dynamic_topic_route_prop(self): + """ Set up a tree of uni-directional routes across a topic exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spud.*" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spud.*" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create topic exchange on each broker + + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.topic", type="topic") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.topic" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.topic", # src + "fedX.topic", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # create a queue on B2, bound to "spudboy" + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + + # create a queue on B3, bound to "spudboy" + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker (twice at + # broker 2) + + retries = 0 + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.bindingKey == "spud.*": + count += 1 + while count < 5: + retries += 1 + if retries >= 10: + self.fail("binding did not propagate to all brokers!") + return + sleep(1) + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.bindingKey == "spud.*": + count += 1 + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") + self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # @todo - find a proper way to check the propagation here! + sleep(6) + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") + self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B3 only + for i in range(1, 11): + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.topic") + + self._teardown_brokers() + + self.verify_cleanup() + + + def test_dynamic_fanout_route_prop(self): + """ Set up a tree of uni-directional routes across a fanout exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spud.*" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spud.*" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create fanout exchange on each broker + + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.fanout" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.fanout", # src + "fedX.fanout", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # create a queue on B2, bound to the exchange + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout") + + # create a queue on B3, bound to the exchange + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding has propagated to each broker by + # counting the number of bindings that have origins. + # Should have 3: 1 for B0, 2 for B1. + + retries = 0 + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.origin: + count += 1 + while count < 3: + retries += 1 + if retries >= 10: + self.fail("binding did not propagate to all brokers!") + return + sleep(1) + count = 0 + for xxx in qmf.getObjects(_class="binding"): + if xxx.origin: + count += 1 + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties() + self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # @todo - find a proper way to check the propagation here! + sleep(6) + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties() + self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message %d" % i)) + + # get exactly 10 msgs on B3 only + for i in range(1, 11): + msg = queue_3.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.fanout") + + self._teardown_brokers() + + self.verify_cleanup() + def getProperty(self, msg, name): for h in msg.headers: diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests index 8d7954a533..f5bb123d0a 100755 --- a/qpid/cpp/src/tests/run_federation_tests +++ b/qpid/cpp/src/tests/run_federation_tests @@ -30,20 +30,27 @@ start_brokers() { LOCAL_PORT=`cat qpidd.port` ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port REMOTE_PORT=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + REMOTE_B1=`cat qpidd.port` + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port + REMOTE_B2=`cat qpidd.port` } stop_brokers() { $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT $QPIDD_EXEC --no-module-dir -q --port $REMOTE_PORT + $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B1 + $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B2 } if test -d ${PYTHON_DIR} ; then start_brokers - echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT" - $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT $@ + echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2" + $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@ RETCODE=$? stop_brokers - if test x$RETCODE != x0; then + if test x$RETCODE != x0; then echo "FAIL federation tests"; exit 1; fi fi |
