diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 21 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/federation.py | 106 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 17 |
3 files changed, 144 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index edb50fce9c..55cff046e2 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testLinkBindingCleanup) +{ + MessagingFixture fix; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + + Connection connection = fix.newConnection(); + connection.open(); + + Session session(connection.createSession()); + Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); + Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); + connection.close(); + + sender.send(Message("test-message"), true); + + // The session-scoped binding should be removed when receiver1's network connection is lost + Message in; + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index dcd074eda9..6477c6effd 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2604,3 +2604,109 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_bounce_unbinds_named_queue(self): + """ Verify that a propagated binding is removed when the connection is + bounced + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # on the destination broker, create a binding for propagation + self._brokers[0].client_session.queue_declare(queue="fedDstQ") + self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") + + # on the source broker, create a bridge queue + self._brokers[1].client_session.queue_declare(queue="fedSrcQ") + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.create( "link", + "Link-dynamic", + {"host":self._brokers[1].host, + "port":self._brokers[1].port}, False) + self.assertEqual(result.status, 0) + + # bridge the "fedX" exchange: + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-dynamic", + {"link":"Link-dynamic", + "src":"fedX", + "dest":"fedX", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # wait until the binding key has propagated to the src broker + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount < 1 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 1) + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount != 0 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 0) + + self._brokers[1].client_session.queue_delete(queue="fedSrcQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 968ffa8b4a..ccb75d9cfd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -478,6 +478,23 @@ class ReplicationTests(HaBrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass + def test_replicate_binding(self): + """Verify that binding replication can be disabled""" + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + ps = primary.connect().session() + ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") + ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") + backup.wait_backup("q") + + primary.kill() + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die + backup.promote() + bs = backup.connect_admin().session() + bs.sender("ex").send(Message("msg")) + self.assert_browse_retry(bs, "q", []) + def test_invalid_replication(self): """Verify that we reject an attempt to declare a queue with invalid replication value.""" cluster = HaCluster(self, 1, ha_replicate="all") |
