diff options
| author | Alan Conway <aconway@apache.org> | 2009-02-04 17:04:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-02-04 17:04:45 +0000 |
| commit | 314eb1b65a752daaa80a2cb5174bac78c4643bcb (patch) | |
| tree | a8fcbb5f9cc7d5af1cd5016f253c98296fa9f3bb /cpp/src/qpid/cluster | |
| parent | 80c1c1da2855cc0c03d08a0fcb425c38b3344333 (diff) | |
| download | qpid-python-314eb1b65a752daaa80a2cb5174bac78c4643bcb.tar.gz | |
Cluster sets recovery flag on Broker for first member in cluster.
Disable recovery from local store if the recovery flag is not set.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740793 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 8 |
3 files changed, 52 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index eaa4a720b1..41688b5c49 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,28 +21,30 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterShutdownBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/log/Helpers.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyMetric.h" +#include "qpid/log/Statement.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/Thread.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -101,11 +103,28 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + initialized(false), state(INIT), lastSize(0), lastBroker(false), sequence(0) { + failoverExchange.reset(new FailoverExchange(this)); + if (quorum_) quorum.init(); + cpg.join(name); + // pump the CPG dispatch manually till we get initialized. + while (!initialized) + cpg.dispatchOne(); +} + +Cluster::~Cluster() { + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. +} + +void Cluster::initialize() { + if (myUrl.empty()) + myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); @@ -114,18 +133,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); - if (quorum_) quorum.init(); - cpg.join(name); - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. -} - -Cluster::~Cluster() { - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + // Add finalizer last for exception safety. + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -279,6 +291,11 @@ void Cluster::configChange ( cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); + if (state == INIT) { // First config change. + // Recover only if we are first in cluster. + broker.setRecovery(nCurrent == 1); + initialized = true; + } QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); std::string addresses; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1cfcd04c6f..f7955aa743 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -64,15 +64,17 @@ class Cluster : private Cpg::Handler, public management::Manageable { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> Connections; - - /** - * Join a cluster. - */ + + /** Construct the cluster in plugin earlyInitialize */ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax, size_t writeEstimate); virtual ~Cluster(); + /** Join the cluster in plugin initialize. Requires transport + * plugins to be available.. */ + void initialize(); + // Connection map - called in connection threads. void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); @@ -177,7 +179,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr<sys::Poller> poller; Cpg cpg; const std::string name; - const Url myUrl; + Url myUrl; const MemberId myId; const size_t readMax; const size_t writeEstimate; @@ -197,7 +199,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Called only from event delivery thread Decoder decoder; - + + // Used only during initialization + bool initialized; + // Remaining members are protected by lock mutable sys::Monitor lock; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 1c15747c77..7e0bdcbea8 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -136,13 +136,13 @@ struct ClusterPlugin : public Plugin { Options* getOptions() { return &options; } - void initialize(Plugin::Target& target) { + void earlyInitialize(Plugin::Target& target) { if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; cluster = new Cluster( values.name, - values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), + values.url.empty() ? Url() : Url(values.url), *broker, values.quorum, values.readMax, values.writeEstimate*1024 @@ -158,7 +158,9 @@ struct ClusterPlugin : public Plugin { } } - void earlyInitialize(Plugin::Target&) {} + void initialize(Plugin::Target& ) { + cluster->initialize(); + } }; static ClusterPlugin instance; // Static initialization. |
