summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp16
1 files changed, 11 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 30be733f89..19589e1d84 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -99,8 +99,7 @@ Queue::Queue(const string& _name, bool _autodelete,
eventMode(0),
eventMgr(0),
insertSeqNo(0),
- broker(b),
- lastForcedPosition(0)
+ broker(b)
{
if (parent != 0 && broker != 0)
{
@@ -211,6 +210,14 @@ void Queue::requeue(const QueuedMessage& msg){
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
+
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
+ msg.payload->forcePersistent();
+ if (msg.payload->isForcedPersistent() ){
+ enqueue(0, msg.payload);
+ }
+ }
}
copy.notify();
}
@@ -660,7 +667,6 @@ bool Queue::canAutoDelete() const
void Queue::clearLastNodeFailure()
{
inLastNodeFailure = false;
- lastForcedPosition = sequence;
}
void Queue::setLastNodeFailure()
@@ -669,19 +675,19 @@ void Queue::setLastNodeFailure()
Mutex::ScopedLock locker(messageLock);
for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
// don't force a message twice to disk.
- if(i->position > lastForcedPosition) {
+ if(!i->payload->isStoredOnQueue(shared_from_this())) {
if (lastValueQueue) checkLvqReplace(*i);
i->payload->forcePersistent();
if (i->payload->isForcedPersistent() ){
enqueue(0, i->payload);
}
- lastForcedPosition = i->position;
}
}
inLastNodeFailure = true;
}
}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{