diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-09-22 17:36:01 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-09-22 17:36:01 +0000 |
commit | f13c131456c4fb0a027513242f2d8253aad75ca0 (patch) | |
tree | 5ccd77aeb648695b41c0444e44ffa039e012d124 /cpp/src/qpid/broker/Queue.cpp | |
parent | 3d3fb015b49b088a6e1f641437cd6b7acb0ed6ec (diff) | |
download | qpid-python-f13c131456c4fb0a027513242f2d8253aad75ca0.tar.gz |
Joint checkin from gsim, kpvdr, cctrieloff. See QPID-2102: Exceeding reject queue policy under a transaction causes broker crash
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@817742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 61 |
1 files changed, 34 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index b2a8e223c5..1cc48a949e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ + if (policy.get()) policy->recoverEnqueued(msg); + push(msg, true); if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure @@ -563,15 +565,12 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ - Messages dequeues; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) { - policy->tryEnqueue(qm); - //depending on policy, may have some dequeues - if (!isRecovery) pendingDequeues.swap(dequeues); + policy->enqueued(qm); } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); @@ -608,10 +607,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ } } copy.notify(); - if (!dequeues.empty()) { - //depending on policy, may have some dequeues - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - } } QueuedMessage Queue::getFront() @@ -697,8 +692,12 @@ void Queue::setLastNodeFailure() } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + if (policy.get() && !suppressPolicyCheck) { + policy->tryEnqueue(msg); + } + if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } @@ -713,9 +712,21 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) store->enqueue(ctxt, pmsg, *this); return true; } + if (!store) { + //Messages enqueued on a transient queue should be prevented + //from having their content released as it may not be + //recoverable by these queue for delivery + msg->blockContentRelease(); + } return false; } +void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) +{ + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->enqueueAborted(msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { @@ -781,7 +792,15 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { - setPolicy(QueuePolicy::createQueuePolicy(_settings)); + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + (!store || NullMessageStore::isNullStore(store))) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue"); + FieldTable copy(_settings); + copy.erase(QueuePolicy::typeKey); + setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); + } else { + setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); + } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue with no-local=" << noLocal); @@ -975,19 +994,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1044,11 +1050,12 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { - if (policy.get()) policy->tryEnqueue(m); - mgntEnqStats(m.payload); - if (m.payload->isPersistent()) { - enqueue ( 0, m.payload ); + if (policy.get()) { + policy->recoverEnqueued(m.payload); + policy->enqueued(m); } + mgntEnqStats(m.payload); + enqueue ( 0, m.payload, true ); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } |