diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 30be733f89..19589e1d84 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -99,8 +99,7 @@ Queue::Queue(const string& _name, bool _autodelete, eventMode(0), eventMgr(0), insertSeqNo(0), - broker(b), - lastForcedPosition(0) + broker(b) { if (parent != 0 && broker != 0) { @@ -211,6 +210,14 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); + + // for persistLastNode - don't force a message twice to disk, but force it if no force before + if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { + msg.payload->forcePersistent(); + if (msg.payload->isForcedPersistent() ){ + enqueue(0, msg.payload); + } + } } copy.notify(); } @@ -660,7 +667,6 @@ bool Queue::canAutoDelete() const void Queue::clearLastNodeFailure() { inLastNodeFailure = false; - lastForcedPosition = sequence; } void Queue::setLastNodeFailure() @@ -669,19 +675,19 @@ void Queue::setLastNodeFailure() Mutex::ScopedLock locker(messageLock); for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { // don't force a message twice to disk. - if(i->position > lastForcedPosition) { + if(!i->payload->isStoredOnQueue(shared_from_this())) { if (lastValueQueue) checkLvqReplace(*i); i->payload->forcePersistent(); if (i->payload->isForcedPersistent() ){ enqueue(0, i->payload); } - lastForcedPosition = i->position; } } inLastNodeFailure = true; } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { |
