summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
committerAlan Conway <aconway@apache.org>2007-07-19 21:52:24 +0000
commitcb566519d58ded6704507fa5530bf901e620edf6 (patch)
treeab4b29ddd0ad2b5e9015647e379bede84163b13e /cpp/src/qpid/cluster
parent3f900af77d5f781431dc25e307974e0fc27aa561 (diff)
downloadqpid-python-cb566519d58ded6704507fa5530bf901e620edf6.tar.gz
* Summary:
- Connect cluster handlers into broker handler chains. - Progress on wiring replication. * src/tests/cluster.mk: Temporarily disabled Cluster test. * src/tests/Cluster.h, cpp, Cluster_child.cpp: Updated to use UUIDs. * src/qpidd.cpp: - Load optional libs (cluster) - Include plugin config in options.parse. * src/qpid/cluster/SessionManager.h: - Create sessions, update handler chains (as HandlerUpdater) - Handle frames from cluster. * src/qpid/cluster/ClusterPlugin.h, .cpp: - renamed from ClusterPluginProvider - Create and connect Cluster and SessionManager. - Register SessionManager as HandlerUpdater. * src/qpid/cluster/Cluster.h, .cpp: Refactor as SessionFrameHandler. * src/qpid/broker/Connection.cpp: Apply HandlerUpdaters. * src/qpid/broker/Broker.h, .cpp: - Initialize plugins - Apply HandlerUpdaters * src/qpid/Plugin.h, .cpp: Simplified plugin framework. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp62
-rw-r--r--cpp/src/qpid/cluster/Cluster.h43
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp (renamed from cpp/src/qpid/cluster/ClusterPluginProvider.cpp)19
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp103
-rw-r--r--cpp/src/qpid/cluster/SessionManager.h68
5 files changed, 218 insertions, 77 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f2d1b75f3f..256378ccd5 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,6 +32,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
@@ -46,38 +47,20 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-struct Cluster::IncomingHandler : public FrameHandler {
- IncomingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::IN);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-
-struct Cluster::OutgoingHandler : public FrameHandler {
- OutgoingHandler(Cluster& c) : cluster(c) {}
- void handle(AMQFrame& frame) {
- SessionFrame sf(Uuid(true), frame, SessionFrame::OUT);
- cluster.mcast(sf);
- }
- Cluster& cluster;
-};
-// TODO aconway 2007-06-28: Right now everything is backed up via
-// multicast. When we have point-to-point backups the
-// Incoming/Outgoing handlers must determine where each frame should
-// be sent: to multicast or only to specific backup(s) via AMQP.
-Cluster::Cluster(const std::string& name_, const std::string& url_) :
+Cluster::Cluster(
+ const std::string& name_, const std::string& url_,
+ const SessionFrameHandler::Chain& next
+) :
+ SessionFrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
- self(cpg->getLocalNoideId(), getpid()),
- toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+ self(cpg->getLocalNoideId(), getpid())
{
- QPID_LOG(trace, *this << " Joining cluster.");
+ QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg->join(name);
notify();
dispatcher=Thread(*this);
@@ -102,7 +85,7 @@ Cluster::~Cluster() {
}
}
-void Cluster::mcast(SessionFrame& frame) {
+void Cluster::handle(SessionFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -114,7 +97,7 @@ void Cluster::mcast(SessionFrame& frame) {
void Cluster::notify() {
SessionFrame sf;
sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- mcast(sf);
+ handle(sf);
}
size_t Cluster::size() const {
@@ -122,11 +105,6 @@ size_t Cluster::size() const {
return members.size();
}
-void Cluster::setReceivedChain(const SessionFrameHandler::Chain& chain) {
- Mutex::ScopedLock l(lock);
- receivedChain = chain;
-}
-
Cluster::MemberList Cluster::getMembers() const {
// TODO aconway 2007-07-04: use read/write lock?
Mutex::ScopedLock l(lock);
@@ -152,7 +130,7 @@ void Cluster::deliver(
if (frame.uuid.isNull())
handleClusterFrame(from, frame.frame);
else
- receivedChain->handle(frame);
+ next->handle(frame);
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -166,24 +144,22 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
}
// Handle cluster control frame from the null session.
-bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
- if (notifyIn) {
+ assert(notifyIn);
MemberList list;
{
Mutex::ScopedLock l(lock);
- if (!members[from])
- members[from].reset(new Member(url));
+ shared_ptr<Member>& member=members[from];
+ if (!member)
+ member.reset(new Member(notifyIn->getUrl()));
else
- members[from]->url = notifyIn->getUrl();
- QPID_LOG(trace, *this << ": member update: " << members);
+ member->url = notifyIn->getUrl();
lock.notifyAll();
+ QPID_LOG(trace, *this << ": members joined: " << members);
}
- return true;
- }
- return false;
}
void Cluster::configChange(
@@ -207,7 +183,7 @@ void Cluster::configChange(
// We don't record members joining here, we record them when
// we get their ClusterNotify message.
}
- if (newMembers)
+ if (newMembers) // Notify new members of my presence.
notify();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6ab4cb58df..f6afe14c62 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -37,11 +37,17 @@
namespace qpid { namespace cluster {
/**
- * Represents a cluster, provides access to data about members.
- * Implements HandlerUpdater to manage handlers that route frames to
- * and from the cluster.
+ * Connection to the cluster. Maintains cluster membership
+ * data.
+ *
+ * As SessionFrameHandler, handles frames by sending them to the
+ * cluster, sends frames received from the cluster to the next
+ * SessionFrameHandler.
+ *
+ *
*/
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : public SessionFrameHandler,
+ private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
@@ -57,8 +63,10 @@ class Cluster : private sys::Runnable, private Cpg::Handler
* Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
+ * @param handler for frames received from the cluster.
*/
- Cluster(const std::string& name, const std::string& url);
+ Cluster(const std::string& name, const std::string& url,
+ const SessionFrameHandler::Chain& next);
virtual ~Cluster();
@@ -70,14 +78,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool empty() const { return size() == 0; }
- /** Get handler chains to send incoming/outgoing frames to the cluster */
- framing::FrameHandler::Chains getSendChains() {
- return toChains;
- }
-
- /** Set handler for frames received from the cluster */
- void setReceivedChain(const SessionFrameHandler::Chain& chain);
-
/** Wait for predicate(*this) to be true, up to timeout.
*@return True if predicate became true, false if timed out.
*Note the predicate may not be true after wait returns,
@@ -86,13 +86,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
bool wait(boost::function<bool(const Cluster&)> predicate,
sys::Duration timeout=sys::TIME_INFINITE) const;
+ /** Send frame to the cluster */
+ void handle(SessionFrame&);
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
- typedef std::map<
- framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
- void mcast(SessionFrame&); ///< send frame by multicast.
void notify(); ///< Notify cluster of my details.
void deliver(
@@ -112,7 +112,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
- bool handleClusterFrame(Id from, framing::AMQFrame&);
+ void handleClusterFrame(Id from, framing::AMQFrame&);
mutable sys::Monitor lock;
boost::scoped_ptr<Cpg> cpg;
@@ -120,17 +120,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler
std::string url;
Id self;
MemberMap members;
- ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
- framing::FrameHandler::Chains toChains;
- SessionFrameHandler::Chain receivedChain;
-
- struct IncomingHandler;
- struct OutgoingHandler;
-
- friend struct IncomingHandler;
- friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index d48fbadf7b..10b1c44f40 100644
--- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -16,8 +16,8 @@
*
*/
#include "qpid/broker/Broker.h"
-#include "qpid/framing/HandlerUpdater.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/SessionManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
@@ -26,11 +26,11 @@ namespace cluster {
using namespace std;
-struct ClusterPluginProvider : public PluginProvider {
+struct ClusterPlugin : public Plugin {
struct ClusterOptions : public Options {
string clusterName;
- ClusterOptions() {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
("cluster", optValue(clusterName, "NAME"),
"Join the cluster named NAME");
@@ -39,22 +39,25 @@ struct ClusterPluginProvider : public PluginProvider {
ClusterOptions options;
shared_ptr<Cluster> cluster;
+ shared_ptr<SessionManager> sessions;
Options* getOptions() {
return &options;
}
- void provide(PluginUser& user) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker, and only if the --cluster config is set.
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
- cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
- // FIXME aconway 2007-06-29: register HandlerUpdater.
+ sessions.reset(new SessionManager());
+ cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
+ sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10:
+ broker->add(sessions);
}
}
};
-static ClusterPluginProvider instance; // Static initialization.
+static ClusterPlugin instance; // Static initialization.
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
new file mode 100644
index 0000000000..24f201535d
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "SessionManager.h"
+#include "ClassifierHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace sys;
+
+/** Wrap plain AMQFrames in SessionFrames */
+struct FrameWrapperHandler : public FrameHandler {
+
+ FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
+ : uuid(id), direction(dir), next(next_) {
+ assert(!uuid.isNull());
+ }
+
+ void handle(AMQFrame& frame) {
+ SessionFrame sf(uuid, frame, direction);
+ assert(next);
+ next->handle(sf);
+ }
+
+ Uuid uuid;
+ bool direction;
+ SessionFrameHandler::Chain next;
+};
+
+SessionManager::SessionManager() {}
+
+void SessionManager::update(FrameHandler::Chains& chains)
+{
+ Mutex::ScopedLock l(lock);
+ // Create a new local session, store local chains.
+ Uuid uuid(true);
+ sessions[uuid] = chains;
+
+ // Replace local incoming chain. Build from the back.
+ //
+ // TODO aconway 2007-07-05: Currently mcast wiring, bypass
+ // everythign else.
+ assert(clusterSend);
+ FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
+ FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
+ chains.in = classify;
+
+ // FIXME aconway 2007-07-05: Need to stop bypassed frames
+ // from overtaking mcast frames.
+ //
+
+ // Leave outgoing chain unmodified.
+ // TODO aconway 2007-07-05: Failover will require replication of
+ // outgoing frames to session replicas.
+
+}
+
+void SessionManager::handle(SessionFrame& frame) {
+ // Incoming from frame.
+ FrameHandler::Chains chains;
+ {
+ Mutex::ScopedLock l(lock);
+ SessionMap::iterator i = sessions.find(frame.uuid);
+ if (i == sessions.end()) {
+ QPID_LOG(trace, "Non-local frame cluster: " << frame.frame);
+ chains = nonLocal;
+ }
+ else {
+ QPID_LOG(trace, "Local frame from cluster: " << frame.frame);
+ chains = i->second;
+ }
+ }
+ FrameHandler::Chain chain =
+ chain = frame.isIncoming ? chains.in : chains.out;
+ // TODO aconway 2007-07-11: Should this be assert(chain)
+ if (chain)
+ chain->handle(frame.frame);
+
+ // TODO aconway 2007-07-05: Here's where we should unblock frame
+ // dispatch for the channel.
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h
new file mode 100644
index 0000000000..c23efde18e
--- /dev/null
+++ b/cpp/src/qpid/cluster/SessionManager.h
@@ -0,0 +1,68 @@
+#ifndef QPID_CLUSTER_SESSIONMANAGER_H
+#define QPID_CLUSTER_SESSIONMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/BrokerChannel.h"
+#include "qpid/cluster/SessionFrame.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage sessions and handler chains for the cluster.
+ *
+ */
+class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler
+{
+ public:
+ SessionManager();
+
+ /** Set the handler to send to the cluster */
+ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+
+ /** As ChannelUpdater update the handler chains. */
+ void update(framing::FrameHandler::Chains& chains);
+
+ /** As SessionFrameHandler handle frames received from the cluster */
+ void handle(SessionFrame&);
+
+ /** Get ChannelID for UUID. Return 0 if no mapping */
+ framing::ChannelId getChannelId(const framing::Uuid&) const;
+
+ private:
+ typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+
+ sys::Mutex lock;
+ SessionFrameHandler::Chain clusterSend;
+ SessionMap sessions;
+ framing::FrameHandler::Chains nonLocal;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/