diff options
Diffstat (limited to 'cpp/src/tests/BrokerClusterCalls.cpp')
| -rw-r--r-- | cpp/src/tests/BrokerClusterCalls.cpp | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp index 6cdd6fc9bf..f659702387 100644 --- a/cpp/src/tests/BrokerClusterCalls.cpp +++ b/cpp/src/tests/BrokerClusterCalls.cpp @@ -42,6 +42,7 @@ using namespace boost; using namespace boost::assign; using namespace qpid::messaging; using boost::format; +using boost::intrusive_ptr; namespace qpid { namespace tests { @@ -59,6 +60,9 @@ class DummyCluster : public broker::Cluster history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() % qm.position % qm.payload->getFrames().getContent()).str(); } + void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { + history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); + } void recordStr(const string& op, const string& name) { history += (format("%s(%s)") % op % name).str(); } @@ -70,7 +74,10 @@ class DummyCluster : public broker::Cluster history += (format("routing(%s)") % m->getFrames().getContent()).str(); } - virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); } + virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { + recordMsg("enqueue", q, msg); + return true; + } virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { history += (format("routed(%s)") % m->getFrames().getContent()).str(); @@ -91,9 +98,8 @@ class DummyCluster : public broker::Cluster virtual void release(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("release", qm); } - virtual void dequeue(const broker::QueuedMessage& qm) { - // Never ignore dequeue, used to avoid resource leaks. - recordQm("dequeue", qm); + virtual void drop(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("dequeue", qm); } // Consumers @@ -156,7 +162,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) { sender.send(Message("a")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); // Don't check size here as it is uncertain whether acquire has happened yet. @@ -221,7 +227,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.reject(m); BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); @@ -239,7 +245,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { bool received = receiver.fetch(m, Duration::IMMEDIATE); BOOST_CHECK(!received); // Timed out BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -252,7 +258,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -261,11 +267,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { sender.send(m); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - // FIXME: bug in Queue.cpp gives the incorrect position when - // dequeueing a replaced LVQ message. - // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1 - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -345,20 +347,19 @@ QPID_AUTO_TEST_CASE(testRingQueue) { BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); Receiver receiver = f.s.createReceiver("ring"); @@ -399,15 +400,16 @@ QPID_AUTO_TEST_CASE(testTransactions) { BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit ts.commit(); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - // FIXME aconway 2010-10-18: As things stand the cluster is not // compatible with transactions - // - enqueues occur after routing is complete. + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? // - no transaction context associated with messages in the Cluster interface. // - no call to Cluster::accept in Queue::dequeueCommitted + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + Receiver receiver = ts.createReceiver("q"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); |
