diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-07 17:27:06 +0000 |
| commit | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (patch) | |
| tree | de5e5b5e431bf695b2c44e198ee93d179201a0e2 /cpp/src/qpid/cluster | |
| parent | a653ebe5bdfad1d44a576d2ab23f7e6ea80ba96f (diff) | |
| download | qpid-python-41d33af55b9fbf4c664ccb56accb1a37bd1ef006.tar.gz | |
broker: Fixed incorrect pass-by-reference of Queue::shared_ptr in several files.
cluster: added FailoverExchange - send cluster membership to clients.
client: added FailoverListener - receive cluster updates from failover exchange.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702552 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 36 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.cpp | 99 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.h | 68 |
7 files changed, 219 insertions, 26 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9c503d6d13..e64692bc91 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -19,6 +19,7 @@ #include "Cluster.h" #include "Connection.h" #include "DumpClient.h" +#include "FailoverExchange.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -109,6 +110,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } + failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); @@ -331,15 +333,15 @@ void Cluster::configChange ( Mutex::ScopedLock l(lock); QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - map.configChange(current, nCurrent, left, nLeft, joined, nJoined); - updateMemberStats(l); + bool changed = map.configChange(current, nCurrent, left, nLeft, joined, nJoined); if (state == LEFT) return; if (!map.isAlive(memberId)) { leave(l); return; } - + if(state == INIT) { // First configChange if (map.aliveCount() == 1) { QPID_LOG(info, *this << " first in cluster at " << myUrl); map = ClusterMap(memberId, myUrl, true); + memberUpdate(l); unstall(l); } else { // Joining established group. @@ -348,6 +350,8 @@ void Cluster::configChange ( QPID_LOG(debug, *this << " send dump-request " << myUrl); } } + else if (state >= READY && changed) + memberUpdate(l); } void Cluster::dumpInDone(const ClusterMap& m) { @@ -403,8 +407,9 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { tryMakeOffer(id, l); } -void Cluster::ready(const MemberId& id, const std::string& url, Lock&) { - map.ready(id, Url(url)); +void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { + if (map.ready(id, Url(url))) + memberUpdate(l); } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { @@ -454,8 +459,8 @@ void Cluster::checkDumpIn(Lock& l) { if (state == DUMPEE && dumpedMap) { map = *dumpedMap; QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); - unstall(l); mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); + unstall(l); } } @@ -488,28 +493,31 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string Lock l(lock); QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; - case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; + case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; + case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; } -void Cluster::stopClusterNode() { +void Cluster::stopClusterNode(Lock&) { QPID_LOG(notice, *this << " stopped by admin"); leave(); } -void Cluster::stopFullCluster() { - Lock l(lock); +void Cluster::stopFullCluster(Lock& l) { QPID_LOG(notice, *this << " shutting down cluster " << name.str()); mcastControl(ClusterShutdownBody(), 0, l); } -void Cluster::updateMemberStats(Lock& l) { +void Cluster::memberUpdate(Lock& l) { + std::vector<Url> vectUrl = getUrls(l); + size_t size = vectUrl.size(); + + failoverExchange->setUrls(vectUrl); + if (mgmtObject) { - std::vector<Url> vectUrl = getUrls(l); - size_t size = vectUrl.size(); + if (lastSize != size && size == 1){ QPID_LOG(info, *this << " last node standing, updating queue policies."); broker.getQueues().updateQueueClusterState(true); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index d1cf4b752f..723a23d1bd 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" +#include "FailoverExchange.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" @@ -74,6 +75,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // URLs of current cluster members. std::vector<Url> getUrls() const; + boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } // Leave the cluster void leave(); @@ -93,6 +95,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; + // NB: The final Lock& parameter on functions below is used to mark functions + // that should only be called by a function that already holds the lock. + // The parameter makes it hard to forget since you have to have an instance of + // a Lock to call the unlocked functions. + // Unlocked versions of public functions void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); @@ -145,9 +152,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void stopClusterNode(); - void stopFullCluster(); - void updateMemberStats(Lock&); + + void stopClusterNode(Lock&); + void stopFullCluster(Lock&); + void memberUpdate(Lock&); // Called in connection IO threads . void checkDumpIn(Lock&); @@ -181,6 +189,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::optional<ClusterMap> dumpedMap; size_t lastSize; + boost::shared_ptr<FailoverExchange> failoverExchange; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index f3b5451afb..e2fc25bfaa 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -72,18 +72,20 @@ ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1)); } -void ClusterMap::configChange( +bool ClusterMap::configChange( cpg_address *current, int nCurrent, cpg_address *left, int nLeft, cpg_address */*joined*/, int /*nJoined*/) { cpg_address* a; + bool memberChange=false; for (a = left; a != left+nLeft; ++a) { - members.erase(*a); + memberChange = members.erase(*a); newbies.erase(*a); } alive.clear(); std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); + return memberChange; } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -133,8 +135,8 @@ bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { return false; } -void ClusterMap::ready(const MemberId& id, const Url& url) { - if (isAlive(id)) members[id] = url; +bool ClusterMap::ready(const MemberId& id, const Url& url) { + return isAlive(id) && members.insert(Map::value_type(id,url)).second; } boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 79afba7dc0..c0012facaf 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -50,8 +50,10 @@ class ClusterMap { ClusterMap(const MemberId& id, const Url& url, bool isReady); ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); - /** Update from config change. */ - void configChange( + /** Update from config change. + *@return true if member set changed. + */ + bool configChange( cpg_address *current, int nCurrent, cpg_address *left, int nLeft, cpg_address *joined, int nJoined); @@ -76,7 +78,9 @@ class ClusterMap { bool dumpRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to); - void ready(const MemberId& id, const Url&); + + /**@return true If this is a new member */ + bool ready(const MemberId& id, const Url&); private: Url getUrl(const Map& map, const MemberId& id); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 8457467196..14a666a1c6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -34,6 +34,7 @@ namespace qpid { namespace cluster { using namespace std; +using broker::Broker; struct ClusterValues { string name; @@ -74,12 +75,14 @@ struct ClusterPlugin : public Plugin { Options* getOptions() { return &options; } void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. + if (values.name.empty()) return; // Only if --cluster-name option was specified. + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); + broker->getExchanges().registerExchange(cluster->getFailoverExchange()); } void earlyInitialize(Plugin::Target&) {} diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp new file mode 100644 index 0000000000..abc7f5df6f --- /dev/null +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "FailoverExchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/Array.h" +#include <boost/bind.hpp> +#include <algorithm> + +namespace qpid { +namespace cluster { +using namespace std; + +using namespace broker; +using namespace framing; + +const string FailoverExchange::TYPE_NAME("amq.failover"); + +FailoverExchange::FailoverExchange(management::Manageable* parent) : Exchange(TYPE_NAME, parent) { + if (mgmtExchange != 0) + mgmtExchange->set_type(TYPE_NAME); +} + + +void FailoverExchange::setUrls(const vector<Url>& u) { + Lock l(lock); + urls=u; + if (urls.empty()) return; + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); +} + +string FailoverExchange::getType() const { return TYPE_NAME; } + +bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { + Lock l(lock); + sendUpdate(queue); + return queues.insert(queue).second; +} + +bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { + Lock l(lock); + return queues.erase(queue); +} + +bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) { + Lock l(lock); + return queues.find(queue) != queues.end(); +} + +void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { + QPID_LOG(warning, "Message received by exchange " << TYPE_NAME << " ignoring"); +} + +void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { + // Called with lock held. + if (urls.empty()) return; + framing::Array array(0x95); // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping. + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); + const ProtocolVersion v; + boost::intrusive_ptr<Message> msg(new Message); + AMQFrame command(MessageTransferBody(v, TYPE_NAME, 1, 0)); + command.setLastSegment(false); + msg->getFrames().append(command); + AMQHeaderBody header; + header.get<MessageProperties>(true)->setContentLength(0); + header.get<MessageProperties>(true)->getApplicationHeaders().setArray(TYPE_NAME, array); + AMQFrame headerFrame(header); + headerFrame.setFirstSegment(false); + msg->getFrames().append(headerFrame); + DeliverableMessage(msg).deliverTo(queue); +} + +}} // namespace cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h new file mode 100644 index 0000000000..738cd2a602 --- /dev/null +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H +#define QPID_CLUSTER_FAILOVEREXCHANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/Url.h" + +#include <vector> +#include <set> + +namespace qpid { +namespace cluster { + +/** + * Failover exchange provides failover host list, as specified in AMQP 0-10. + */ +class FailoverExchange : public broker::Exchange +{ + public: + static const std::string TYPE_NAME; + + FailoverExchange(management::Manageable* parent); + + void setUrls(const std::vector<Url>&); + + // Exchange overrides + std::string getType() const; + bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); + bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); + bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args); + void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + + private: + void sendUpdate(const broker::Queue::shared_ptr&); + + typedef sys::Mutex::ScopedLock Lock; + typedef std::vector<Url> Urls; + typedef std::set<broker::Queue::shared_ptr> Queues; + + sys::Mutex lock; + Urls urls; + Queues queues; + +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/ |
