diff options
| author | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-07-19 21:52:24 +0000 |
| commit | cb566519d58ded6704507fa5530bf901e620edf6 (patch) | |
| tree | ab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/cluster | |
| parent | 3f900af77d5f781431dc25e307974e0fc27aa561 (diff) | |
| download | qpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz | |
* Summary:
- Connect cluster handlers into broker handler chains.
- Progress on wiring replication.
* src/tests/cluster.mk: Temporarily disabled Cluster test.
* src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs.
* src/qpidd.cpp:
- Load optional libs (cluster)
- Include plugin config in options.parse.
* src/qpid/cluster/SessionManager.h:
- Create sessions, update handler chains (as HandlerUpdater)
- Handle frames from cluster.
* src/qpid/cluster/ClusterPlugin.h, .cpp:
- renamed from ClusterPluginProvider
- Create and connect Cluster and SessionManager.
- Register SessionManager as HandlerUpdater.
* src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler.
* src/qpid/broker/Connection.cpp: Apply HandlerUpdaters.
* src/qpid/broker/Broker.h, .cpp:
- Initialize plugins
- Apply HandlerUpdaters
* src/qpid/Plugin.h, .cpp: Simplified plugin framework.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 62 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp (renamed from cpp/src/qpid/cluster/ClusterPluginProvider.cpp) | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 103 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 68 |
5 files changed, 218 insertions, 77 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f2d1b75f3f..256378ccd5 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,6 +32,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; + ostream& operator <<(ostream& out, const Cluster& cluster) { return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; } @@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -struct Cluster::IncomingHandler : public FrameHandler { - IncomingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::IN); - cluster.mcast(sf); - } - Cluster& cluster; -}; - -struct Cluster::OutgoingHandler : public FrameHandler { - OutgoingHandler(Cluster& c) : cluster(c) {} - void handle(AMQFrame& frame) { - SessionFrame sf(Uuid(true), frame, SessionFrame::OUT); - cluster.mcast(sf); - } - Cluster& cluster; -}; -// TODO aconway 2007-06-28: Right now everything is backed up via -// multicast. When we have point-to-point backups the -// Incoming/Outgoing handlers must determine where each frame should -// be sent: to multicast or only to specific backup(s) via AMQP. -Cluster::Cluster(const std::string& name_, const std::string& url_) : +Cluster::Cluster( + const std::string& name_, const std::string& url_, + const SessionFrameHandler::Chain& next +) : + SessionFrameHandler(next), cpg(new Cpg(*this)), name(name_), url(url_), - self(cpg->getLocalNoideId(), getpid()), - toChains(new IncomingHandler(*this), new OutgoingHandler(*this)) + self(cpg->getLocalNoideId(), getpid()) { - QPID_LOG(trace, *this << " Joining cluster."); + QPID_LOG(trace, *this << " Joining cluster: " << name_); cpg->join(name); notify(); dispatcher=Thread(*this); @@ -102,7 +85,7 @@ Cluster::~Cluster() { } } -void Cluster::mcast(SessionFrame& frame) { +void Cluster::handle(SessionFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) { void Cluster::notify() { SessionFrame sf; sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url))); - mcast(sf); + handle(sf); } size_t Cluster::size() const { @@ -122,11 +105,6 @@ size_t Cluster::size() const { return members.size(); } -void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) { - Mutex::ScopedLock l(lock); - receivedChain = chain; -} - Cluster::MemberList Cluster::getMembers() const { // TODO aconway 2007-07-04: use read/write lock? Mutex::ScopedLock l(lock); @@ -152,7 +130,7 @@ void Cluster::deliver( if (frame.uuid.isNull()) handleClusterFrame(from, frame.frame); else - receivedChain->handle(frame); + next->handle(frame); } bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, @@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, } // Handle cluster control frame from the null session. -bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { +void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); - if (notifyIn) { + assert(notifyIn); MemberList list; { Mutex::ScopedLock l(lock); - if (!members[from]) - members[from].reset(new Member(url)); + shared_ptr<Member>& member=members[from]; + if (!member) + member.reset(new Member(notifyIn->getUrl())); else - members[from]->url = notifyIn->getUrl(); - QPID_LOG(trace, *this << ": member update: " << members); + member->url = notifyIn->getUrl(); lock.notifyAll(); + QPID_LOG(trace, *this << ": members joined: " << members); } - return true; - } - return false; } void Cluster::configChange( @@ -207,7 +183,7 @@ void Cluster::configChange( // We don't record members joining here, we record them when // we get their ClusterNotify message. } - if (newMembers) + if (newMembers) // Notify new members of my presence. notify(); } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 6ab4cb58df..f6afe14c62 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -37,11 +37,17 @@ namespace qpid { namespace cluster { /** - * Represents a cluster, provides access to data about members. - * Implements HandlerUpdater to manage handlers that route frames to - * and from the cluster. + * Connection to the cluster. Maintains cluster membership + * data. + * + * As SessionFrameHandler, handles frames by sending them to the + * cluster, sends frames received from the cluster to the next + * SessionFrameHandler. + * + * */ -class Cluster : private sys::Runnable, private Cpg::Handler +class Cluster : public SessionFrameHandler, + private sys::Runnable, private Cpg::Handler { public: /** Details of a cluster member */ @@ -57,8 +63,10 @@ class Cluster : private sys::Runnable, private Cpg::Handler * Join a cluster. * @param name of the cluster. * @param url of this broker, sent to the cluster. + * @param handler for frames received from the cluster. */ - Cluster(const std::string& name, const std::string& url); + Cluster(const std::string& name, const std::string& url, + const SessionFrameHandler::Chain& next); virtual ~Cluster(); @@ -70,14 +78,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Get handler chains to send incoming/outgoing frames to the cluster */ - framing::FrameHandler::Chains getSendChains() { - return toChains; - } - - /** Set handler for frames received from the cluster */ - void setReceivedChain(const SessionFrameHandler::Chain& chain); - /** Wait for predicate(*this) to be true, up to timeout. *@return True if predicate became true, false if timed out. *Note the predicate may not be true after wait returns, @@ -86,13 +86,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool wait(boost::function<bool(const Cluster&)> predicate, sys::Duration timeout=sys::TIME_INFINITE) const; + /** Send frame to the cluster */ + void handle(SessionFrame&); + private: typedef Cpg::Id Id; typedef std::map<Id, shared_ptr<Member> > MemberMap; - typedef std::map< - framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - void mcast(SessionFrame&); ///< send frame by multicast. void notify(); ///< Notify cluster of my details. void deliver( @@ -112,7 +112,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler ); void run(); - bool handleClusterFrame(Id from, framing::AMQFrame&); + void handleClusterFrame(Id from, framing::AMQFrame&); mutable sys::Monitor lock; boost::scoped_ptr<Cpg> cpg; @@ -120,17 +120,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler std::string url; Id self; MemberMap members; - ChannelMap channels; sys::Thread dispatcher; boost::function<void()> callback; - framing::FrameHandler::Chains toChains; - SessionFrameHandler::Chain receivedChain; - - struct IncomingHandler; - struct OutgoingHandler; - - friend struct IncomingHandler; - friend struct OutgoingHandler; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index d48fbadf7b..10b1c44f40 100644 --- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -16,8 +16,8 @@ * */ #include "qpid/broker/Broker.h" -#include "qpid/framing/HandlerUpdater.h" #include "qpid/cluster/Cluster.h" +#include "qpid/cluster/SessionManager.h" #include "qpid/Plugin.h" #include "qpid/Options.h" @@ -26,11 +26,11 @@ namespace cluster { using namespace std; -struct ClusterPluginProvider : public PluginProvider { +struct ClusterPlugin : public Plugin { struct ClusterOptions : public Options { string clusterName; - ClusterOptions() { + ClusterOptions() : Options("Cluster Options") { addOptions() ("cluster", optValue(clusterName, "NAME"), "Join the cluster named NAME"); @@ -39,22 +39,25 @@ struct ClusterPluginProvider : public PluginProvider { ClusterOptions options; shared_ptr<Cluster> cluster; + shared_ptr<SessionManager> sessions; Options* getOptions() { return &options; } - void provide(PluginUser& user) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&user); + void initialize(Plugin::Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - cluster.reset(new Cluster(options.clusterName, broker->getUrl())); - // FIXME aconway 2007-06-29: register HandlerUpdater. + sessions.reset(new SessionManager()); + cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); + sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + broker->add(sessions); } } }; -static ClusterPluginProvider instance; // Static initialization. +static ClusterPlugin instance; // Static initialization. }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp new file mode 100644 index 0000000000..24f201535d --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -0,0 +1,103 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQFrame.h" +#include "SessionManager.h" +#include "ClassifierHandler.h" + +namespace qpid { +namespace cluster { + +using namespace framing; +using namespace sys; + +/** Wrap plain AMQFrames in SessionFrames */ +struct FrameWrapperHandler : public FrameHandler { + + FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_) + : uuid(id), direction(dir), next(next_) { + assert(!uuid.isNull()); + } + + void handle(AMQFrame& frame) { + SessionFrame sf(uuid, frame, direction); + assert(next); + next->handle(sf); + } + + Uuid uuid; + bool direction; + SessionFrameHandler::Chain next; +}; + +SessionManager::SessionManager() {} + +void SessionManager::update(FrameHandler::Chains& chains) +{ + Mutex::ScopedLock l(lock); + // Create a new local session, store local chains. + Uuid uuid(true); + sessions[uuid] = chains; + + // Replace local incoming chain. Build from the back. + // + // TODO aconway 2007-07-05: Currently mcast wiring, bypass + // everythign else. + assert(clusterSend); + FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend)); + FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); + chains.in = classify; + + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // + + // Leave outgoing chain unmodified. + // TODO aconway 2007-07-05: Failover will require replication of + // outgoing frames to session replicas. + +} + +void SessionManager::handle(SessionFrame& frame) { + // Incoming from frame. + FrameHandler::Chains chains; + { + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(frame.uuid); + if (i == sessions.end()) { + QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); + chains = nonLocal; + } + else { + QPID_LOG(trace, "Local frame from cluster: " << frame.frame); + chains = i->second; + } + } + FrameHandler::Chain chain = + chain = frame.isIncoming ? chains.in : chains.out; + // TODO aconway 2007-07-11: Should this be assert(chain) + if (chain) + chain->handle(frame.frame); + + // TODO aconway 2007-07-05: Here's where we should unblock frame + // dispatch for the channel. +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h new file mode 100644 index 0000000000..c23efde18e --- /dev/null +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_SESSIONMANAGER_H +#define QPID_CLUSTER_SESSIONMANAGER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/BrokerChannel.h" +#include "qpid/cluster/SessionFrame.h" +#include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" + +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Manage sessions and handler chains for the cluster. + * + */ +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +{ + public: + SessionManager(); + + /** Set the handler to send to the cluster */ + void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } + + /** As ChannelUpdater update the handler chains. */ + void update(framing::FrameHandler::Chains& chains); + + /** As SessionFrameHandler handle frames received from the cluster */ + void handle(SessionFrame&); + + /** Get ChannelID for UUID. Return 0 if no mapping */ + framing::ChannelId getChannelId(const framing::Uuid&) const; + + private: + typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; + + sys::Mutex lock; + SessionFrameHandler::Chain clusterSend; + SessionMap sessions; + framing::FrameHandler::Chains nonLocal; +}; + + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/ |
