diff options
| author | Alan Conway <aconway@apache.org> | 2010-12-01 21:33:12 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-12-01 21:33:12 +0000 |
| commit | 1e20951003db44b71298649f674664a9e1ba26c5 (patch) | |
| tree | 48e9786d8ea42393504c2ec9478c4c3c516c8c54 /cpp/src/qpid/cluster/UpdateClient.cpp | |
| parent | e3af2ca02840a6697461532d0059105c5fd08e84 (diff) | |
| download | qpid-python-1e20951003db44b71298649f674664a9e1ba26c5.tar.gz | |
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The
management agent's deleted-object list was not being replicated to new
members joining the cluster, so management generated fewer deleted
object notifications on the newer member, causing it to fail with an
invalid-argument error. The list is now being replicated correctly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 40 |
1 files changed, 34 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index bc1b812a94..7d73f3c1db 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -18,12 +18,14 @@ * under the License. * */ +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/cluster/UpdateClient.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ClusterMap.h" #include "qpid/cluster/Connection.h" #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" +#include "qpid/cluster/UpdateDataExchange.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/client/SessionImpl.h" @@ -52,6 +54,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/TypeCode.h" #include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/broker/ManagementSetupState.h" #include <boost/bind.hpp> @@ -62,12 +65,14 @@ namespace qpid { namespace cluster { +using amqp_0_10::ListCodec; using broker::Broker; using broker::Exchange; using broker::Queue; using broker::QueueBinding; using broker::Message; using broker::SemanticState; +using types::Variant; using namespace framing; namespace arg=client::arg; @@ -153,7 +158,6 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); session.queueDelete(arg::queue=UPDATE); - session.close(); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); @@ -162,14 +166,16 @@ void UpdateClient::update() { updateManagementAgent(); + session.close(); + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); - // FIXME aconway 2010-06-16: Connection will be closed from the other end. - // connection.close(); + // NOTE: connection will be closed from the other end, don't close + // it here as that causes a race. // FIXME aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. @@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent() { management::ManagementAgent* agent = updaterBroker.getManagementAgent(); if (!agent) return; - // Send management schemas and agents. string data; + + QPID_LOG(debug, updaterId << " updating management schemas. ") agent->exportSchemas(data); - ClusterConnectionProxy(session).managementSchema(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management agents. ") agent->exportAgents(data); - ClusterConnectionProxy(session).managementAgents(data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); + + QPID_LOG(debug, updaterId << " updating management deleted objects. ") + typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; + DeletedObjectList deleted; + agent->exportDeletedObjects(deleted); + Variant::List list; + for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) { + string encoded; + (*i)->encode(encoded); + list.push_back(encoded); + } + ListCodec::encode(list, data); + session.messageTransfer( + arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY), + arg::destination=UpdateDataExchange::EXCHANGE_NAME); } void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { |
