summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
committerAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
commitf27a733f9a4cca3ad2a42acb35ab4620a47e320d (patch)
treea4d7d7a34a6cf42e1241e998f7da012ee37b109f /cpp/src/qpid/broker
parent2c422462dc717e667c13aa74bbc552c8507e3f83 (diff)
downloadqpid-python-f27a733f9a4cca3ad2a42acb35ab4620a47e320d.tar.gz
New cluster: core framework and initial implementation of enqueue logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1027210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Cluster.h19
-rw-r--r--cpp/src/qpid/broker/NullCluster.h4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp17
3 files changed, 24 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h
index 91b52e8af1..4dabd98eab 100644
--- a/cpp/src/qpid/broker/Cluster.h
+++ b/cpp/src/qpid/broker/Cluster.h
@@ -54,8 +54,14 @@ class Cluster
/** In Exchange::route, before the message is enqueued. */
virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
- /** A message is delivered to a queue. */
- virtual void enqueue(QueuedMessage&) = 0;
+
+ /** A message is delivered to a queue.
+ * Called before actually pushing the message to the queue.
+ *@return If true the message should be pushed to the queue now.
+ * otherwise the cluster code will push the message when it is replicated.
+ */
+ virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
+
/** In Exchange::route, after all enqueues for the message. */
virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
@@ -71,11 +77,12 @@ class Cluster
/** A locally-acquired message is released by the consumer and re-queued. */
virtual void release(const QueuedMessage&) = 0;
- /** A message is dropped from the queue, e.g. expired or replaced on an LVQ.
- * This function does only local book-keeping, it does not multicast.
- * It is reasonable to call with a queue lock held.
+
+ /** A message is removed from the queue. It could have been
+ * accepted, rejected or dropped for other reasons e.g. expired or
+ * replaced on an LVQ.
*/
- virtual void dequeue(const QueuedMessage&) = 0;
+ virtual void drop(const QueuedMessage&) = 0;
// Consumers
diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h
index 4f3485eb40..0e11ceef27 100644
--- a/cpp/src/qpid/broker/NullCluster.h
+++ b/cpp/src/qpid/broker/NullCluster.h
@@ -38,14 +38,14 @@ class NullCluster : public Cluster
// Messages
virtual void routing(const boost::intrusive_ptr<Message>&) {}
- virtual void enqueue(QueuedMessage&) {}
+ virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
virtual void routed(const boost::intrusive_ptr<Message>&) {}
virtual void acquire(const QueuedMessage&) {}
virtual void accept(const QueuedMessage&) {}
virtual void reject(const QueuedMessage&) {}
virtual void rejected(const QueuedMessage&) {}
virtual void release(const QueuedMessage&) {}
- virtual void dequeue(const QueuedMessage&) {}
+ virtual void drop(const QueuedMessage&) {}
// Consumers
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index b05172f984..c530e9cd51 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -146,6 +146,10 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
+ // Same thing but for the new cluster interface.
+ if (broker && !broker->getCluster().enqueue(*this, msg))
+ return;
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -165,7 +169,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
}else {
push(msg);
}
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -199,7 +202,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -642,6 +644,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ if (!isRecovery) mgntEnqStats(msg);
QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
@@ -687,7 +690,6 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
- if (broker) broker->getCluster().enqueue(qm);
}
QueuedMessage Queue::getFront()
@@ -868,10 +870,9 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- dequeued(msg);
- }
+ if (!ctxt) dequeued(msg);
}
+ if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -888,6 +889,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
+ if (broker) broker->getCluster().drop(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
if (mgmtObject != 0) {
@@ -913,9 +915,8 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
- // Note: Cluster::dequeued does only local book-keeping, no multicast
+ // Note: Cluster::drop does only local book-keeping, no multicast
// So OK to call here with lock held.
- if (broker) broker->getCluster().dequeue(msg);
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {