From 3c445cd30f65530c4fe9198a66b92f8a3962739d Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Tue, 7 Jul 2009 15:05:11 +0000 Subject: More tests and completion of fix for 791672 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@791858 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Queue.cpp | 16 +++++++++++----- cpp/src/qpid/broker/Queue.h | 1 + cpp/src/tests/QueueTest.cpp | 28 ++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 5 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 30b33d877b..759a38d919 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -99,7 +99,8 @@ Queue::Queue(const string& _name, bool _autodelete, eventMode(0), eventMgr(0), insertSeqNo(0), - broker(b) + broker(b), + lastForcedPosition(0) { if (parent != 0 && broker != 0) { @@ -659,6 +660,7 @@ bool Queue::canAutoDelete() const void Queue::clearLastNodeFailure() { inLastNodeFailure = false; + lastForcedPosition = sequence; } void Queue::setLastNodeFailure() @@ -666,10 +668,14 @@ void Queue::setLastNodeFailure() if (persistLastNode){ Mutex::ScopedLock locker(messageLock); for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); + // don't force a message twice to disk. + if(i->position > lastForcedPosition) { + if (lastValueQueue) checkLvqReplace(*i); + i->payload->forcePersistent(); + if (i->payload->isForcedPersistent() ){ + enqueue(0, i->payload); + } + lastForcedPosition = i->position; } } inLastNodeFailure = true; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 339fcc8ce1..56e349b06b 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -106,6 +106,7 @@ namespace qpid { bool insertSeqNo; std::string seqNoKey; Broker* broker; + framing::SequenceNumber lastForcedPosition; void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7ba3598ea7..d987e07a02 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -548,6 +548,34 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); + // check they don't get stored twice + queue1->setLastNodeFailure(); + queue2->setLastNodeFailure(); + BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); + + intrusive_ptr msg2 = create_message("e", "B"); + queue1->deliver(msg2); + queue2->deliver(msg2); + + queue1->clearLastNodeFailure(); + queue2->clearLastNodeFailure(); + // check only new messages get forced + queue1->setLastNodeFailure(); + queue2->setLastNodeFailure(); + BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); + + // check no failure messages are stored + queue1->clearLastNodeFailure(); + queue2->clearLastNodeFailure(); + + intrusive_ptr msg3 = create_message("e", "B"); + queue1->deliver(msg3); + queue2->deliver(msg3); + BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); + queue1->setLastNodeFailure(); + queue2->setLastNodeFailure(); + BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); + } -- cgit v1.2.1