summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-04 17:04:45 +0000
committerAlan Conway <aconway@apache.org>2009-02-04 17:04:45 +0000
commit314eb1b65a752daaa80a2cb5174bac78c4643bcb (patch)
treea8fcbb5f9cc7d5af1cd5016f253c98296fa9f3bb /cpp/src/qpid/cluster
parent80c1c1da2855cc0c03d08a0fcb425c38b3344333 (diff)
downloadqpid-python-314eb1b65a752daaa80a2cb5174bac78c4643bcb.tar.gz
Cluster sets recovery flag on Broker for first member in cluster.
Disable recovery from local store if the recovery flag is not set. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@740793 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp55
-rw-r--r--cpp/src/qpid/cluster/Cluster.h17
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp8
3 files changed, 52 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index eaa4a720b1..41688b5c49 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -21,28 +21,30 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUpdateRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterUpdateOfferBody.h"
-#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/log/Helpers.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyMetric.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qmf/org/apache/qpid/cluster/Package.h"
-#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -101,11 +103,28 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ initialized(false),
state(INIT),
lastSize(0),
lastBroker(false),
sequence(0)
{
+ failoverExchange.reset(new FailoverExchange(this));
+ if (quorum_) quorum.init();
+ cpg.join(name);
+ // pump the CPG dispatch manually till we get initialized.
+ while (!initialized)
+ cpg.dispatchOne();
+}
+
+Cluster::~Cluster() {
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+}
+
+void Cluster::initialize() {
+ if (myUrl.empty())
+ myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
_qmf::Package packageInit(mAgent);
@@ -114,18 +133,11 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
- failoverExchange.reset(new FailoverExchange(this));
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
- if (quorum_) quorum.init();
- cpg.join(name);
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
-}
-
-Cluster::~Cluster() {
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ // Add finalizer last for exception safety.
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -279,6 +291,11 @@ void Cluster::configChange (
cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
+ if (state == INIT) { // First config change.
+ // Recover only if we are first in cluster.
+ broker.setRecovery(nCurrent == 1);
+ initialized = true;
+ }
QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
std::string addresses;
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 1cfcd04c6f..f7955aa743 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -64,15 +64,17 @@ class Cluster : private Cpg::Handler, public management::Manageable {
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Connections;
-
- /**
- * Join a cluster.
- */
+
+ /** Construct the cluster in plugin earlyInitialize */
Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
size_t readMax, size_t writeEstimate);
virtual ~Cluster();
+ /** Join the cluster in plugin initialize. Requires transport
+ * plugins to be available.. */
+ void initialize();
+
// Connection map - called in connection threads.
void addLocalConnection(const ConnectionPtr&);
void addShadowConnection(const ConnectionPtr&);
@@ -177,7 +179,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
const std::string name;
- const Url myUrl;
+ Url myUrl;
const MemberId myId;
const size_t readMax;
const size_t writeEstimate;
@@ -197,7 +199,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Called only from event delivery thread
Decoder decoder;
-
+
+ // Used only during initialization
+ bool initialized;
+
// Remaining members are protected by lock
mutable sys::Monitor lock;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 1c15747c77..7e0bdcbea8 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -136,13 +136,13 @@ struct ClusterPlugin : public Plugin {
Options* getOptions() { return &options; }
- void initialize(Plugin::Target& target) {
+ void earlyInitialize(Plugin::Target& target) {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
cluster = new Cluster(
values.name,
- values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+ values.url.empty() ? Url() : Url(values.url),
*broker,
values.quorum,
values.readMax, values.writeEstimate*1024
@@ -158,7 +158,9 @@ struct ClusterPlugin : public Plugin {
}
}
- void earlyInitialize(Plugin::Target&) {}
+ void initialize(Plugin::Target& ) {
+ cluster->initialize();
+ }
};
static ClusterPlugin instance; // Static initialization.