diff options
| author | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
| commit | cd3166280e53b8587d4d257b7898577b65edc0b7 (patch) | |
| tree | fabdc0bf29f6c025648d84349faadb317cfa2e68 /cpp/src/qpid/broker/Queue.cpp | |
| parent | 8d124f581b0571a9edb5603e6c282a2ecc081b5b (diff) | |
| download | qpid-python-cd3166280e53b8587d4d257b7898577b65edc0b7.tar.gz | |
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe
context - in cluster delivery thread or during update. Assertions
added to Queue.cpp and SemanticState.cpp.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dcc5116afa..3eb714186c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" @@ -44,6 +45,7 @@ #include <boost/bind.hpp> #include <boost/intrusive_ptr.hpp> + using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; @@ -144,7 +146,6 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message>& msg){ - if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); @@ -165,7 +166,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); } mgntEnqStats(msg); - QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); + QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -202,6 +203,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ + assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); @@ -222,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ } void Queue::clearLVQIndex(const QueuedMessage& msg){ + assertClusterSafe(); const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; if (lastValueQueue && ft){ string key = ft->getAsString(qpidVQMatchProperty); @@ -232,6 +235,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); Messages::iterator i = findAt(position); @@ -251,6 +255,8 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess bool Queue::acquire(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, "attempting to acquire " << msg.position); Messages::iterator i = findAt(msg.position); if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set @@ -272,6 +278,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::notifyListener() { + assertClusterSafe(); QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); @@ -366,6 +373,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) void Queue::removeListener(Consumer::shared_ptr c) { + assertClusterSafe(); QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); @@ -440,6 +448,7 @@ QueuedMessage Queue::find(SequenceNumber pos) const { } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ + assertClusterSafe(); Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw ResourceLockedException( @@ -539,6 +548,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { void Queue::popMsg(QueuedMessage& qmsg) { + assertClusterSafe(); const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); if (lastValueQueue && ft){ string key = ft->getAsString(qpidVQMatchProperty); @@ -549,6 +559,7 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ + assertClusterSafe(); QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); |
