diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 37932be735..4f98c60cad 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -264,6 +264,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), + failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), + updateDataExchange(new UpdateDataExchange(this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), @@ -283,17 +285,16 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); // Failover exchange provides membership updates to clients. - failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker)); broker.getExchanges().registerExchange(failoverExchange); // Update exchange is used during updates to replicate messages // without modifying delivery-properties.exchange. broker.getExchanges().registerExchange( boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + // Update-data exchange is used for passing data that may be too large // for single control frame. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent()))); + broker.getExchanges().registerExchange(updateDataExchange); // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { @@ -931,11 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) { // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); - if (mAgent) mAgent->suppress(false); // Enable management output. + if (mAgent) { + // Update management agent now, after all update activity is complete. + updateDataExchange->updateManagementAgent(mAgent); + mAgent->suppress(false); // Enable management output. + mAgent->clusterUpdate(); + } discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. - if (mAgent) mAgent->clusterUpdate(); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } |
