summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
committerAlan Conway <aconway@apache.org>2010-01-06 17:00:57 +0000
commit469c2fe3cf3ecd99799074a67adc374381ac4533 (patch)
tree09f7cc9356c01625db06c068466804e63b68d7bf /cpp/src/qpid/cluster/Cluster.cpp
parent995e679a76a6f1b48b3d7eeec9b4c2d60b448fb3 (diff)
downloadqpid-python-469c2fe3cf3ecd99799074a67adc374381ac4533.tar.gz
Don't hold up broker initialization for cluster initialization.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@896536 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp40
1 files changed, 8 insertions, 32 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d049001eb0..738a9fc5c4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -258,15 +258,14 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getState() == STORE_STATE_DIRTY_STORE)
+ broker.setRecovery(false); // Ditch my current store.
if (store.getClusterId())
clusterId = store.getClusterId(); // Use stored ID if there is one.
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
- // Pump the CPG dispatch manually till we get initialized.
- while (state == INIT)
- cpg.dispatchOne();
}
Cluster::~Cluster() {
@@ -277,18 +276,6 @@ void Cluster::initialize() {
if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- // Cluster constructor will leave us in either READY or JOINER state.
- switch (state) {
- case READY:
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
- break;
- case JOINER:
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- break;
- default:
- assert(0);
- }
- QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -389,23 +376,9 @@ void Cluster::deliver(
deliverEvent(e);
}
-void Cluster::deliverEvent(const Event& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredEvent(e);
- else
- deliverEventQueue.push(e);
-}
+void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); }
-void Cluster::deliverFrame(const EventFrame& e) {
- // During initialization, execute events directly in the same thread.
- // Once initialized, push to pollable queue to be processed in another thread.
- if (state == INIT)
- deliveredFrame(e);
- else
- deliverFrameQueue.push(e);
-}
+void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); }
const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
return (body && body->getMethod() &&
@@ -621,13 +594,16 @@ void Cluster::initMapCompleted(Lock& l) {
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
state = JOINER;
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joining cluster " << name);
}
else { // I can go ready.
discarding = false;
setReady(l);
memberUpdate(l);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
+ QPID_LOG(notice, *this << " joined cluster " << name);
}
- QPID_LOG(debug, *this << "Initialization complete");
}
}