diff options
| author | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-01-20 17:07:54 +0000 |
| commit | cd3166280e53b8587d4d257b7898577b65edc0b7 (patch) | |
| tree | fabdc0bf29f6c025648d84349faadb317cfa2e68 /cpp/src/qpid/broker/SemanticState.cpp | |
| parent | 8d124f581b0571a9edb5603e6c282a2ecc081b5b (diff) | |
| download | qpid-python-cd3166280e53b8587d4d257b7898577b65edc0b7.tar.gz | |
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe
context - in cluster delivery thread or during update. Assertions
added to Queue.cpp and SemanticState.cpp.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 19 |
1 files changed, 18 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d579f15279..68c62a72ef 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -34,6 +34,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/IsInSequenceSet.h" #include "qpid/log/Statement.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" @@ -47,7 +48,6 @@ #include <assert.h> - namespace qpid { namespace broker { @@ -308,6 +308,7 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { + assertClusterSafe(); allocateCredit(msg.payload); DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; @@ -331,6 +332,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + assertClusterSafe(); // FIXME aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages @@ -354,6 +356,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { + assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { @@ -387,6 +390,7 @@ SemanticState::ConsumerImpl::~ConsumerImpl() void SemanticState::cancel(ConsumerImpl::shared_ptr c) { + assertClusterSafe(); c->disableNotify(); if (session.isAttached()) session.getConnection().outputTasks.removeOutputTask(c.get()); @@ -468,6 +472,7 @@ void SemanticState::requestDispatch() void SemanticState::ConsumerImpl::requestDispatch() { + assertClusterSafe(); if (blocked) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -565,6 +570,7 @@ void SemanticState::stop(const std::string& destination) void SemanticState::ConsumerImpl::setWindowMode() { + assertClusterSafe(); windowing = true; if (mgmtObject){ mgmtObject->set_creditMode("WINDOW"); @@ -573,6 +579,7 @@ void SemanticState::ConsumerImpl::setWindowMode() void SemanticState::ConsumerImpl::setCreditMode() { + assertClusterSafe(); windowing = false; if (mgmtObject){ mgmtObject->set_creditMode("CREDIT"); @@ -581,6 +588,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { + assertClusterSafe(); if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -589,6 +597,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { + assertClusterSafe(); if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -614,6 +623,7 @@ void SemanticState::ConsumerImpl::flush() void SemanticState::ConsumerImpl::stop() { + assertClusterSafe(); msgCredit = 0; byteCredit = 0; } @@ -667,12 +677,14 @@ bool SemanticState::ConsumerImpl::doOutput() void SemanticState::ConsumerImpl::enableNotify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); notifyEnabled = true; } void SemanticState::ConsumerImpl::disableNotify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); notifyEnabled = false; } @@ -684,6 +696,7 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const { void SemanticState::ConsumerImpl::notify() { Mutex::ScopedLock l(lock); + assertClusterSafe(); if (notifyEnabled) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -708,6 +721,7 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { } void SemanticState::accepted(const SequenceSet& commands) { + assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: @@ -740,6 +754,7 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { + assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -750,6 +765,7 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -759,6 +775,7 @@ void SemanticState::attached() void SemanticState::detached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); |
