summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
committerAlan Conway <aconway@apache.org>2008-10-07 17:27:06 +0000
commit41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch)
treede5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid/cluster
parenta653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff)
downloadqpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients. client: added FailoverListener - receive cluster updates from failover exchange. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp36
-rw-r--r--cpp/src/qpid/cluster/Cluster.h15
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp10
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h10
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp7
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.cpp99
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.h68
7 files changed, 219 insertions, 26 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9c503d6d13..e64692bc91 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -19,6 +19,7 @@
#include "Cluster.h"
#include "Connection.h"
#include "DumpClient.h"
+#include "FailoverExchange.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
@@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
// FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
+ failoverExchange.reset(new FailoverExchange(this));
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
@@ -331,15 +333,15 @@ void Cluster::configChange (
Mutex::ScopedLock l(lock);
QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
- map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
- updateMemberStats(l);
+ bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
if (state == LEFT) return;
if (!map.isAlive(memberId)) { leave(l); return; }
-
+
if(state == INIT) { // First configChange
if (map.aliveCount() == 1) {
QPID_LOG(info, *this << " first in cluster at " << myUrl);
map = ClusterMap(memberId, myUrl, true);
+ memberUpdate(l);
unstall(l);
}
else { // Joining established group.
@@ -348,6 +350,8 @@ void Cluster::configChange (
QPID_LOG(debug, *this << " send dump-request " << myUrl);
}
}
+ else if (state >= READY && changed)
+ memberUpdate(l);
}
void Cluster::dumpInDone(const ClusterMap& m) {
@@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
tryMakeOffer(id, l);
}
-void Cluster::ready(const MemberId& id, const std::string& url, Lock&) {
- map.ready(id, Url(url));
+void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
+ if (map.ready(id, Url(url)))
+ memberUpdate(l);
}
void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
@@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) {
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
- unstall(l);
mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
+ unstall(l);
}
}
@@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string
Lock l(lock);
QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
default: return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode() {
+void Cluster::stopClusterNode(Lock&) {
QPID_LOG(notice, *this << " stopped by admin");
leave();
}
-void Cluster::stopFullCluster() {
- Lock l(lock);
+void Cluster::stopFullCluster(Lock& l) {
QPID_LOG(notice, *this << " shutting down cluster " << name.str());
mcastControl(ClusterShutdownBody(), 0, l);
}
-void Cluster::updateMemberStats(Lock& l) {
+void Cluster::memberUpdate(Lock& l) {
+ std::vector<Url> vectUrl = getUrls(l);
+ size_t size = vectUrl.size();
+
+ failoverExchange->setUrls(vectUrl);
+
if (mgmtObject) {
- std::vector<Url> vectUrl = getUrls(l);
- size_t size = vectUrl.size();
+
if (lastSize != size && size == 1){
QPID_LOG(info, *this << " last node standing, updating queue policies.");
broker.getQueues().updateQueueClusterState(true);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index d1cf4b752f..723a23d1bd 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
+#include "FailoverExchange.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -74,6 +75,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// URLs of current cluster members.
std::vector<Url> getUrls() const;
+ boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
// Leave the cluster
void leave();
@@ -93,6 +95,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
+ // NB: The final Lock& parameter on functions below is used to mark functions
+ // that should only be called by a function that already holds the lock.
+ // The parameter makes it hard to forget since you have to have an instance of
+ // a Lock to call the unlocked functions.
+
// Unlocked versions of public functions
void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
@@ -145,9 +152,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void stopClusterNode();
- void stopFullCluster();
- void updateMemberStats(Lock&);
+
+ void stopClusterNode(Lock&);
+ void stopFullCluster(Lock&);
+ void memberUpdate(Lock&);
// Called in connection IO threads .
void checkDumpIn(Lock&);
@@ -181,6 +189,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::optional<ClusterMap> dumpedMap;
size_t lastSize;
+ boost::shared_ptr<FailoverExchange> failoverExchange;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index f3b5451afb..e2fc25bfaa 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -72,18 +72,20 @@ ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt)
std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1));
}
-void ClusterMap::configChange(
+bool ClusterMap::configChange(
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
cpg_address */*joined*/, int /*nJoined*/)
{
cpg_address* a;
+ bool memberChange=false;
for (a = left; a != left+nLeft; ++a) {
- members.erase(*a);
+ memberChange = members.erase(*a);
newbies.erase(*a);
}
alive.clear();
std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
+ return memberChange;
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -133,8 +135,8 @@ bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) {
return false;
}
-void ClusterMap::ready(const MemberId& id, const Url& url) {
- if (isAlive(id)) members[id] = url;
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+ return isAlive(id) && members.insert(Map::value_type(id,url)).second;
}
boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 79afba7dc0..c0012facaf 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -50,8 +50,10 @@ class ClusterMap {
ClusterMap(const MemberId& id, const Url& url, bool isReady);
ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
- /** Update from config change. */
- void configChange(
+ /** Update from config change.
+ *@return true if member set changed.
+ */
+ bool configChange(
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined);
@@ -76,7 +78,9 @@ class ClusterMap {
bool dumpRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
- void ready(const MemberId& id, const Url&);
+
+ /**@return true If this is a new member */
+ bool ready(const MemberId& id, const Url&);
private:
Url getUrl(const Map& map, const MemberId& id);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 8457467196..14a666a1c6 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -34,6 +34,7 @@ namespace qpid {
namespace cluster {
using namespace std;
+using broker::Broker;
struct ClusterValues {
string name;
@@ -74,12 +75,14 @@ struct ClusterPlugin : public Plugin {
Options* getOptions() { return &options; }
void initialize(Plugin::Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
+ if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
+ broker->getExchanges().registerExchange(cluster->getFailoverExchange());
}
void earlyInitialize(Plugin::Target&) {}
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp
new file mode 100644
index 0000000000..abc7f5df6f
--- /dev/null
+++ b/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverExchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/Array.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+using namespace std;
+
+using namespace broker;
+using namespace framing;
+
+const string FailoverExchange::TYPE_NAME("amq.failover");
+
+FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) {
+ if (mgmtExchange != 0)
+ mgmtExchange->set_type(TYPE_NAME);
+}
+
+
+void FailoverExchange::setUrls(const vector<Url>& u) {
+ Lock l(lock);
+ urls=u;
+ if (urls.empty()) return;
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+}
+
+string FailoverExchange::getType() const { return TYPE_NAME; }
+
+bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
+ Lock l(lock);
+ sendUpdate(queue);
+ return queues.insert(queue).second;
+}
+
+bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
+ Lock l(lock);
+ return queues.erase(queue);
+}
+
+bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) {
+ Lock l(lock);
+ return queues.find(queue) != queues.end();
+}
+
+void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
+ QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring");
+}
+
+void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
+ // Called with lock held.
+ if (urls.empty()) return;
+ framing::Array array(0x95); // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping.
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
+ const ProtocolVersion v;
+ boost::intrusive_ptr<Message> msg(new Message);
+ AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0));
+ command.setLastSegment(false);
+ msg->getFrames().append(command);
+ AMQHeaderBody header;
+ header.get<MessageProperties>(true)->setContentLength(0);
+ header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array);
+ AMQFrame headerFrame(header);
+ headerFrame.setFirstSegment(false);
+ msg->getFrames().append(headerFrame);
+ DeliverableMessage(msg).deliverTo(queue);
+}
+
+}} // namespace cluster
diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h
new file mode 100644
index 0000000000..738cd2a602
--- /dev/null
+++ b/cpp/src/qpid/cluster/FailoverExchange.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H
+#define QPID_CLUSTER_FAILOVEREXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/Url.h"
+
+#include <vector>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Failover exchange provides failover host list, as specified in AMQP 0-10.
+ */
+class FailoverExchange : public broker::Exchange
+{
+ public:
+ static const std::string TYPE_NAME;
+
+ FailoverExchange(management::Manageable* parent);
+
+ void setUrls(const std::vector<Url>&);
+
+ // Exchange overrides
+ std::string getType() const;
+ bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args);
+ void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+
+ private:
+ void sendUpdate(const broker::Queue::shared_ptr&);
+
+ typedef sys::Mutex::ScopedLock Lock;
+ typedef std::vector<Url> Urls;
+ typedef std::set<broker::Queue::shared_ptr> Queues;
+
+ sys::Mutex lock;
+ Urls urls;
+ Queues queues;
+
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/