summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-20 17:07:54 +0000
committerAlan Conway <aconway@apache.org>2010-01-20 17:07:54 +0000
commitcd3166280e53b8587d4d257b7898577b65edc0b7 (patch)
treefabdc0bf29f6c025648d84349faadb317cfa2e68 /cpp/src/qpid/broker/Queue.cpp
parent8d124f581b0571a9edb5603e6c282a2ecc081b5b (diff)
downloadqpid-python-cd3166280e53b8587d4d257b7898577b65edc0b7.tar.gz
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe context - in cluster delivery thread or during update. Assertions added to Queue.cpp and SemanticState.cpp. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp15
1 files changed, 13 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dcc5116afa..3eb714186c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -33,6 +33,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
@@ -44,6 +45,7 @@
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
+
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -144,7 +146,6 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
-
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -165,7 +166,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
push(msg);
}
mgntEnqStats(msg);
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -202,6 +203,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
+ assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -222,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
+ assertClusterSafe();
const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
@@ -232,6 +235,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
Messages::iterator i = findAt(position);
@@ -251,6 +255,8 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
+
QPID_LOG(debug, "attempting to acquire " << msg.position);
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
@@ -272,6 +278,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
void Queue::notifyListener()
{
+ assertClusterSafe();
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
@@ -366,6 +373,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
void Queue::removeListener(Consumer::shared_ptr c)
{
+ assertClusterSafe();
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
@@ -440,6 +448,7 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+ assertClusterSafe();
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw ResourceLockedException(
@@ -539,6 +548,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
void Queue::popMsg(QueuedMessage& qmsg)
{
+ assertClusterSafe();
const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
@@ -549,6 +559,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+ assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);