diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/tests/federation.py | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/federation.py')
-rwxr-xr-x | qpid/cpp/src/tests/federation.py | 2091 |
1 files changed, 2091 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py new file mode 100755 index 0000000000..201b06a4a2 --- /dev/null +++ b/qpid/cpp/src/tests/federation.py @@ -0,0 +1,2091 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpid.testlib import TestBase010 +from qpid.datatypes import Message +from qpid.queue import Empty +from qpid.util import URL +from time import sleep, time + + +class _FedBroker(object): + """ + A proxy object for a remote broker. Contains connection and management + state. + """ + def __init__(self, host, port, + conn=None, session=None, qmf_broker=None): + self.host = host + self.port = port + self.url = "%s:%d" % (host, port) + self.client_conn = None + self.client_session = None + self.qmf_broker = None + self.qmf_object = None + if conn is not None: + self.client_conn = conn + if session is not None: + self.client_session = session + if qmf_broker is not None: + self.qmf_broker = qmf_broker + + +class FederationTests(TestBase010): + + def remote_host(self): + return self.defines.get("remote-host", "localhost") + + def remote_port(self): + return int(self.defines["remote-port"]) + + def verify_cleanup(self): + attempts = 0 + total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link")) + while total > 0: + attempts += 1 + if attempts >= 10: + self.fail("Bridges and links didn't clean up") + return + sleep(1) + total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link")) + + def _setup_brokers(self): + ports = [self.remote_port()] + extra = self.defines.get("extra-brokers") + if extra: + for p in extra.split(): + ports.append(int(p)) + + # broker[0] has already been set up. + self._brokers = [_FedBroker(self.broker.host, + self.broker.port, + self.conn, + self.session, + self.qmf_broker)] + self._brokers[0].qmf_object = self.qmf.getObjects(_class="broker")[0] + + # setup remaining brokers + for _p in ports: + _b = _FedBroker(self.remote_host(), _p) + _b.client_conn = self.connect(host=self.remote_host(), port=_p) + _b.client_session = _b.client_conn.session("Fed_client_session_" + str(_p)) + _b.qmf_broker = self.qmf.addBroker(_b.url) + for _bo in self.qmf.getObjects(_class="broker"): + if _bo.getBroker().getUrl() == _b.qmf_broker.getUrl(): + _b.qmf_object = _bo + break + self._brokers.append(_b) + + def _teardown_brokers(self): + """ Un-does _setup_brokers() + """ + # broker[0] is configured at test setup, so it must remain configured + for _b in self._brokers[1:]: + self.qmf.delBroker(_b.qmf_broker) + if not _b.client_session.error(): + _b.client_session.close(timeout=10) + _b.client_conn.close(timeout=10) + + + def test_bridge_create_and_close(self): + self.startQmf(); + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + result = bridge.close() + self.assertEqual(result.status, 0) + + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_pull_from_exchange(self): + session = self.session + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + sleep(6) + + #send messages to remote broker and confirm it is routed to local broker + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_pull_from_exchange") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="my-key") + r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_push_to_exchange(self): + session = self.session + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + + #setup queue to receive messages from remote broker + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_push_to_exchange") + r_session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + r_session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(session=r_session, queue="fed1", destination="f1") + queue = r_session.incoming("f1") + sleep(6) + + #send messages to local broker and confirm it is routed to remote broker + for i in range(1, 11): + dp = session.delivery_properties(routing_key="my-key") + session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_pull_from_queue(self): + session = self.session + + #setup queue on remote broker and add some messages + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_pull_from_queue") + r_session.queue_declare(queue="my-bridge-queue", auto_delete=True) + for i in range(1, 6): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + sleep(3) + + #add some more messages (i.e. after bridge was created) + for i in range(6, 11): + dp = r_session.delivery_properties(routing_key="my-bridge-queue") + r_session.message_transfer(message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_tracing_automatic(self): + remoteUrl = "%s:%d" % (self.remote_host(), self.remote_port()) + self.startQmf() + l_broker = self.qmf_broker + r_broker = self.qmf.addBroker(remoteUrl) + + l_brokerObj = self.qmf.getObjects(_class="broker", _broker=l_broker)[0] + r_brokerObj = self.qmf.getObjects(_class="broker", _broker=r_broker)[0] + + l_res = l_brokerObj.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + r_res = r_brokerObj.connect(self.broker.host, self.broker.port, False, "PLAIN", "guest", "guest", "tcp") + + self.assertEqual(l_res.status, 0) + self.assertEqual(r_res.status, 0) + + l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] + r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] + + l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) + r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) + + self.assertEqual(l_res.status, 0) + self.assertEqual(r_res.status, 0) + + count = 0 + while l_link.state != "Operational" or r_link.state != "Operational": + count += 1 + if count > 10: + self.fail("Fed links didn't become operational after 10 seconds") + sleep(1) + l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] + r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] + sleep(3) + + #setup queue to receive messages from local broker + session = self.session + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.direct", binding_key="key") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + #setup queue on remote broker and add some messages + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_trace") + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="key") + r_session.message_transfer(destination="amq.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + try: + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + except Empty: + self.fail("Failed to find expected message containing 'Message %d'" % i) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + def test_tracing(self): + session = self.session + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", + "exclude-me,also-exclude-me", False, False, False, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + + #setup queue to receive messages from local broker + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="amq.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + sleep(6) + + #send messages to remote broker and confirm it is routed to local broker + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_tracing") + + trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", "dont-exclude-me"] + body = ["yes", "first-bad", "second-bad", "third-bad", "yes"] + for b, t in zip(body, trace): + headers = {} + if (t): headers["x-qpid.trace"]=t + dp = r_session.delivery_properties(routing_key="my-key", ttl=1000*60*5) + mp = r_session.message_properties(application_headers=headers) + r_session.message_transfer(destination="amq.direct", message=Message(dp, mp, b)) + + for e in ["my-bridge-id", "dont-exclude-me,my-bridge-id"]: + msg = queue.get(timeout=5) + self.assertEqual("yes", msg.body) + self.assertEqual(e, self.getAppHeader(msg, "x-qpid.trace")) + assert(msg.get("delivery_properties").ttl > 0) + assert(msg.get("delivery_properties").ttl < 1000*60*50) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_fanout(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_fanout") + + session.exchange_declare(exchange="fed.fanout", type="fanout") + r_session.exchange_declare(exchange="fed.fanout", type="fanout") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.fanout") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties() + r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + + def test_dynamic_direct(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_direct") + + session.exchange_declare(exchange="fed.direct", type="direct") + r_session.exchange_declare(exchange="fed.direct", type="direct") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="fd-key") + r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_topic(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_topic") + + session.exchange_declare(exchange="fed.topic", type="topic") + r_session.exchange_declare(exchange="fed.topic", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one.two") + r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_topic_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_topic_reorigin") + + session.exchange_declare(exchange="fed.topic_reorigin", type="topic") + r_session.exchange_declare(exchange="fed.topic_reorigin", type="topic") + + session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic") + r_session.exchange_declare(exchange="fed.topic_reorigin_2", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.topic_reorigin_2", binding_key="ft-key.one.#") + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic_reorigin", "fed.topic_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.topic_reorigin_2", "fed.topic_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic_reorigin", binding_key="ft-key.#") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one.two") + r_session.message_transfer(destination="fed.topic_reorigin", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = bridge2.close() + self.assertEqual(result.status, 0) + + # extra check: verify we don't leak bridge objects - keep the link + # around and verify the bridge count has gone to zero + + attempts = 0 + bridgeCount = len(qmf.getObjects(_class="bridge")) + while bridgeCount > 0: + attempts += 1 + if attempts >= 5: + self.fail("Bridges didn't clean up") + return + sleep(1) + bridgeCount = len(qmf.getObjects(_class="bridge")) + + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_direct_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_direct_reorigin") + + session.exchange_declare(exchange="fed.direct_reorigin", type="direct") + r_session.exchange_declare(exchange="fed.direct_reorigin", type="direct") + + session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct") + r_session.exchange_declare(exchange="fed.direct_reorigin_2", type="direct") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.direct_reorigin_2", binding_key="ft-key.two") + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.direct_reorigin", "fed.direct_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.direct_reorigin_2", "fed.direct_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.direct_reorigin", binding_key="ft-key.one") + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="ft-key.one") + r_session.message_transfer(destination="fed.direct_reorigin", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers") + + session.exchange_declare(exchange="fed.headers", type="headers") + r_session.exchange_declare(exchange="fed.headers", type="headers") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers", "fed.headers", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.message_properties(application_headers={'class':'first'}) + for i in range(1, 11): + r_session.message_transfer(destination="fed.headers", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + content = msg.body + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_reorigin(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_reorigin") + + session.exchange_declare(exchange="fed.headers_reorigin", type="headers") + r_session.exchange_declare(exchange="fed.headers_reorigin", type="headers") + + session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers") + r_session.exchange_declare(exchange="fed.headers_reorigin_2", type="headers") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.headers_reorigin_2", binding_key="key2", arguments={'x-match':'any', 'class':'second'}) + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers_reorigin", "fed.headers_reorigin", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.headers_reorigin_2", "fed.headers_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.headers_reorigin", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.message_properties(application_headers={'class':'first'}) + for i in range(1, 11): + r_session.message_transfer(destination="fed.headers_reorigin", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_unbind(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_unbind") + + session.exchange_declare(exchange="fed.headers_unbind", type="headers") + r_session.exchange_declare(exchange="fed.headers_unbind", type="headers") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + queue = qmf.getObjects(_class="queue", name="fed1")[0] + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + session.exchange_bind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) + queue.update() + self.assertEqual(queue.bindingCount, 2, + "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) + + session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1") + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + + def test_dynamic_headers_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_xml") + + session.exchange_declare(exchange="fed.xml", type="xml") + r_session.exchange_declare(exchange="fed.xml", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + content = msg.body + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_reorigin_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_headers_reorigin_xml") + + session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin", type="xml") + + session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + r_session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + session.queue_declare(queue="fed2", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed2", exchange="fed.xml_reorigin_2", binding_key="key2", arguments={'xquery':'true()'}) + self.subscribe(queue="fed2", destination="f2") + queue2 = session.incoming("f2") + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + + bridge = qmf.getObjects(_class="bridge")[0] + bridge2 = qmf.getObjects(_class="bridge")[1] + sleep(5) + + foo=qmf.getObjects(_class="link") + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.xml_reorigin", binding_key="key1", arguments={'xquery':'true()'}) + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + props = r_session.delivery_properties(routing_key="key1") + for i in range(1, 11): + r_session.message_transfer(destination="fed.xml_reorigin", message=Message(props, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + + # Extra test: don't explicitly close() bridge2. When the link is closed, + # it should clean up bridge2 automagically. verify_cleanup() will detect + # if bridge2 isn't cleaned up and will fail the test. + # + #result = bridge2.close() + #self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + def test_dynamic_headers_unbind_xml(self): + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_xml_unbind") + + session.exchange_declare(exchange="fed.xml_unbind", type="xml") + r_session.exchange_declare(exchange="fed.xml_unbind", type="xml") + + self.startQmf() + qmf = self.qmf + + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0) + + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + queue = qmf.getObjects(_class="queue", name="fed1")[0] + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + session.exchange_bind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1", arguments={'xquery':'true()'}) + queue.update() + self.assertEqual(queue.bindingCount, 2, + "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) + + session.exchange_unbind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1") + queue.update() + self.assertEqual(queue.bindingCount, 1, + "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + + def test_dynamic_topic_nodup(self): + """Verify that a message whose routing key matches more than one + binding does not get duplicated to the same queue. + """ + session = self.session + r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) + r_session = r_conn.session("test_dynamic_topic_nodup") + + session.exchange_declare(exchange="fed.topic", type="topic") + r_session.exchange_declare(exchange="fed.topic", type="topic") + + self.startQmf() + qmf = self.qmf + broker = qmf.getObjects(_class="broker")[0] + result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + link = qmf.getObjects(_class="link")[0] + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) + self.assertEqual(result.status, 0) + bridge = qmf.getObjects(_class="bridge")[0] + sleep(5) + + session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="red.*") + session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="*.herring") + + self.subscribe(queue="fed1", destination="f1") + queue = session.incoming("f1") + + for i in range(1, 11): + dp = r_session.delivery_properties(routing_key="red.herring") + r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + + for i in range(1, 11): + msg = queue.get(timeout=5) + self.assertEqual("Message %d" % i, msg.body) + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + result = bridge.close() + self.assertEqual(result.status, 0) + result = link.close() + self.assertEqual(result.status, 0) + + self.verify_cleanup() + + + def test_dynamic_direct_route_prop(self): + """ Set up a tree of uni-directional routes across the direct exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spudboy" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spudboy" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create direct exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.direct": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.direct" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.direct", # src + "fedX.direct", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. Hopefully, this is long enough! + sleep(6) + + # create a queue on B2, bound to "spudboy" + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + + # create a queue on B3, bound to "spudboy" + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker (twice at + # broker B1). Work backwards from binding brokers. + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") + self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) + + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message_drp %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message_drp %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # @todo - restore code when QPID-2499 fixed!! + sleep(6) + # wait for the binding count on B1 to drop from 2 to 1 + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() + + # send 10 msgs from B0 + for i in range(11, 21): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spudboy") + self._brokers[0].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) + + # verify messages are forwarded to B3 only + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B3 only + for i in range(11, 21): + msg = queue_3.get(timeout=5) + self.assertEqual("Message_drp %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.direct", binding_key="spudboy") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + + def test_dynamic_topic_route_prop(self): + """ Set up a tree of uni-directional routes across a topic exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spud.*" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spud.*" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.topic", type="topic") + self.assertEqual(_b.client_session.exchange_query(name="fedX.topic").type, + "topic", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.topic": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.topic" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.topic", # src + "fedX.topic", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. + sleep(6) + + # create a queue on B2, bound to "spudboy" + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + + # create a queue on B3, bound to "spudboy" + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker (twice at + # broker B1). Work backwards from binding brokers. + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") + self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message_trp %d" % i)) + + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # wait for the binding count on B1 to drop from 2 to 1 + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() + + # send 10 msgs from B0 + for i in range(11, 21): + dp = self._brokers[0].client_session.delivery_properties(routing_key="spud.boy") + self._brokers[0].client_session.message_transfer(destination="fedX.topic", message=Message(dp, "Message_trp %d" % i)) + + # verify messages are forwarded to B3 only + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B3 only + for i in range(11, 21): + msg = queue_3.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.topic", binding_key="spud.*") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.topic") + + self._teardown_brokers() + + self.verify_cleanup() + + + def test_dynamic_fanout_route_prop(self): + """ Set up a tree of uni-directional routes across a fanout exchange. + Bind the same key to the same queues on the leaf nodes. Verify a + message sent with the routing key transverses the tree an arrives at + each leaf. Remove one leaf's queue, and verify that messages still + reach the other leaf. + + Route Topology: + + +---> B2 queue:"test-queue", binding key:"spud.*" + B0 --> B1 --+ + +---> B3 queue:"test-queue", binding key:"spud.*" + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create fanout exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.fanout", type="fanout") + self.assertEqual(_b.client_session.exchange_query(name="fedX.fanout").type, + "fanout", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.fanout": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B2 + result = self._brokers[2].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B3 + result = self._brokers[3].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX.fanout" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX.fanout", # src + "fedX.fanout", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. + sleep(6) + + # create a queue on B2, bound to the exchange + self._brokers[2].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[2].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout") + + # create a queue on B3, bound to the exchange + self._brokers[3].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + self._brokers[3].client_session.exchange_bind(queue="fedX1", exchange="fedX.fanout") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[2].client_session, queue="fedX1", destination="f1") + queue_2 = self._brokers[2].client_session.incoming("f1") + + # subscribe to messages arriving on B3's queue + self.subscribe(self._brokers[3].client_session, queue="fedX1", destination="f1") + queue_3 = self._brokers[3].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker (twice at + # broker B1). Work backwards from binding brokers. + + binding_counts = [1, 2, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + # send 10 msgs from B0 + for i in range(1, 11): + dp = self._brokers[0].client_session.delivery_properties() + self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message_frp %d" % i)) + + # wait for 10 messages to be forwarded from B0->B1, + # 10 messages from B1->B2, + # and 10 messages from B1->B3 + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 10 or exchanges[0].msgRoutes != 10 or + exchanges[1].msgReceives != 10 or exchanges[1].msgRoutes != 20 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 10 or exchanges[3].msgRoutes != 10): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B2 and B3 + for i in range(1, 11): + msg = queue_2.get(timeout=5) + self.assertEqual("Message_frp %d" % i, msg.body) + msg = queue_3.get(timeout=5) + self.assertEqual("Message_frp %d" % i, msg.body) + + try: + extra = queue_2.get(timeout=1) + self.fail("Got unexpected message in queue_2: " + extra.body) + except Empty: None + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # tear down the queue on B2 + self._brokers[2].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout") + self._brokers[2].client_session.message_cancel(destination="f1") + self._brokers[2].client_session.queue_delete(queue="fedX1") + + # wait for the binding count on B1 to drop from 2 to 1 + retries = 0 + exchanges[1].update() + while exchanges[1].bindingCount != 1: + retries += 1 + self.failIfEqual(retries, 10, + "unbinding failed to propagate to broker B1: %d" + % exchanges[1].bindingCount) + sleep(1) + exchanges[1].update() + + # send 10 msgs from B0 + for i in range(11, 21): + dp = self._brokers[0].client_session.delivery_properties() + self._brokers[0].client_session.message_transfer(destination="fedX.fanout", message=Message(dp, "Message_frp %d" % i)) + + # verify messages are forwarded to B3 only + retries = 0 + for ex in exchanges: + ex.update() + while (exchanges[0].msgReceives != 20 or exchanges[0].msgRoutes != 20 or + exchanges[1].msgReceives != 20 or exchanges[1].msgRoutes != 30 or + exchanges[2].msgReceives != 10 or exchanges[2].msgRoutes != 10 or + exchanges[3].msgReceives != 20 or exchanges[3].msgRoutes != 20): + retries += 1 + self.failIfEqual(retries, 10, + "federation failed to route more msgs %d:%d %d:%d %d:%d %d:%d" + % (exchanges[0].msgReceives, + exchanges[0].msgRoutes, + exchanges[1].msgReceives, + exchanges[1].msgRoutes, + exchanges[2].msgReceives, + exchanges[2].msgRoutes, + exchanges[3].msgReceives, + exchanges[3].msgRoutes)) + sleep(1) + for ex in exchanges: + ex.update() + + # get exactly 10 msgs on B3 only + for i in range(11, 21): + msg = queue_3.get(timeout=5) + self.assertEqual("Message_frp %d" % i, msg.body) + + try: + extra = queue_3.get(timeout=1) + self.fail("Got unexpected message in queue_3: " + extra.body) + except Empty: None + + # cleanup + + self._brokers[3].client_session.exchange_unbind(queue="fedX1", exchange="fedX.fanout") + self._brokers[3].client_session.message_cancel(destination="f1") + self._brokers[3].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.fanout") + + self._teardown_brokers() + + self.verify_cleanup() + + + def getProperty(self, msg, name): + for h in msg.headers: + if hasattr(h, name): return getattr(h, name) + return None + + def getAppHeader(self, msg, name): + headers = self.getProperty(msg, "application_headers") + if headers: + return headers[name] + return None + + def test_dynamic_topic_bounce(self): + """ Bounce the connection between federated Topic Exchanges. + """ + class Params: + def exchange_type(self): return "topic" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud.*") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud.*") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud.boy") + + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_direct_bounce(self): + """ Bounce the connection between federated Direct Exchanges. + """ + class Params: + def exchange_type(self): return "direct" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, binding_key="spud") + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_fanout_bounce(self): + """ Bounce the connection between federated Fanout Exchanges. + """ + class Params: + def exchange_type(self): return "fanout" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename) + def delivery_properties(self, ssn): + return ssn.delivery_properties(routing_key="spud") + self.generic_dynamic_bounce_test(Params()) + + def test_dynamic_headers_bounce(self): + """ Bounce the connection between federated Headers Exchanges. + """ + class Params: + def exchange_type(self): return "headers" + def bind_queue(self, ssn, qname, ename): + ssn.exchange_bind(queue=qname, exchange=ename, + binding_key="spud", arguments={'x-match':'any', 'class':'first'}) + def unbind_queue(self, ssn, qname, ename): + ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud") + def delivery_properties(self, ssn): + return ssn.message_properties(application_headers={'class':'first'}) + ## @todo KAG - re-enable once federation bugs with headers exchanges + ## are fixed. + #self.generic_dynamic_bounce_test(Params()) + return + + + def generic_dynamic_bounce_test(self, params): + """ Verify that a federated broker can maintain a binding to a local + queue using the same key as a remote binding. Destroy and reconnect + the federation link, and verify routes are restored correctly. + See QPID-3170. + Topology: + + Queue1 <---"Key"---B0<==[Federated Exchange]==>B1---"Key"--->Queue2 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type=params.exchange_type()) + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + params.exchange_type(), "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # + # on each broker, create a local queue bound to the exchange with the + # same key value. + # + + self._brokers[0].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[0].client_session, queue="fedX1", destination="f1") + queue_0 = self._brokers[0].client_session.incoming("f1") + + self._brokers[1].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True) + params.bind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self.subscribe(self._brokers[1].client_session, queue="fedX1", destination="f1") + queue_1 = self._brokers[1].client_session.incoming("f1") + + # now federate the two brokers + + # connect B0 --> B1 + result = self._brokers[1].qmf_object.connect(self._brokers[0].host, + self._brokers[0].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.connect(self._brokers[1].host, + self._brokers[1].port, + False, "PLAIN", "guest", "guest", "tcp") + self.assertEqual(result.status, 0) + + # for each link, bridge the "fedX" exchanges: + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + # wait for all the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # @todo - There is no way to determine when the bridge objects become + # active. + + # wait until the binding key has propagated to each broker - each + # broker should see 2 bindings (1 local, 1 remote) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount < binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # send 10 msgs to B0 + for i in range(1, 11): + # dp = self._brokers[0].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[0].client_session) + self._brokers[0].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's local queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + binding_counts = [1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # restore the bridges between the two exchanges, and wait for the + # bindings to propagate. + # + + for _l in qmf.getObjects(_class="link"): + # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker()))) + result = _l.bridge(False, # durable + "fedX", # src + "fedX", # dst + "", # key + "", # tag + "", # excludes + False, # srcIsQueue + False, # srcIsLocal + True, # dynamic + 0) # sync + self.assertEqual(result.status, 0) + + binding_counts = [2, 2] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(2): + exchanges[i].update() + timeout = time() + 10 + while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout: + exchanges[i].update() + self.failUnless(exchanges[i].bindingCount == binding_counts[i]) + + # + # verify traffic flows correctly + # + + for i in range(1, 11): + #dp = self._brokers[1].client_session.delivery_properties(routing_key=params.routing_key()) + dp = params.delivery_properties(self._brokers[1].client_session) + self._brokers[1].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i)) + + # get exactly 10 msgs on B0's queue and B1's queue + for i in range(1, 11): + try: + msg = queue_0.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + msg = queue_1.get(timeout=5) + self.assertEqual("Message_trp %d" % i, msg.body) + except Empty: + self.fail("Only got %d msgs - expected 10" % i) + try: + extra = queue_0.get(timeout=1) + self.fail("Got unexpected message in queue_0: " + extra.body) + except Empty: None + + try: + extra = queue_1.get(timeout=1) + self.fail("Got unexpected message in queue_1: " + extra.body) + except Empty: None + + + # + # cleanup + # + params.unbind_queue(self._brokers[0].client_session, "fedX1", "fedX") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="fedX1") + + params.unbind_queue(self._brokers[1].client_session, "fedX1", "fedX") + self._brokers[1].client_session.message_cancel(destination="f1") + self._brokers[1].client_session.queue_delete(queue="fedX1") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() + + |