summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
committerAlan Conway <aconway@apache.org>2007-06-29 17:59:00 +0000
commitfda6dadde945a9c73c97b73dc79e93368b743348 (patch)
treed7755539ae485efdfbc46298cd1ef6632515159e /cpp/src/qpid/cluster
parent79cd6c772da003ddc917eff362f9adaa99e28b49 (diff)
downloadqpid-python-fda6dadde945a9c73c97b73dc79e93368b743348.tar.gz
* Summary:
- Improved plugin framework and HandlerUpdater interface. - Cluster handlers for traffic to/from cluster. - Cluster HandlerUpdater configures channel chains for cluster. - Cluster PluginProvider registers cluster objects with broker. * src/qpid/framing/AMQFrame.h: Made data members public. Handlers need to be able to modify frame data, getters/setters are just a nuisance here. * src/tests/Cluster.cpp: Updated for cluster changes, using handlers instead of friendship to hook test into Cluster code. * src/qpid/framing/amqp_types.h: Added CHANNEL_MAX and CHANNEL_HIGH_BIT constants. * src/qpid/framing/HandlerUpdater.h: Renamed ChannelInitializer, broke dependency on broker channel types. * src/qpid/framing/Handler.h: Added constructors and nextHandler() * src/qpid/framing/AMQFrame.h (class AMQFrame): Inlined getChannel() * src/qpid/cluster/ClusterPluginProvider.cpp: Provider for cluster plugins. * src/qpid/cluster/Cluster.cpp: Use ChannelManager. Factor out plugin details to ClusterPluginProvider. * src/qpid/cluster/ChannelManager.h: Insert cluster handlers into channel chains, route frames between cluster and channels. * src/qpid/broker/BrokerAdapter.cpp (startOk): use CHANNEL_MAX constant. * src/qpid/broker/Broker.cpp: - Refactored for new plugin framework. - Added getUrl(). * src/qpid/Url.h: Added constructor from Address. * src/qpid/Plugin.h: Generalized plugin framework, broke dependency on Broker interfaces. We may want to use plug-ins for clients also at some point. * src/tests/run_test: Fix bug when VALGRIND is not set. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551981 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.cpp85
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.h66
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp121
-rw-r--r--cpp/src/qpid/cluster/Cluster.h64
-rw-r--r--cpp/src/qpid/cluster/ClusterPluginProvider.cpp66
-rw-r--r--cpp/src/qpid/cluster/Cpg.h3
6 files changed, 336 insertions, 69 deletions
diff --git a/cpp/src/qpid/cluster/ChannelManager.cpp b/cpp/src/qpid/cluster/ChannelManager.cpp
new file mode 100644
index 0000000000..f573d78ca1
--- /dev/null
+++ b/cpp/src/qpid/cluster/ChannelManager.cpp
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "ChannelManager.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+/** Handler to multicast to the cluster */
+struct ClusterHandler : public FrameHandler {
+
+ ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_)
+ : FrameHandler(next), bitmask(bitmask_) {}
+
+ void handle(AMQFrame& frame) {
+ frame.channel |= bitmask; // Mark the frame
+ nextHandler(frame);
+ // TODO aconway 2007-06-28: Right now everything is backed up
+ // via multicast. When we have point-to-point backups this
+ // function must determine where each frame should be sent: to
+ // multicast or only to specific backup(s) via AMQP.
+ }
+
+ ChannelId bitmask;
+};
+
+ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){}
+
+void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) {
+ // Store the original cluster chains for the channel.
+ channels[id] = chains;
+
+ // Replace chains with multicast-to-cluster handlers that mark the
+ // high-bit of the channel ID on outgoing frames so we can tell
+ // them from incoming frames in handle()
+ //
+ // When handle() receives the frames from the cluster it
+ // will forward them to the original channel chains stored in
+ // channels map.
+ //
+ chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0));
+ chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT));
+}
+
+void ChannelManager::handle(AMQFrame& frame) {
+ bool isOut = frame.channel | CHANNEL_HIGH_BIT;
+ frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit.
+ ChannelMap::iterator i = channels.find(frame.getChannel());
+ if (i != channels.end()) {
+ Chain& chain = isOut ? i->second.out : i->second.in;
+ chain->handle(frame);
+ }
+ else
+ updateFailoverState(frame);
+}
+
+void ChannelManager::updateFailoverState(AMQFrame& ) {
+ QPID_LOG(critical, "Failover is not implemented");
+ // FIXME aconway 2007-06-28:
+ // If the channel is not in my map then I'm not primary so
+ // I don't pass the frame to the channel handler but I
+ // do need to update the session failover state.
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ChannelManager.h b/cpp/src/qpid/cluster/ChannelManager.h
new file mode 100644
index 0000000000..59fce77957
--- /dev/null
+++ b/cpp/src/qpid/cluster/ChannelManager.h
@@ -0,0 +1,66 @@
+#ifndef QPID_CLUSTER_CHANNELMANAGER_H
+#define QPID_CLUSTER_CHANNELMANAGER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/HandlerUpdater.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Manage channels and handler chains on channels for the cluster.
+ *
+ * As HandlerUpdater plugin, updates channel handler chains with
+ * cluster handlers.
+ *
+ * As a FrameHandler handles frames coming from the cluster and
+ * dispatches them to the appropriate channel handler.
+ *
+ */
+class ChannelManager : public framing::HandlerUpdater,
+ public framing::FrameHandler
+{
+ public:
+ /** Takes a handler to send frames to the cluster. */
+ ChannelManager(framing::FrameHandler::Chain mcastOut);
+
+ /** As FrameHandler handle frames from the cluster */
+ void handle(framing::AMQFrame& frame);
+
+ /** As ChannelUpdater update the handler chains. */
+ void update(framing::ChannelId id, framing::FrameHandler::Chains& chains);
+
+ private:
+ void updateFailoverState(framing::AMQFrame&);
+
+ typedef std::map<framing::ChannelId,
+ framing::FrameHandler::Chains> ChannelMap;
+
+ framing::FrameHandler::Chain mcastOut;
+ ChannelMap channels;
+};
+
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 30073c4551..8d898eefa3 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,13 +17,13 @@
*/
#include "Cluster.h"
-#include "Cpg.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <iterator>
+#include <map>
namespace qpid {
namespace cluster {
@@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "(" << cluster.self << ")";
}
+namespace {
+Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
+struct StatusMapInit {
+ StatusMapInit() {
+ statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
+ statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
+ statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
+ statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
+ statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
+ }
+} instance;
+}
+
+Cluster::Member::Member(const cpg_address& addr)
+ : status(statusMap[addr.reason]) {}
+
void Cluster::notify() {
+ ProtocolVersion version;
// TODO aconway 2007-06-25: Use proxy here.
AMQFrame frame(version, 0,
make_shared_ptr(new ClusterNotifyBody(version, url)));
handle(frame);
}
-Cluster::Cluster(
- const std::string& name_, const std::string& url_, FrameHandler& next_,
- ProtocolVersion ver)
- : name(name_), url(url_), version(ver),
- cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- next(next_)
-{
- self=Id(cpg->getLocalNoideId(), getpid());
+Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ name(name_),
+ url(url_),
+ cpg(new Cpg(
+ boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
+ boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
+ self(cpg->getLocalNoideId(), getpid())
+{}
+
+void Cluster::join(FrameHandler::Chain next) {
QPID_LOG(trace, *this << " Joining cluster.");
+ next = next;
+ dispatcher=Thread(*this);
cpg->join(name);
notify();
- dispatcher=Thread(*this);
}
Cluster::~Cluster() {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
+ if (cpg) {
+ try {
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << e.what());
+ }
}
}
void Cluster::handle(AMQFrame& frame) {
+ assert(cpg);
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -104,52 +126,59 @@ void Cluster::cpgDeliver(
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame);
// TODO aconway 2007-06-20: use visitor pattern.
- ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
+ ClusterNotifyBody* notifyIn=
+ dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
- Mutex::ScopedLock l(lock);
- members[from].reset(new Member(notifyIn->getUrl()));
- lock.notifyAll();
+ {
+ Mutex::ScopedLock l(lock);
+ assert(members[from]);
+ members[from]->url = notifyIn->getUrl();
+ members[from]->status = Member::BROKER;
+ }
+ if (callback)
+ callback();
}
else
- next.handle(frame);
+ next->handle(frame);
}
void Cluster::cpgConfigChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *ccMembers, int nMembers,
+ struct cpg_address *current, int nCurrent,
struct cpg_address *left, int nLeft,
struct cpg_address *joined, int nJoined
)
{
- QPID_LOG(
- trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(ccMembers, nMembers));
-
+ QPID_LOG(trace,
+ *this << " Configuration change. " << endl
+ << " Joined: " << make_pair(joined, nJoined) << endl
+ << " Left: " << make_pair(left, nLeft) << endl
+ << " Current: " << make_pair(current, nCurrent));
+
+ bool needNotify=false;
+ MemberList updated;
{
Mutex::ScopedLock l(lock);
- // Erase members that left.
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(left[i]));
- lock.notifyAll();
- }
-
- // If there are new members (other than myself) then notify.
- for (int i=0; i< nJoined; ++i) {
- if (Id(joined[i]) != self) {
- notify();
- break;
+ for (int i = 0; i < nJoined; ++i) {
+ Id id(current[i]);
+ members[id].reset(new Member(current[i]));
+ if (id != self)
+ needNotify = true; // Notify new members other than myself.
}
- }
-
- // Note: New members are be added to my map when cpgDeliver
- // gets a cluster.notify frame.
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(current[i]));
+ } // End of locked scope.
+ if (needNotify)
+ notify();
+ if (callback)
+ callback();
}
+void Cluster::setCallback(boost::function<void()> f) { callback=f; }
+
void Cluster::run() {
+ assert(cpg);
cpg->dispatchBlocking();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 1cbbb249f2..aff213b6c9 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -25,47 +25,74 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/shared_ptr.h"
-#include "qpid/framing/ProtocolVersion.h"
+#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <map>
#include <vector>
namespace qpid {
+
+namespace broker {
+class HandlerUpdater;
+}
+
namespace cluster {
+class ChannelManager;
+
/**
- * Represents a cluster. Creating an instance joins current process
- * to the cluster.
+ * Represents a cluster, provides access to data about members.
+ *
+ * Implements a FrameHandler that multicasts frames to the cluster.
+ *
+ * Requires a handler for frames arriving from the cluster,
+ * normally a ChannelManager but other handlers could be interposed
+ * for testing, logging etc.
*/
class Cluster : public framing::FrameHandler, private sys::Runnable {
public:
/** Details of a cluster member */
struct Member {
- Member(const std::string& url_) : url(url_) {}
+ typedef shared_ptr<const Member> Ptr;
+ /** Status of a cluster member. */
+ enum Status {
+ JOIN, ///< Process joined the group.
+ LEAVE, ///< Process left the group cleanly.
+ NODEDOWN, ///< Process's node went down.
+ NODEUP, ///< Process's node joined the cluster.
+ PROCDOWN, ///< Process died without leaving.
+ BROKER ///< Broker details are available.
+ };
+
+ Member(const cpg_address&);
std::string url;
+ Status status;
};
-
- typedef std::vector<shared_ptr<const Member> > MemberList;
+
+ typedef std::vector<Member::Ptr> MemberList;
/**
- * Join a cluster.
+ * Create a cluster object but do not joing.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
- * @param next handler receives the frame when it has been
- * acknowledged by the cluster.
*/
- Cluster(const std::string& name,
- const std::string& url,
- framing::FrameHandler& next,
- framing::ProtocolVersion);
+ Cluster(const std::string& name, const std::string& url);
~Cluster();
+
+ /** Join the cluster.
+ *@handler is the handler for frames arriving from the cluster.
+ */
+ void join(framing::FrameHandler::Chain handler);
/** Multicast a frame to the cluster. */
void handle(framing::AMQFrame&);
/** Get the current cluster membership. */
MemberList getMembers() const;
+
+ /** Called when membership changes. */
+ void setCallback(boost::function<void()>);
/** Number of members in the cluster. */
size_t size() const;
@@ -76,7 +103,6 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
void run();
void notify();
-
void cpgDeliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -93,18 +119,14 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
struct cpg_address */*joined*/, int /*nJoined*/
);
- Id self;
+ mutable sys::Monitor lock;
Cpg::Name name;
std::string url;
- framing::ProtocolVersion version;
boost::scoped_ptr<Cpg> cpg;
- framing::FrameHandler& next;
+ Id self;
MemberMap members;
sys::Thread dispatcher;
-
- protected:
- // Allow access from ClusterTest subclass.
- mutable sys::Monitor lock;
+ boost::function<void()> callback;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
};
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
new file mode 100644
index 0000000000..3a09a66b81
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/HandlerUpdater.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/ChannelManager.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+
+struct ClusterPluginProvider : public PluginProvider {
+
+ struct ClusterOptions : public Options {
+ string clusterName;
+ ClusterOptions() {
+ addOptions()
+ ("cluster", optValue(clusterName, "NAME"),
+ "Join the cluster named NAME");
+ }
+ };
+
+ ClusterOptions options;
+ shared_ptr<Cluster> cluster;
+
+ Options* getOptions() {
+ return &options;
+ }
+
+ void provide(PluginUser& user) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&user);
+ // Only provide to a Broker, and only if the --cluster config is set.
+ if (broker && !options.clusterName.empty()) {
+ assert(!cluster); // A process can only belong to one cluster.
+ cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
+
+ // Channel manager is both the next handler for the cluster
+ // and the HandlerUpdater plugin for the broker.
+ shared_ptr<ChannelManager> manager(new ChannelManager(cluster));
+ cluster->join(manager);
+ broker->use(manager);
+ }
+ }
+};
+
+static ClusterPluginProvider instance; // Static initialization.
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 6b157301a7..e164ed1215 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -23,11 +23,10 @@
#include "qpid/cluster/Dispatchable.h"
#include <boost/function.hpp>
#include <cassert>
-#ifdef CLUSTER
extern "C" {
#include <openais/cpg.h>
}
-#endif
+
namespace qpid {
namespace cluster {