diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 40 |
1 files changed, 8 insertions, 32 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d049001eb0..738a9fc5c4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -258,15 +258,14 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); + if (store.getState() == STORE_STATE_DIRTY_STORE) + broker.setRecovery(false); // Ditch my current store. if (store.getClusterId()) clusterId = store.getClusterId(); // Use stored ID if there is one. QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); - // Pump the CPG dispatch manually till we get initialized. - while (state == INIT) - cpg.dispatchOne(); } Cluster::~Cluster() { @@ -277,18 +276,6 @@ void Cluster::initialize() { if (settings.quorum) quorum.start(poller); if (myUrl.empty()) myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - // Cluster constructor will leave us in either READY or JOINER state. - switch (state) { - case READY: - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - break; - case JOINER: - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - break; - default: - assert(0); - } - QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); @@ -389,23 +376,9 @@ void Cluster::deliver( deliverEvent(e); } -void Cluster::deliverEvent(const Event& e) { - // During initialization, execute events directly in the same thread. - // Once initialized, push to pollable queue to be processed in another thread. - if (state == INIT) - deliveredEvent(e); - else - deliverEventQueue.push(e); -} +void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } -void Cluster::deliverFrame(const EventFrame& e) { - // During initialization, execute events directly in the same thread. - // Once initialized, push to pollable queue to be processed in another thread. - if (state == INIT) - deliveredFrame(e); - else - deliverFrameQueue.push(e); -} +void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { return (body && body->getMethod() && @@ -621,13 +594,16 @@ void Cluster::initMapCompleted(Lock& l) { broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); state = JOINER; + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); + QPID_LOG(notice, *this << " joining cluster " << name); } else { // I can go ready. discarding = false; setReady(l); memberUpdate(l); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); + QPID_LOG(notice, *this << " joined cluster " << name); } - QPID_LOG(debug, *this << "Initialization complete"); } } |
