diff options
| author | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-02-05 18:17:57 +0000 |
| commit | 4908b9593c12e93ace60f7c15382c07750f1ca20 (patch) | |
| tree | d32ad984892603c73bef8f8146f8595323d41a72 /cpp/src/qpid/cluster | |
| parent | 5e6c819aa19922a66288986b79e21dfd9a03c837 (diff) | |
| download | qpid-python-4908b9593c12e93ace60f7c15382c07750f1ca20.tar.gz | |
Synchronize management agent lists during cluster update.
- replicate management agent lists during cluster update.
- suppress management agent output during update.
- on join all members force full output at next periodic processing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@907030 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 69 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 4 |
5 files changed, 59 insertions, 37 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d10e1fd458..eb6428d394 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -590,6 +590,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); + if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); QPID_LOG(notice, *this << " joining cluster " << name); @@ -672,7 +673,7 @@ void Cluster::debugSnapshot(const char* prefix, Connection* connection) { assertClusterSafe(); std::ostringstream msg; msg << prefix; - if (connection) msg << " " << *connection; + if (connection) msg << " " << connection->getId(); msg << " snapshot " << map.getFrameSeq() << ":"; AppendQueue append(msg); broker.getQueues().eachQueue(append); @@ -761,7 +762,10 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) << " to " << updatee); deliverEventQueue.start(); // Not involved in update. } - if (updatee != self && url) debugSnapshot("join"); + if (updatee != self && url) { + debugSnapshot("join"); + if (mAgent) mAgent->clusterUpdate(); + } } static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { @@ -830,9 +834,11 @@ void Cluster::checkUpdateIn(Lock& l) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; broker.setClusterUpdatee(false); + if (mAgent) mAgent->suppress(false); // Enable management output. discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); debugSnapshot("initial"); + if (mAgent) mAgent->clusterUpdate(); deliverEventQueue.start(); } else if (updateRetracted) { // Update was retracted, request another update @@ -992,10 +998,12 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name) timer->deliverWakeup(name); } void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { + QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) timer->deliverDrop(name); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 5faa184e30..3ce2b3f376 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -197,7 +197,6 @@ struct GiveReadCreditOnExit { void Connection::deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); - cluster.debugSnapshot("deliver-do-output", this); } // Called in delivery thread, in cluster order. @@ -532,5 +531,14 @@ void Connection::managementSetupState(uint64_t objectNum, uint16_t bootSequence) agent->setBootSequence(bootSequence); } +void Connection::managementAgents(const std::string& data) { + management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); + if (!agent) + throw Exception(QPID_MSG("Management agents update but no management agent.")); + framing::Buffer buf(const_cast<char*>(data.data()), data.size()); + agent->importAgents(buf); + QPID_LOG(debug, cluster << " updated management agents"); +} + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 9a4e52a9d6..a2f96782f7 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -164,6 +164,7 @@ class Connection : void addQueueListener(const std::string& queue, uint32_t listener); void managementSchema(const std::string& data); + void managementAgents(const std::string& data); void managementSetupState(uint64_t objectNum, uint16_t bootSequence); uint32_t getSsf() const { return connectionCtor.ssf; } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 6c8bb7e890..36efdfba65 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -53,6 +53,7 @@ #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" +#include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> @@ -128,15 +129,7 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; - // - // Bash the state of the slave into conformance with ours. The - // goal here is to get his state arranged so as to mimic our - // state, w/r/t object ID creation. Currently, that means that we - // propagate our boot seq and object UID counter to him so that - // subsequently created objects on his side will track what's on - // our side. - // - updateManagementSetupState(b); + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); @@ -154,16 +147,8 @@ void UpdateClient::update() { ClusterConnectionProxy(session).expiryId(expiry.getId()); - // FIXME aconway 2010-01-08: we should enforce that all cluster members - // have mgmt enabled or none of them do. - - management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (agent) { - string schemaData; - agent->exportSchemas(schemaData); - ClusterConnectionProxy(session).managementSchema(schemaData); - } - + updateManagementAgent(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); @@ -184,21 +169,41 @@ template <class T> std::string encode(const T& t) { } } // namespace -// -// Propagate the management setup state block, currently consisting of -// object number counter and boot sequence counter, to the slave. -// -void UpdateClient::updateManagementSetupState(Broker & b) + +// Propagate the management state +void UpdateClient::updateManagementSetupState() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (agent) { - qmf::org::apache::qpid::broker::ManagementSetupState mss(b.getManagementAgent(), 0); - mss.set_objectNum(b.getManagementAgent()->getNextObjectId()); - mss.set_bootSequence(b.getManagementAgent()->getBootSequence()); - QPID_LOG(debug, updaterId << " updating management-setup-state " << mss.get_objectNum() - << " " << mss.get_bootSequence() << "\n"); - ClusterConnectionProxy(session).managementSetupState(mss.get_objectNum(), mss.get_bootSequence()); - } + if (!agent) return; + + // + // Bash the state of the slave into conformance with ours. The + // goal here is to get his state arranged so as to mimic our + // state, w/r/t object ID creation. Currently, that means that we + // propagate our boot seq and object UID counter to him so that + // subsequently created objects on his side will track what's on + // our side. + // + qmf::org::apache::qpid::broker::ManagementSetupState mss(agent, 0); + mss.set_objectNum(agent->getNextObjectId()); + mss.set_bootSequence(agent->getBootSequence()); + QPID_LOG(debug, updaterId << " updating management-setup-state " + << mss.get_objectNum() + << " " << mss.get_bootSequence() << "\n"); + ClusterConnectionProxy(session).managementSetupState( + mss.get_objectNum(), mss.get_bootSequence()); +} + +void UpdateClient::updateManagementAgent() +{ + management::ManagementAgent* agent = updaterBroker.getManagementAgent(); + if (!agent) return; + // Send management schemas and agents. + string data; + agent->exportSchemas(data); + ClusterConnectionProxy(session).managementSchema(data); + agent->exportAgents(data); + ClusterConnectionProxy(session).managementAgents(data); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 7407b7450b..be09af7e81 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -29,7 +29,6 @@ #include "qpid/client/AsyncSession.h" #include "qpid/broker/SemanticState.h" #include "qpid/sys/Runnable.h" -#include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/shared_ptr.hpp> @@ -98,7 +97,8 @@ class UpdateClient : public sys::Runnable { void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); - void updateManagementSetupState(broker::Broker & b); + void updateManagementSetupState(); + void updateManagementAgent(); Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; |
