summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-20 17:07:54 +0000
committerAlan Conway <aconway@apache.org>2010-01-20 17:07:54 +0000
commitcd3166280e53b8587d4d257b7898577b65edc0b7 (patch)
treefabdc0bf29f6c025648d84349faadb317cfa2e68 /cpp/src/qpid/broker/SemanticState.cpp
parent8d124f581b0571a9edb5603e6c282a2ecc081b5b (diff)
downloadqpid-python-cd3166280e53b8587d4d257b7898577b65edc0b7.tar.gz
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe context - in cluster delivery thread or during update. Assertions added to Queue.cpp and SemanticState.cpp. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901282 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp19
1 files changed, 18 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d579f15279..68c62a72ef 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -34,6 +34,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -47,7 +48,6 @@
#include <assert.h>
-
namespace qpid {
namespace broker {
@@ -308,6 +308,7 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession()
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
+ assertClusterSafe();
allocateCredit(msg.payload);
DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
@@ -331,6 +332,7 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ assertClusterSafe();
// FIXME aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
@@ -354,6 +356,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
+ assertClusterSafe();
uint32_t originalMsgCredit = msgCredit;
uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
@@ -387,6 +390,7 @@ SemanticState::ConsumerImpl::~ConsumerImpl()
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
+ assertClusterSafe();
c->disableNotify();
if (session.isAttached())
session.getConnection().outputTasks.removeOutputTask(c.get());
@@ -468,6 +472,7 @@ void SemanticState::requestDispatch()
void SemanticState::ConsumerImpl::requestDispatch()
{
+ assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -565,6 +570,7 @@ void SemanticState::stop(const std::string& destination)
void SemanticState::ConsumerImpl::setWindowMode()
{
+ assertClusterSafe();
windowing = true;
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
@@ -573,6 +579,7 @@ void SemanticState::ConsumerImpl::setWindowMode()
void SemanticState::ConsumerImpl::setCreditMode()
{
+ assertClusterSafe();
windowing = false;
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
@@ -581,6 +588,7 @@ void SemanticState::ConsumerImpl::setCreditMode()
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
+ assertClusterSafe();
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -589,6 +597,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
+ assertClusterSafe();
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -614,6 +623,7 @@ void SemanticState::ConsumerImpl::flush()
void SemanticState::ConsumerImpl::stop()
{
+ assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
}
@@ -667,12 +677,14 @@ bool SemanticState::ConsumerImpl::doOutput()
void SemanticState::ConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
notifyEnabled = true;
}
void SemanticState::ConsumerImpl::disableNotify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
notifyEnabled = false;
}
@@ -684,6 +696,7 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -708,6 +721,7 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
}
void SemanticState::accepted(const SequenceSet& commands) {
+ assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
@@ -740,6 +754,7 @@ void SemanticState::accepted(const SequenceSet& commands) {
}
void SemanticState::completed(const SequenceSet& commands) {
+ assertClusterSafe();
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
@@ -750,6 +765,7 @@ void SemanticState::completed(const SequenceSet& commands) {
void SemanticState::attached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -759,6 +775,7 @@ void SemanticState::attached()
void SemanticState::detached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
session.getConnection().outputTasks.removeOutputTask(i->second.get());