summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp137
1 files changed, 32 insertions, 105 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c530e9cd51..e59857462c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -146,10 +145,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
- // Same thing but for the new cluster interface.
- if (broker && !broker->getCluster().enqueue(*this, msg))
- return;
-
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -169,6 +164,7 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
+ mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -202,6 +198,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
+ mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -227,7 +224,6 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
- if (broker) broker->getCluster().release(msg);
copy.notify();
}
@@ -240,22 +236,8 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
-// Inform the cluster of an acquired message on exit from a function
-// that does the acquiring. The calling function should set qmsg
-// to the acquired message.
-struct ClusterAcquireOnExit {
- Broker* broker;
- QueuedMessage qmsg;
- ClusterAcquireOnExit(Broker* b) : broker(b) {}
- ~ClusterAcquireOnExit() {
- if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
- }
-};
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
- ClusterAcquireOnExit willAcquire(broker);
-
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -266,18 +248,16 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
if (lastValueQueue) {
clearLVQIndex(*i);
}
- QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
- willAcquire.qmsg = *i;
+ QPID_LOG(debug,
+ "Acquired message at " << i->position << " from " << name);
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
bool Queue::acquire(const QueuedMessage& msg) {
- ClusterAcquireOnExit acquire(broker);
-
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -285,17 +265,16 @@ bool Queue::acquire(const QueuedMessage& msg) {
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
+ (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+ ) {
clearLVQIndex(msg);
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
- acquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
@@ -335,8 +314,6 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
- ClusterAcquireOnExit willAcquire(broker); // Outside the lock
-
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -353,7 +330,6 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- willAcquire.qmsg = msg;
popMsg(msg);
return CONSUMED;
} else {
@@ -475,51 +451,40 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
return QueuedMessage();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
- size_t consumers;
- {
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
- }
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
}
- consumers = ++consumerCount;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
}
- if (broker) broker->getCluster().consume(*this, consumers);
+ consumerCount++;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- size_t consumers;
- {
- Mutex::ScopedLock locker(consumerLock);
- consumers = --consumerCount;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
- }
- if (broker) broker->getCluster().cancel(*this, consumers);
+ Mutex::ScopedLock locker(consumerLock);
+ consumerCount--;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
}
QueuedMessage Queue::get(){
- ClusterAcquireOnExit acquire(broker); // Outside lock
-
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = getFront();
- acquire.qmsg = msg;
popMsg(msg);
}
return msg;
@@ -644,12 +609,10 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
- if (!isRecovery) mgntEnqStats(msg);
- QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- qm = QueuedMessage(this, msg, ++sequence);
+ QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -666,14 +629,12 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
- // FIXME aconway 2010-10-15: it is incorrect to use qm.position below
- // should be using the position of the message being replaced.
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
}
@@ -831,48 +792,19 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
if (policy.get()) policy->enqueueAborted(msg);
}
-void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
- if (broker) broker->getCluster().accept(msg);
- dequeue(ctxt, msg);
-}
-
-struct ScopedClusterReject {
- Broker* broker;
- const QueuedMessage& qmsg;
- ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) {
- if (broker) broker->getCluster().reject(qmsg);
- }
- ~ScopedClusterReject() {
- if (broker) broker->getCluster().rejected(qmsg);
- }
-};
-
-void Queue::reject(const QueuedMessage &msg) {
- ScopedClusterReject scr(broker, msg);
- Exchange::shared_ptr alternate = getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << getName());
- }
- dequeue(0, msg);
-}
-
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
+
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) dequeued(msg);
+ if (!ctxt) {
+ dequeued(msg);
+ }
}
- if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -889,7 +821,6 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
- if (broker) broker->getCluster().drop(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -915,8 +846,6 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
- // Note: Cluster::drop does only local book-keeping, no multicast
- // So OK to call here with lock held.
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -932,7 +861,6 @@ void Queue::create(const FieldTable& _settings)
store->create(*this, _settings);
}
configure(_settings);
- if (broker) broker->getCluster().create(*this);
}
void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -1006,7 +934,6 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
- if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()