diff options
| author | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
| commit | 4908b9593c12e93ace60f7c15382c07750f1ca20 (patch) | |
| tree | d32ad984892603c73bef8f8146f8595323d41a72 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | 5e6c819aa19922a66288986b79e21dfd9a03c837 (diff) | |
| download | qpid-python-4908b9593c12e93ace60f7c15382c07750f1ca20.tar.gz | |
Synchronize management agent lists during cluster update.
- replicate management agent lists during cluster update.
- suppress management agent output during update.
- on join all members force full output at next periodic processing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@907030 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d10e1fd458..eb6428d394 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -590,6 +590,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); + if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joining cluster " << name); @@ -672,7 +673,7 @@ void Cluster::debugSnapshot(const char* prefix, Connection* connection) { assertClusterSafe(); std::ostringstream msg; msg << prefix; - if (connection) msg << " " << *connection; + if (connection) msg << " " << connection->getId(); msg << " snapshot " << map.getFrameSeq() << ":"; AppendQueue append(msg); broker.getQueues().eachQueue(append); @@ -761,7 +762,10 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } - if (updatee != self && url) debugSnapshot("join"); + if (updatee != self && url) { + debugSnapshot("join"); + if (mAgent) mAgent->clusterUpdate(); + } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { @@ -830,9 +834,11 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; broker.setClusterUpdatee(false); + if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); debugSnapshot("initial"); + if (mAgent) mAgent->clusterUpdate(); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update @@ -992,10 +998,12 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name) timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) timer->deliverDrop(name); } |
