summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/QueueTest.cpp3
-rw-r--r--cpp/src/tests/cluster_test.cpp125
2 files changed, 85 insertions, 43 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 079b9b0ba6..5a93533755 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -26,6 +26,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include <iostream>
@@ -491,7 +492,7 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
} else {
if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
}
- m->setTimestamp();
+ m->setTimestamp(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 15a96aeba9..14b7659b65 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -37,6 +37,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
#include <string>
#include <iostream>
@@ -51,22 +52,23 @@ template <class T>
ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
}
-
-QPID_AUTO_TEST_SUITE(cluster)
+QPID_AUTO_TEST_SUITE(cluster_test)
using namespace std;
using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
-using qpid::sys::TIME_SEC;
-using qpid::broker::Broker;
+using namespace boost::assign;
+using broker::Broker;
using boost::shared_ptr;
-using qpid::cluster::Cluster;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << qpid::cluster::Cpg::str(*n);
+ return o << cluster::Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -94,7 +96,7 @@ template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
// Retry up to 10 secs in .1 second intervals.
for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
- ::usleep(1000*100); // 0.1 secs
+ sys::usleep(1000*100); // 0.1 secs
urls = source.getKnownBrokers();
}
}
@@ -127,6 +129,45 @@ int64_t getMsgSequence(const Message& m) {
return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
}
+Message ttlMessage(const std::string& data, const std::string& key, uint64_t ttl) {
+ Message m(data, key);
+ m.getDeliveryProperties().setTtl(ttl);
+ return m;
+}
+
+vector<std::string> browse(Client& c, const std::string& q, int n) {
+ SubscriptionSettings browseSettings(
+ FlowControl::unlimited(),
+ ACCEPT_MODE_NONE,
+ ACQUIRE_MODE_NOT_ACQUIRED,
+ 0 // No auto-ack.
+ );
+ LocalQueue lq;
+ c.subs.subscribe(lq, q, browseSettings);
+ vector<std::string> result;
+ for (int i = 0; i < n; ++i) {
+ result.push_back(lq.get(TIMEOUT).getData());
+ }
+ c.subs.getSubscription(q).cancel();
+ return result;
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+ // Note: this doesn't actually test for cluster race conditions around TTL,
+ // it just verifies that basic TTL functionality works.
+ //
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
+ c0.session.messageTransfer(arg::content=Message("b", "q"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<std::string>("a")("b"));
+ sys::usleep(300*1000);
+ BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<std::string>("b"));
+ BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<std::string>("b"));
+}
+
QPID_AUTO_TEST_CASE(testSequenceOptions) {
// Make sure the exchange qpid.msg_sequence property is properly replicated.
ClusterFixture cluster(1);
@@ -138,13 +179,13 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) {
c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
- BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
- BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
cluster.add();
Client c1(cluster[1]);
c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
- BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
}
QPID_AUTO_TEST_CASE(testTxTransaction) {
@@ -160,14 +201,14 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
commitSession.txSelect();
commitSession.messageTransfer(arg::content=Message("a", "q"));
commitSession.messageTransfer(arg::content=Message("b", "q"));
- BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
// Start a transaction that will roll back.
Session rollbackSession = c0.connection.newSession("rollback");
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
rollbackSession.messageTransfer(arg::content=Message("1", "q"));
- Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+ Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
@@ -191,10 +232,10 @@ QPID_AUTO_TEST_CASE(testTxTransaction) {
// Verify queue status: just the comitted messages and dequeues should remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
- BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
}
QPID_AUTO_TEST_CASE(testUnacked) {
@@ -210,7 +251,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("11","q1"));
LocalQueue q1;
c0.subs.subscribe(q1, "q1", manualAccept);
- BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted
+ BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
// Create unacked message: not acquired, accepted or completeed.
@@ -220,12 +261,12 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("22","q2"));
LocalQueue q2;
c0.subs.subscribe(q2, "q2", manualAcquire);
- m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue
+ m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(m.getData(), "21");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
c0.subs.getSubscription("q2").acquire(m); // Acquire manually
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
- BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue
+ BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
// Create empty credit record: acquire and accept but don't complete.
@@ -235,7 +276,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
c0.session.messageTransfer(arg::content=Message("32", "q3"));
LocalQueue q3;
c0.subs.subscribe(q3, "q3", manualComplete);
- Message m31=q3.get(TIME_SEC);
+ Message m31=q3.get(TIMEOUT);
BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
@@ -251,7 +292,7 @@ QPID_AUTO_TEST_CASE(testUnacked) {
// Complete the empty credit message, should unblock the message behind it.
BOOST_CHECK_THROW(q3.get(0), Exception);
c0.session.markCompleted(SequenceSet(m31.getId()), true);
- BOOST_CHECK_EQUAL(q3.get(TIME_SEC).getData(), "32");
+ BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
@@ -260,9 +301,9 @@ QPID_AUTO_TEST_CASE(testUnacked) {
BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
- BOOST_CHECK_EQUAL(c1.subs.get("q1", TIME_SEC).getData(), "11");
- BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "21");
- BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
+ BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+ BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
}
QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
@@ -276,7 +317,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
c0.session.messageTransfer(arg::content=Message("1","q"));
c0.session.messageTransfer(arg::content=Message("2","q"));
Message m;
- BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "1");
// New member, TX not comitted, c1 should see nothing.
@@ -287,7 +328,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
// After commit c1 shoudl see results of tx.
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "2");
// Another transaction with both members active.
@@ -295,7 +336,7 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
c0.session.txCommit();
BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "3");
}
@@ -318,7 +359,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// No reliable way to ensure the partial message has arrived
// before we start the new broker, so we sleep.
- ::usleep(2500);
+ sys::usleep(2500);
cluster.add();
// Send final 2 frames of message.
@@ -328,7 +369,7 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
// Verify message is enqued correctly on second member.
Message m;
Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "abcd");
BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
}
@@ -391,20 +432,20 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
// Activate the subscription, ensure message removed on all queues.
c0.subs.setFlowControl("q", FlowControl::unlimited());
Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "aaa");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
// Check second subscription's flow control: gets first message, not second.
- BOOST_CHECK(lp.get(m, TIME_SEC));
+ BOOST_CHECK(lp.get(m, TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bbb");
BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
- BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "ccc");
// Kill the subscribing member, ensure further messages are not removed.
@@ -412,7 +453,7 @@ QPID_AUTO_TEST_CASE(testUpdateConsumers) {
BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
for (int i = 0; i < 10; ++i) {
c1.session.messageTransfer(arg::content=Message("xxx", "q"));
- BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
}
}
@@ -426,7 +467,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
c0.session.messageTransfer(arg::content=Message("foo","q"));
c0.session.messageTransfer(arg::content=Message("bar","q"));
while (c0.session.queueQuery("q").getMessageCount() != 2)
- ::usleep(1000); // Wait for message to show up on broker 0.
+ sys::usleep(1000); // Wait for message to show up on broker 0.
// Add a new broker, it should catch up.
cluster.add();
@@ -444,18 +485,18 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "foo");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
// Add another broker, don't wait for join - should be stalled till ready.
cluster.add();
Client c2(cluster[2], "c2");
- BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "pfoo");
- BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
BOOST_CHECK_EQUAL(m.getData(), "pbar");
BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
}
@@ -488,9 +529,9 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
c0.session.close();
Client c1(cluster[1]);
Message msg;
- BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
BOOST_CHECK_EQUAL(string("foo"), msg.getData());
- BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
@@ -535,9 +576,9 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
// Check they arrived
Message m;
- BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL("foo", m.getData());
- BOOST_CHECK(c0.lq.get(m, sys::TIME_SEC));
+ BOOST_CHECK(c0.lq.get(m, TIMEOUT));
BOOST_CHECK_EQUAL("bar", m.getData());
// Queue should be empty on all cluster members.