summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-18 19:36:13 +0000
committerAlan Conway <aconway@apache.org>2010-10-18 19:36:13 +0000
commita08d54e27d4e91b52c5979cc566ab3e933878983 (patch)
tree7f57ad88051e4a02f52d4bdf395968549e24f57a /cpp/src/qpid
parent8e53bc375ef2bfb4b05cc32b4a8c0042d95b9ec2 (diff)
downloadqpid-python-a08d54e27d4e91b52c5979cc566ab3e933878983.tar.gz
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt. qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies the broker makes the expected calls on broker::Cluster in various situations. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1023966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/Cluster.h103
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp19
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp16
-rw-r--r--cpp/src/qpid/broker/NullCluster.h66
-rw-r--r--cpp/src/qpid/broker/Queue.cpp126
-rw-r--r--cpp/src/qpid/broker/Queue.h7
-rw-r--r--cpp/src/qpid/broker/QueuedMessage.h3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp7
-rw-r--r--cpp/src/qpid/cluster/new-cluster-design.txt3
-rw-r--r--cpp/src/qpid/cluster/new-cluster-plan.txt439
12 files changed, 745 insertions, 51 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 33364e48df..a288da00c7 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -146,6 +147,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.qmf2Support)
: 0),
store(new NullMessageStore),
+ cluster(new NullCluster),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 6636b5d912..dcb20c4fe3 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -70,6 +70,7 @@ namespace broker {
class ExpiryPolicy;
class Message;
+class Cluster;
static const uint16_t DEFAULT_PORT=5672;
@@ -153,6 +154,7 @@ public:
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
+ std::auto_ptr<Cluster> cluster;
AclModule* acl;
DataDir dataDir;
@@ -273,6 +275,9 @@ public:
void setClusterUpdatee(bool set) { clusterUpdatee = set; }
bool isClusterUpdatee() const { return clusterUpdatee; }
+ QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c) { cluster = c; } ///< Replace the installed Cluster; auto_ptr assignment hands ownership of c to the broker.
+ QPID_BROKER_EXTERN Cluster& getCluster() { return *cluster; } ///< Dereferences the cluster pointer. NOTE(review): setCluster() with an empty auto_ptr would leave nothing to dereference here - confirm callers never pass one.
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
diff --git a/cpp/src/qpid/broker/Cluster.h b/cpp/src/qpid/broker/Cluster.h
new file mode 100644
index 0000000000..91b52e8af1
--- /dev/null
+++ b/cpp/src/qpid/broker/Cluster.h
@@ -0,0 +1,103 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+#include <cstddef>
+#include <string>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ *
+ * Every function is pure virtual; the broker holds exactly one Cluster and
+ * calls it through Broker::getCluster().
+ */
+class Cluster
+{
+  public:
+    virtual ~Cluster() {}
+
+    // Messages
+
+    /** In Exchange::route, before the message is enqueued. */
+    virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+    /** A message is delivered to a queue.
+     * Unlike the other message hooks this one receives a non-const
+     * QueuedMessage.
+     */
+    virtual void enqueue(QueuedMessage&) = 0;
+    /** In Exchange::route, after all enqueues for the message. */
+    virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+    /** A message is acquired by a local consumer, it is unavailable to replicas. */
+    virtual void acquire(const QueuedMessage&) = 0;
+    /** A locally-acquired message is accepted, it is removed from all replicas. */
+    virtual void accept(const QueuedMessage&) = 0;
+
+    /** A locally-acquired message is rejected, and may be re-routed.
+     * Always paired with a later call to rejected() for the same message.
+     */
+    virtual void reject(const QueuedMessage&) = 0;
+    /** Re-routing (if any) is complete for a rejected message. */
+    virtual void rejected(const QueuedMessage&) = 0;
+
+    /** A locally-acquired message is released by the consumer and re-queued. */
+    virtual void release(const QueuedMessage&) = 0;
+    /** A message is dropped from the queue, e.g. expired or replaced on an LVQ.
+     * This function does only local book-keeping, it does not multicast.
+     * It is reasonable to call with a queue lock held.
+     */
+    virtual void dequeue(const QueuedMessage&) = 0;
+
+    // Consumers
+
+    /** A new consumer subscribes to a queue.
+     * consumerCount is the number of consumers on the queue including the new one.
+     */
+    virtual void consume(const Queue&, size_t consumerCount) = 0;
+    /** A consumer cancels its subscription to a queue.
+     * consumerCount is the number of consumers remaining on the queue.
+     */
+    virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+    // Wiring
+
+    /** A queue is created */
+    virtual void create(const Queue&) = 0;
+    /** A queue is destroyed */
+    virtual void destroy(const Queue&) = 0;
+    /** An exchange is created */
+    virtual void create(const Exchange&) = 0;
+    /** An exchange is destroyed */
+    virtual void destroy(const Exchange&) = 0;
+    /** A binding is created */
+    virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CLUSTER_H*/
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 9443eb6ea5..315b1af2a8 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -112,7 +112,7 @@ void DeliveryRecord::complete() {
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg);
+ queue->accept(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -130,19 +130,8 @@ void DeliveryRecord::committed() const{
}
void DeliveryRecord::reject()
-{
- Exchange::shared_ptr alternate = queue->getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << queue->getName());
- }
-
- dequeue();
+{
+ queue->reject(msg);
}
uint32_t DeliveryRecord::getCredit() const
@@ -156,7 +145,7 @@ void DeliveryRecord::acquire(DeliveryIds& results) {
results.push_back(id);
if (!acceptExpected) {
if (ended) { QPID_LOG(error, "Can't dequeue ended message"); }
- else { queue->dequeue(0, msg); setEnded(); }
+ else { queue->accept(0, msg); setEnded(); }
}
} else {
QPID_LOG(info, "Message already acquired " << id.getValue());
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 98980e0360..aaf0805543 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -78,10 +79,23 @@ Exchange::PreRoute::~PreRoute(){
}
}
+// Scope guard for routing: the constructor reports Cluster::routing and the
+// destructor reports Cluster::routed, both for the same message. With a null
+// broker neither call is made.
+struct ScopedClusterRouting {
+    Broker* broker;
+    boost::intrusive_ptr<Message> message;
+
+    ScopedClusterRouting(Broker* owner, boost::intrusive_ptr<Message> routedMsg)
+        : broker(owner), message(routedMsg)
+    {
+        if (!broker) return;
+        broker->getCluster().routing(message);
+    }
+
+    ~ScopedClusterRouting()
+    {
+        if (!broker) return;
+        broker->getCluster().routed(message);
+    }
+};
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
+ ScopedClusterRouting scr(broker, &msg.getMessage());
int count = 0;
-
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
if (!msg.getMessage().isPersistent() && b->size() > 1) {
diff --git a/cpp/src/qpid/broker/NullCluster.h b/cpp/src/qpid/broker/NullCluster.h
new file mode 100644
index 0000000000..4f3485eb40
--- /dev/null
+++ b/cpp/src/qpid/broker/NullCluster.h
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Cluster implementation whose every hook is an empty function. The broker
+ * installs it when no cluster plug-in is present or clustering is disabled,
+ * so call sites never need to test for a missing Cluster.
+ */
+class NullCluster : public Cluster
+{
+  public:
+
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<Message>& /*msg*/) { }
+    virtual void enqueue(QueuedMessage& /*qmsg*/) { }
+    virtual void routed(const boost::intrusive_ptr<Message>& /*msg*/) { }
+    virtual void acquire(const QueuedMessage& /*qmsg*/) { }
+    virtual void accept(const QueuedMessage& /*qmsg*/) { }
+    virtual void reject(const QueuedMessage& /*qmsg*/) { }
+    virtual void rejected(const QueuedMessage& /*qmsg*/) { }
+    virtual void release(const QueuedMessage& /*qmsg*/) { }
+    virtual void dequeue(const QueuedMessage& /*qmsg*/) { }
+
+    // Consumers
+
+    virtual void consume(const Queue& /*queue*/, size_t /*consumerCount*/) { }
+    virtual void cancel(const Queue& /*queue*/, size_t /*consumerCount*/) { }
+
+    // Wiring
+
+    virtual void create(const Queue& /*queue*/) { }
+    virtual void destroy(const Queue& /*queue*/) { }
+    virtual void create(const Exchange& /*exchange*/) { }
+    virtual void destroy(const Exchange& /*exchange*/) { }
+    virtual void bind(const Queue& /*queue*/, const Exchange& /*exchange*/,
+                      const std::string& /*key*/, const framing::FieldTable& /*args*/) { }
+};
+
+}} // namespace qpid::broker
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e59857462c..b05172f984 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
+ if (broker) broker->getCluster().release(msg);
copy.notify();
}
@@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
+// Scope guard that reports an acquired message to the cluster when the
+// acquiring function returns. The function stores the message it acquired
+// in qmsg; if qmsg is never set (its queue pointer stays null) or there is
+// no broker, nothing is reported.
+struct ClusterAcquireOnExit {
+    Broker* broker;
+    QueuedMessage qmsg;
+
+    ClusterAcquireOnExit(Broker* b) : broker(b) {}
+
+    ~ClusterAcquireOnExit()
+    {
+        if (!broker) return;
+        if (!qmsg.queue) return;
+        broker->getCluster().acquire(qmsg);
+    }
+};
+
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
+ ClusterAcquireOnExit willAcquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
if (lastValueQueue) {
clearLVQIndex(*i);
}
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
+ QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+ willAcquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
bool Queue::acquire(const QueuedMessage& msg) {
+ ClusterAcquireOnExit acquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage& msg) {
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
+ (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+ ) {
clearLVQIndex(msg);
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
+ acquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
@@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
+ willAcquire.qmsg = msg;
popMsg(msg);
return CONSUMED;
} else {
@@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
return QueuedMessage();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
+ consumers = ++consumerCount;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
+ if (broker) broker->getCluster().consume(*this, consumers);
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumers = --consumerCount;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ if (broker) broker->getCluster().cancel(*this, consumers);
}
QueuedMessage Queue::get(){
+ ClusterAcquireOnExit acquire(broker); // Outside lock
+
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = getFront();
+ acquire.qmsg = msg;
popMsg(msg);
}
return msg;
@@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
+ qm = QueuedMessage(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
+ // FIXME aconway 2010-10-15: it is incorrect to use qm.position below
+ // should be using the position of the message being replaced.
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
}
@@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
+ if (broker) broker->getCluster().enqueue(qm);
}
QueuedMessage Queue::getFront()
@@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
if (policy.get()) policy->enqueueAborted(msg);
}
+void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
+    // Tell the cluster first (when a broker is attached), then dequeue.
+    if (broker) {
+        Cluster& cluster = broker->getCluster();
+        cluster.accept(msg);
+    }
+    dequeue(ctxt, msg);
+}
+
+// Scope guard for rejecting a message: the constructor reports
+// Cluster::reject and the destructor reports Cluster::rejected. It keeps a
+// reference to the message, so the message must outlive the guard.
+struct ScopedClusterReject {
+    Broker* broker;
+    const QueuedMessage& qmsg;
+
+    ScopedClusterReject(Broker* owner, const QueuedMessage& rejectedMsg)
+        : broker(owner), qmsg(rejectedMsg)
+    {
+        if (!broker) return;
+        broker->getCluster().reject(qmsg);
+    }
+
+    ~ScopedClusterReject()
+    {
+        if (!broker) return;
+        broker->getCluster().rejected(qmsg);
+    }
+};
+
+void Queue::reject(const QueuedMessage &msg) {
+    // Cluster::reject is reported here, Cluster::rejected when this scope ends.
+    ScopedClusterReject clusterScope(broker, msg);
+    Exchange::shared_ptr altExchange = getAlternateExchange();
+    if (!altExchange) {
+        // No alternate exchange: the message is just dropped.
+        QPID_LOG(info, "Dropping rejected message from " << getName());
+    } else {
+        // Re-route the payload through the alternate exchange.
+        DeliverableMessage delivery(msg.payload);
+        altExchange->route(delivery,
+                           msg.payload->getRoutingKey(),
+                           msg.payload->getApplicationHeaders());
+        QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+                 << altExchange->getName());
+    }
+    dequeue(0, msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
@@ -846,6 +913,9 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
+ // Note: Cluster::dequeue does only local book-keeping, no multicast
+ // So OK to call here with lock held.
+ if (broker) broker->getCluster().dequeue(msg);
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _settings)
store->create(*this, _settings);
}
configure(_settings);
+ if (broker) broker->getCluster().create(*this);
}
void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -934,6 +1005,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 96c79d1b92..572f3dc0e2 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -259,6 +259,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
void enqueueAborted(boost::intrusive_ptr<Message> msg);
+
+ /** Message acknowledged, dequeue it. */
+ QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg);
+
+ /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */
+ QPID_BROKER_EXTERN void reject(const QueuedMessage &msg);
+
/**
* dequeue from store (only done once the message is acknowledged)
*/
diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h
index 35e48b11f3..8cf73bda52 100644
--- a/cpp/src/qpid/broker/QueuedMessage.h
+++ b/cpp/src/qpid/broker/QueuedMessage.h
@@ -34,10 +34,9 @@ struct QueuedMessage
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q=0) : position(0), queue(q) {}
QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
};
inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index c91cfba2f8..f393879c16 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(record);
}
if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ queue->accept(0, msg);
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -347,11 +347,6 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
- // FIXME aconway 2009-06-08: if we have byte & message credit but
- // checkCredit fails because the message is to big, we should
- // remain on queue's listener list for possible smaller messages
- // in future.
- //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
diff --git a/cpp/src/qpid/cluster/new-cluster-design.txt b/cpp/src/qpid/cluster/new-cluster-design.txt
index 392de890c3..8ee740372d 100644
--- a/cpp/src/qpid/cluster/new-cluster-design.txt
+++ b/cpp/src/qpid/cluster/new-cluster-design.txt
@@ -75,6 +75,8 @@ Use a moving queue ownership protocol to agree order of dequeues, rather
than relying on identical state and lock-step behavior to cause
identical dequeues on each broker.
+Clearly defined interface between broker code and cluster plug-in.
+
*** Requirements
The cluster must provide these delivery guarantees:
@@ -365,3 +367,4 @@ there a better term?
Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.
+
diff --git a/cpp/src/qpid/cluster/new-cluster-plan.txt b/cpp/src/qpid/cluster/new-cluster-plan.txt
new file mode 100644
index 0000000000..57c1241607
--- /dev/null
+++ b/cpp/src/qpid/cluster/new-cluster-plan.txt
@@ -0,0 +1,439 @@
+-*-org-*-
+Notes on new cluster implementation. See also: new-cluster-design.txt
+
+* Implementation plan.
+
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
+
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- measure performance.
+
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
+
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protocol etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster ID's to messages recovered from store, don't replicate.
+
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
+
+* Task list
+
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued on other members.
+- acquire then reject: verify goes on alt-exchange once only.
+
+*** TODO broker::Cluster interface and call points.
+
+Initial draft is committed.
+
+Issues to review:
+
+queue API: internal classes like RingQueuePolicy use Queue::acquire/dequeue
+when messages are pushed. How to reconcile with queue ownership?
+
+rejecting messages: if there's an alternate exchange where do we do the
+re-routing? On origin broker or on all brokers?
+
+Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
+Intercepting client actions on the queue vs. internal actions
+(e.g. ring policy)
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and DeliverHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both DeliverHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path"; error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; Position pos; }
+- SequenceNumber 64 bit sequence number to identify messages.
+- NodeId 64 bit CPG node-id, identifies member of the cluster.
+- struct MessageId { NodeId node; SequenceNumber seq; }
+
+Members:
+- atomic<SequenceNumber> sequence // sequence number for message IDs.
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+
+NOTE: localMessage is also modified by DeliverHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg)
+ if noReplicate: return
+ # Suppress everything except enqueues while we are routing.
+ # We don't want to replicate acquires & dequeues caused by an enqueue,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+ if !qmsg.msg.id:
+ seq = sequence++
+ qmsg.msg.id = (self,seq)
+ localMessage[seq] = qmsg
+ mcast create(encode(qmsg.msg),seq)
+ mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+
+routed(msg):
+ if noReplicate: return
+ if msg.id: mcast routed(msg.id.seq)
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast acquire(msg.id, q)
+
+release(QueuedMessage)
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast release(id, q)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast dequeue(msg.id, msg.q)
+
+reject(QueuedMessage):
+ isRejecting = true
+ if msg.id: mcast reject(msg.id, msg.q)
+
+rejected(QueuedMessage):
+ isRejecting = false
+ mcast dequeue(msg.id, msg.q)
+
+dequeue(QueuedMessage)
+ # No mcast in dequeue, only used for local cleanup of resources.
+ # E.g. messages that are replaced on an LVQ are dequeued without being
+ # accepted or rejected. dequeue is called with the queue lock held
+ # FIXME revisit - move it out of the queue lock.
+ cleanup(msg)
+
+*** DeliverHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq)
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+ enqueued[id,q] = (localMessage[seq], acquired=None)
+ else:
+ msg = sender.routing[seq]
+ enqueued[id,q] = (qmsg, acquired=None)
+ with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+
+routed(seq):
+ if sender == self: localMessage.erase(seq)
+ else: sender.routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q)
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self
+ with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q)
+ sender.routing.erase(id)
+
+dequeue(id,q)
+ entry = enqueued[id,q]
+ enqueued.erase(id,q)
+ node[entry.acquired].acquired.erase(id,q)
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+ if (consumersStopped) return false;
+ ++consumersBusy;
+ do_regular_dispatch_body()
+ if (--consumersBusy == 0) consumersBusyMonitor.notify();
+}
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node that is interested in the queue, searching in node-id order
+(treating nodes as a circular list) starting from the old owner.
+
+Queue consumers initially are stopped, only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
+
+release(q):
+ assert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q]
+ if owner[q]==self: ownership[q].take()
+
+cancel(q):
+ assert(owner[q] != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
+
+member-leaves:
+ for q in queue: if owner[q] == left: left.release(q)
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Need to be able to multicast large messages in fragments
+
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code)
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are taken off a queue.
+
+NOTE: Not controlled by queue lock, how to make them consistent?
+
+Target broker may not have all messages on other brokers for purge/destroy.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on broker?
+- Queue::destroy() - messages to alternate exchange on all brokers?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of the internal queues holding cluster events & frames.
+- stop polling when we reach bound.
+- start polling when we get back under it.
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "Queue observers" with intercept
+points in similar places.
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code.
+
+In particular QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.