summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp55
1 files changed, 36 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index eaa4a720b1..41688b5c49 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,28 +21,30 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUpdateRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterUpdateOfferBody.h"
-#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/log/Helpers.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyMetric.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qmf/org/apache/qpid/cluster/Package.h"
-#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -101,11 +103,28 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ initialized(false),
state(INIT),
lastSize(0),
lastBroker(false),
sequence(0)
{
+ failoverExchange.reset(new FailoverExchange(this));
+ if (quorum_) quorum.init();
+ cpg.join(name);
+ // pump the CPG dispatch manually till we get initialized.
+ while (!initialized)
+ cpg.dispatchOne();
+}
+
+Cluster::~Cluster() {
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+}
+
+void Cluster::initialize() {
+ if (myUrl.empty())
+ myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
_qmf::Package packageInit(mAgent);
@@ -114,18 +133,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
- failoverExchange.reset(new FailoverExchange(this));
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
- if (quorum_) quorum.init();
- cpg.join(name);
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
-}
-
-Cluster::~Cluster() {
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ // Add finalizer last for exception safety.
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -279,6 +291,11 @@ void Cluster::configChange (
cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
+ if (state == INIT) { // First config change.
+ // Recover only if we are first in cluster.
+ broker.setRecovery(nCurrent == 1);
+ initialized = true;
+ }
QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
std::string addresses;