diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 55 |
1 files changed, 36 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index eaa4a720b1..41688b5c49 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -21,28 +21,30 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterShutdownBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/log/Helpers.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyMetric.h" +#include "qpid/log/Statement.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/Thread.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -101,11 +103,28 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + initialized(false), state(INIT), lastSize(0), lastBroker(false), sequence(0) { + failoverExchange.reset(new FailoverExchange(this)); + if (quorum_) quorum.init(); + cpg.join(name); + // pump the CPG dispatch manually till we get initialized. + while (!initialized) + cpg.dispatchOne(); +} + +Cluster::~Cluster() { + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. +} + +void Cluster::initialize() { + if (myUrl.empty()) + myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); @@ -114,18 +133,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); - if (quorum_) quorum.init(); - cpg.join(name); - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. -} - -Cluster::~Cluster() { - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + // Add finalizer last for exception safety. + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -279,6 +291,11 @@ void Cluster::configChange ( cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); + if (state == INIT) { // First config change. + // Recover only if we are first in cluster. + broker.setRecovery(nCurrent == 1); + initialized = true; + } QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); std::string addresses; |
