diff options
| author | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-06-29 17:59:00 +0000 |
| commit | fda6dadde945a9c73c97b73dc79e93368b743348 (patch) | |
| tree | d7755539ae485efdfbc46298cd1ef6632515159e /cpp/src/qpid/cluster | |
| parent | 79cd6c772da003ddc917eff362f9adaa99e28b49 (diff) | |
| download | qpid-python-fda6dadde945a9c73c97b73dc79e93368b743348.tar.gz | |
* Summary:
- Improved plugin framework and HandlerUpdater interface.
- Cluster handlers for traffic to/from cluster.
- Cluster HandlerUpdater configures channel chains for cluster.
- Cluster PluginProvider registers cluster objects with broker.
* src/qpid/framing/AMQFrame.h: Made data members public. Handlers
need to be able to modify frame data, getters/setters are just a
nuisance here.
* src/tests/Cluster.cpp: Updated for cluster changes, using
handlers instead of friendship to hook test into Cluster code.
* src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and
CHANNEL_HIGH_BIT constants.
* src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer,
broke dependency on broker channel types.
* src/qpid/framing/Handler.h: Added constructors and nextHandler()
* src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel()
* src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster
plugins.
* src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out
plugin details to ClusterPluginProvider.
* src/qpid/cluster/ChannelManager.h: Insert cluster handlers
into channel chains, route frames between cluster and channels.
* src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX
constant.
* src/qpid/broker/Broker.cpp:
- Refactored for new plugin framework.
- Added getUrl().
* src/qpid/Url.h: Added constructor from Address.
* src/qpid/Plugin.h: Generalized plugin framework, broke
dependency on Broker interfaces. We may want to use plug-ins for
clients also at some point.
* src/tests/run_test: Fix bug when VALGRIND is not set.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/ChannelManager.cpp | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ChannelManager.h | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 121 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 64 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPluginProvider.cpp | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 3 |
6 files changed, 336 insertions, 69 deletions
diff --git a/cpp/src/qpid/cluster/ChannelManager.cpp b/cpp/src/qpid/cluster/ChannelManager.cpp new file mode 100644 index 0000000000..f573d78ca1 --- /dev/null +++ b/cpp/src/qpid/cluster/ChannelManager.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "ChannelManager.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +/** Handler to multicast to the cluster */ +struct ClusterHandler : public FrameHandler { + + ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_) + : FrameHandler(next), bitmask(bitmask_) {} + + void handle(AMQFrame& frame) { + frame.channel |= bitmask; // Mark the frame + nextHandler(frame); + // TODO aconway 2007-06-28: Right now everything is backed up + // via multicast. When we have point-to-point backups this + // function must determine where each frame should be sent: to + // multicast or only to specific backup(s) via AMQP. + } + + ChannelId bitmask; +}; + +ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){} + +void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) { + // Store the original cluster chains for the channel. + channels[id] = chains; + + // Replace chains with multicast-to-cluster handlers that mark the + // high-bit of the channel ID on outgoing frames so we can tell + // them from incoming frames in handle() + // + // When handle() receives the frames from the cluster it + // will forward them to the original channel chains stored in + // channels map. + // + chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0)); + chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT)); +} + +void ChannelManager::handle(AMQFrame& frame) { + bool isOut = frame.channel | CHANNEL_HIGH_BIT; + frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit. + ChannelMap::iterator i = channels.find(frame.getChannel()); + if (i != channels.end()) { + Chain& chain = isOut ? i->second.out : i->second.in; + chain->handle(frame); + } + else + updateFailoverState(frame); +} + +void ChannelManager::updateFailoverState(AMQFrame& ) { + QPID_LOG(critical, "Failover is not implemented"); + // FIXME aconway 2007-06-28: + // If the channel is not in my map then I'm not primary so + // I don't pass the frame to the channel handler but I + // do need to update the session failover state. +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ChannelManager.h b/cpp/src/qpid/cluster/ChannelManager.h new file mode 100644 index 0000000000..59fce77957 --- /dev/null +++ b/cpp/src/qpid/cluster/ChannelManager.h @@ -0,0 +1,66 @@ +#ifndef QPID_CLUSTER_CHANNELMANAGER_H +#define QPID_CLUSTER_CHANNELMANAGER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/HandlerUpdater.h" +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Manage channels and handler chains on channels for the cluster. + * + * As HandlerUpdater plugin, updates channel handler chains with + * cluster handlers. + * + * As a FrameHandler handles frames coming from the cluster and + * dispatches them to the appropriate channel handler. + * + */ +class ChannelManager : public framing::HandlerUpdater, + public framing::FrameHandler +{ + public: + /** Takes a handler to send frames to the cluster. */ + ChannelManager(framing::FrameHandler::Chain mcastOut); + + /** As FrameHandler handle frames from the cluster */ + void handle(framing::AMQFrame& frame); + + /** As ChannelUpdater update the handler chains. */ + void update(framing::ChannelId id, framing::FrameHandler::Chains& chains); + + private: + void updateFailoverState(framing::AMQFrame&); + + typedef std::map<framing::ChannelId, + framing::FrameHandler::Chains> ChannelMap; + + framing::FrameHandler::Chain mcastOut; + ChannelMap channels; +}; + + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 30073c4551..8d898eefa3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,13 +17,13 @@ */ #include "Cluster.h" -#include "Cpg.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> #include <iterator> +#include <map> namespace qpid { namespace cluster { @@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "(" << cluster.self << ")"; } +namespace { +Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; +struct StatusMapInit { + StatusMapInit() { + statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; + statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; + statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; + statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; + statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; + } +} instance; +} + +Cluster::Member::Member(const cpg_address& addr) + : status(statusMap[addr.reason]) {} + void Cluster::notify() { + ProtocolVersion version; // TODO aconway 2007-06-25: Use proxy here. AMQFrame frame(version, 0, make_shared_ptr(new ClusterNotifyBody(version, url))); handle(frame); } -Cluster::Cluster( - const std::string& name_, const std::string& url_, FrameHandler& next_, - ProtocolVersion ver) - : name(name_), url(url_), version(ver), - cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - next(next_) -{ - self=Id(cpg->getLocalNoideId(), getpid()); +Cluster::Cluster(const std::string& name_, const std::string& url_) : + name(name_), + url(url_), + cpg(new Cpg( + boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + self(cpg->getLocalNoideId(), getpid()) +{} + +void Cluster::join(FrameHandler::Chain next) { QPID_LOG(trace, *this << " Joining cluster."); + next = next; + dispatcher=Thread(*this); cpg->join(name); notify(); - dispatcher=Thread(*this); } Cluster::~Cluster() { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); + if (cpg) { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } } } void Cluster::handle(AMQFrame& frame) { + assert(cpg); QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -104,52 +126,59 @@ void Cluster::cpgDeliver( frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame); // TODO aconway 2007-06-20: use visitor pattern. - ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + ClusterNotifyBody* notifyIn= + dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { - Mutex::ScopedLock l(lock); - members[from].reset(new Member(notifyIn->getUrl())); - lock.notifyAll(); + { + Mutex::ScopedLock l(lock); + assert(members[from]); + members[from]->url = notifyIn->getUrl(); + members[from]->status = Member::BROKER; + } + if (callback) + callback(); } else - next.handle(frame); + next->handle(frame); } void Cluster::cpgConfigChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *ccMembers, int nMembers, + struct cpg_address *current, int nCurrent, struct cpg_address *left, int nLeft, struct cpg_address *joined, int nJoined ) { - QPID_LOG( - trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(ccMembers, nMembers)); - + QPID_LOG(trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(current, nCurrent)); + + bool needNotify=false; + MemberList updated; { Mutex::ScopedLock l(lock); - // Erase members that left. - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - lock.notifyAll(); - } - - // If there are new members (other than myself) then notify. - for (int i=0; i< nJoined; ++i) { - if (Id(joined[i]) != self) { - notify(); - break; + for (int i = 0; i < nJoined; ++i) { + Id id(current[i]); + members[id].reset(new Member(current[i])); + if (id != self) + needNotify = true; // Notify new members other than myself. } - } - - // Note: New members are be added to my map when cpgDeliver - // gets a cluster.notify frame. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(current[i])); + } // End of locked scope. + if (needNotify) + notify(); + if (callback) + callback(); } +void Cluster::setCallback(boost::function<void()> f) { callback=f; } + void Cluster::run() { + assert(cpg); cpg->dispatchBlocking(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1cbbb249f2..aff213b6c9 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -25,47 +25,74 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/shared_ptr.h" -#include "qpid/framing/ProtocolVersion.h" +#include <boost/function.hpp> #include <boost/scoped_ptr.hpp> #include <map> #include <vector> namespace qpid { + +namespace broker { +class HandlerUpdater; +} + namespace cluster { +class ChannelManager; + /** - * Represents a cluster. Creating an instance joins current process - * to the cluster. + * Represents a cluster, provides access to data about members. + * + * Implements a FrameHandler that multicasts frames to the cluster. + * + * Requires a handler for frames arriving from the cluster, + * normally a ChannelManager but other handlers could be interposed + * for testing, logging etc. */ class Cluster : public framing::FrameHandler, private sys::Runnable { public: /** Details of a cluster member */ struct Member { - Member(const std::string& url_) : url(url_) {} + typedef shared_ptr<const Member> Ptr; + /** Status of a cluster member. */ + enum Status { + JOIN, ///< Process joined the group. + LEAVE, ///< Process left the group cleanly. + NODEDOWN, ///< Process's node went down. + NODEUP, ///< Process's node joined the cluster. + PROCDOWN, ///< Process died without leaving. + BROKER ///< Broker details are available. + }; + + Member(const cpg_address&); std::string url; + Status status; }; - - typedef std::vector<shared_ptr<const Member> > MemberList; + + typedef std::vector<Member::Ptr> MemberList; /** - * Join a cluster. + * Create a cluster object but do not joing. * @param name of the cluster. * @param url of this broker, sent to the cluster. - * @param next handler receives the frame when it has been - * acknowledged by the cluster. */ - Cluster(const std::string& name, - const std::string& url, - framing::FrameHandler& next, - framing::ProtocolVersion); + Cluster(const std::string& name, const std::string& url); ~Cluster(); + + /** Join the cluster. + *@handler is the handler for frames arriving from the cluster. + */ + void join(framing::FrameHandler::Chain handler); /** Multicast a frame to the cluster. */ void handle(framing::AMQFrame&); /** Get the current cluster membership. */ MemberList getMembers() const; + + /** Called when membership changes. */ + void setCallback(boost::function<void()>); /** Number of members in the cluster. */ size_t size() const; @@ -76,7 +103,6 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { void run(); void notify(); - void cpgDeliver( cpg_handle_t /*handle*/, struct cpg_name *group, @@ -93,18 +119,14 @@ class Cluster : public framing::FrameHandler, private sys::Runnable { struct cpg_address */*joined*/, int /*nJoined*/ ); - Id self; + mutable sys::Monitor lock; Cpg::Name name; std::string url; - framing::ProtocolVersion version; boost::scoped_ptr<Cpg> cpg; - framing::FrameHandler& next; + Id self; MemberMap members; sys::Thread dispatcher; - - protected: - // Allow access from ClusterTest subclass. - mutable sys::Monitor lock; + boost::function<void()> callback; friend std::ostream& operator <<(std::ostream&, const Cluster&); }; diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp new file mode 100644 index 0000000000..3a09a66b81 --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp @@ -0,0 +1,66 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/broker/Broker.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/cluster/Cluster.h" +#include "qpid/cluster/ChannelManager.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" + +namespace qpid { +namespace cluster { + +using namespace std; + +struct ClusterPluginProvider : public PluginProvider { + + struct ClusterOptions : public Options { + string clusterName; + ClusterOptions() { + addOptions() + ("cluster", optValue(clusterName, "NAME"), + "Join the cluster named NAME"); + } + }; + + ClusterOptions options; + shared_ptr<Cluster> cluster; + + Options* getOptions() { + return &options; + } + + void provide(PluginUser& user) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&user); + // Only provide to a Broker, and only if the --cluster config is set. + if (broker && !options.clusterName.empty()) { + assert(!cluster); // A process can only belong to one cluster. + cluster.reset(new Cluster(options.clusterName, broker->getUrl())); + + // Channel manager is both the next handler for the cluster + // and the HandlerUpdater plugin for the broker. + shared_ptr<ChannelManager> manager(new ChannelManager(cluster)); + cluster->join(manager); + broker->use(manager); + } + } +}; + +static ClusterPluginProvider instance; // Static initialization. + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index 6b157301a7..e164ed1215 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -23,11 +23,10 @@ #include "qpid/cluster/Dispatchable.h" #include <boost/function.hpp> #include <cassert> -#ifdef CLUSTER extern "C" { #include <openais/cpg.h> } -#endif + namespace qpid { namespace cluster { |
