summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp15
1 files changed, 10 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 37932be735..4f98c60cad 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -264,6 +264,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
+ updateDataExchange(new UpdateDataExchange(this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -283,17 +285,16 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
// Failover exchange provides membership updates to clients.
- failoverExchange.reset(new FailoverExchange(broker.GetVhostObject(), &broker));
broker.getExchanges().registerExchange(failoverExchange);
// Update exchange is used during updates to replicate messages
// without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
// Update-data exchange is used for passing data that may be too large
// for single control frame.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, broker.getManagementAgent())));
+ broker.getExchanges().registerExchange(updateDataExchange);
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
@@ -931,11 +932,15 @@ void Cluster::checkUpdateIn(Lock& l) {
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
- if (mAgent) mAgent->suppress(false); // Enable management output.
+ if (mAgent) {
+ // Update management agent now, after all update activity is complete.
+ updateDataExchange->updateManagementAgent(mAgent);
+ mAgent->suppress(false); // Enable management output.
+ mAgent->clusterUpdate();
+ }
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
- if (mAgent) mAgent->clusterUpdate();
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}