diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 3 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 125 |
2 files changed, 85 insertions, 43 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 079b9b0ba6..5a93533755 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include <iostream> @@ -491,7 +492,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt } else { if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); } - m->setTimestamp(); + m->setTimestamp(new broker::ExpiryPolicy); queue.deliver(m); } } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 15a96aeba9..14b7659b65 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -37,6 +37,7 @@ #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> +#include <boost/assign.hpp> #include <string> #include <iostream> @@ -51,22 +52,23 @@ template <class T> ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } } - -QPID_AUTO_TEST_SUITE(cluster) +QPID_AUTO_TEST_SUITE(cluster_test) using namespace std; using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; +using namespace boost::assign; +using broker::Broker; using boost::shared_ptr; -using qpid::cluster::Cluster; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << qpid::cluster::Cpg::str(*n); + return o << cluster::Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -94,7 +96,7 @@ template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) { BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); // Retry up to 10 secs in .1 second intervals. for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - ::usleep(1000*100); // 0.1 secs + sys::usleep(1000*100); // 0.1 secs urls = source.getKnownBrokers(); } } @@ -127,6 +129,45 @@ int64_t getMsgSequence(const Message& m) { return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); } +Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) { + Message m(data, key); + m.getDeliveryProperties().setTtl(ttl); + return m; +} + +vector<std::string> browse(Client& c, const std::string& q, int n) { + SubscriptionSettings browseSettings( + FlowControl::unlimited(), + ACCEPT_MODE_NONE, + ACQUIRE_MODE_NOT_ACQUIRED, + 0 // No auto-ack. + ); + LocalQueue lq; + c.subs.subscribe(lq, q, browseSettings); + vector<std::string> result; + for (int i = 0; i < n; ++i) { + result.push_back(lq.get(TIMEOUT).getData()); + } + c.subs.getSubscription(q).cancel(); + return result; +} + +QPID_AUTO_TEST_CASE(testMessageTimeToLive) { + // Note: this doesn't actually test for cluster race conditions around TTL, + // it just verifies that basic TTL functionality works. + // + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); + c0.session.messageTransfer(arg::content=Message("b", "q")); + BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b")); + sys::usleep(300*1000); + BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b")); + BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b")); +} + QPID_AUTO_TEST_CASE(testSequenceOptions) { // Make sure the exchange qpid.msg_sequence property is properly replicated. ClusterFixture cluster(1); @@ -138,13 +179,13 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC))); - BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); cluster.add(); Client c1(cluster[1]); c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); - BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); } QPID_AUTO_TEST_CASE(testTxTransaction) { @@ -160,14 +201,14 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { commitSession.txSelect(); commitSession.messageTransfer(arg::content=Message("a", "q")); commitSession.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A"); + BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); // Start a transaction that will roll back. Session rollbackSession = c0.connection.newSession("rollback"); SubscriptionManager rollbackSubs(rollbackSession); rollbackSession.txSelect(); rollbackSession.messageTransfer(arg::content=Message("1", "q")); - Message rollbackMessage = rollbackSubs.get("q", TIME_SEC); + Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); @@ -191,10 +232,10 @@ QPID_AUTO_TEST_CASE(testTxTransaction) { // Verify queue status: just the comitted messages and dequeues should remain. BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); + BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); } QPID_AUTO_TEST_CASE(testUnacked) { @@ -210,7 +251,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("11","q1")); LocalQueue q1; c0.subs.subscribe(q1, "q1", manualAccept); - BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue // Create unacked message: not acquired, accepted or completeed. @@ -220,12 +261,12 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("22","q2")); LocalQueue q2; c0.subs.subscribe(q2, "q2", manualAcquire); - m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue + m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(m.getData(), "21"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed c0.subs.getSubscription("q2").acquire(m); // Acquire manually BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed - BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. // Create empty credit record: acquire and accept but don't complete. @@ -235,7 +276,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { c0.session.messageTransfer(arg::content=Message("32", "q3")); LocalQueue q3; c0.subs.subscribe(q3, "q3", manualComplete); - Message m31=q3.get(TIME_SEC); + Message m31=q3.get(TIMEOUT); BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); @@ -251,7 +292,7 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Complete the empty credit message, should unblock the message behind it. BOOST_CHECK_THROW(q3.get(0), Exception); c0.session.markCompleted(SequenceSet(m31.getId()), true); - BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32"); + BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); @@ -260,9 +301,9 @@ QPID_AUTO_TEST_CASE(testUnacked) { BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); - BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); + BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); + BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); } QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { @@ -276,7 +317,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { c0.session.messageTransfer(arg::content=Message("1","q")); c0.session.messageTransfer(arg::content=Message("2","q")); Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "1"); // New member, TX not comitted, c1 should see nothing. @@ -287,7 +328,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { // After commit c1 shoudl see results of tx. c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "2"); // Another transaction with both members active. @@ -295,7 +336,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); c0.session.txCommit(); BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "3"); } @@ -318,7 +359,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // No reliable way to ensure the partial message has arrived // before we start the new broker, so we sleep. - ::usleep(2500); + sys::usleep(2500); cluster.add(); // Send final 2 frames of message. @@ -328,7 +369,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { // Verify message is enqued correctly on second member. Message m; Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "abcd"); BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size()); } @@ -391,20 +432,20 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::unlimited()); Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); // Check second subscription's flow control: gets first message, not second. - BOOST_CHECK(lp.get(m, TIME_SEC)); + BOOST_CHECK(lp.get(m, TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "ccc"); // Kill the subscribing member, ensure further messages are not removed. @@ -412,7 +453,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) { BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); for (int i = 0; i < 10; ++i) { c1.session.messageTransfer(arg::content=Message("xxx", "q")); - BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC)); + BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } @@ -426,7 +467,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); while (c0.session.queueQuery("q").getMessageCount() != 2) - ::usleep(1000); // Wait for message to show up on broker 0. + sys::usleep(1000); // Wait for message to show up on broker 0. // Add a new broker, it should catch up. cluster.add(); @@ -444,18 +485,18 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); // Add another broker, don't wait for join - should be stalled till ready. cluster.add(); Client c2(cluster[2], "c2"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } @@ -488,9 +529,9 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { c0.session.close(); Client c1(cluster[1]); Message msg; - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); - BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } @@ -535,9 +576,9 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { // Check they arrived Message m; - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("foo", m.getData()); - BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIMEOUT)); BOOST_CHECK_EQUAL("bar", m.getData()); // Queue should be empty on all cluster members. |