summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-01-11 17:23:18 +0000
committerAlan Conway <aconway@apache.org>2010-01-11 17:23:18 +0000
commit892e84f39a40a3868ca5630371784e883127f21a (patch)
tree913127cae64803e5b00589bf5257e3729c5c66e7 /cpp/src/qpid/cluster
parent7a3841889a648eac5f57305c80f1f25a01a115ee (diff)
downloadqpid-python-892e84f39a40a3868ca5630371784e883127f21a.tar.gz
Fix broker crash "confirmed N but only sent M" with managed agents running.
The broker's ManagementAgent caches schemas from managed agents. This cache was not being replicated to new cluster members. If an agent such as sesame was running and connected to a newly-joined broker, that broker could send schema request messages which were not sent by other brokers that had the schema in cache. This resulted in the other brokers exiting with a "confirmed N but only sent M" message. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@897955 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp13
4 files changed, 24 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 320111c2e1..cc245d2f3f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 884125;
+const uint32_t Cluster::CLUSTER_VERSION = 896973;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3a5d121dc1..ea01dd6949 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include <boost/current_function.hpp>
@@ -478,5 +479,14 @@ void Connection::addQueueListener(const std::string& q, uint32_t listener) {
findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
}
+void Connection::managementSchema(const std::string& data) {
+ management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+ if (!agent)
+ throw Exception(QPID_MSG("Management schema update but no management agent."));
+ framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+ agent->importSchemas(buf);
+ QPID_LOG(debug, cluster << " updated management schemas");
+}
+
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 51e6107bfd..4795d914ed 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -167,6 +167,7 @@ class Connection :
OutputInterceptor& getOutput() { return output; }
void addQueueListener(const std::string& queue, uint32_t listener);
+ void managementSchema(const std::string& data);
private:
struct NullFrameHandler : public framing::FrameHandler {
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index b20cc907a2..d4bd4da7f8 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -138,10 +138,21 @@ void UpdateClient::update() {
session.queueDelete(arg::queue=UPDATE);
session.close();
- // Update queue listeners: must come after sessions so consumerNumbering is populated.
+ // Update queue listeners: must come after sessions so consumerNumbering is populated
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
+
+ // FIXME aconway 2010-01-08: we should enforce that all cluster members
+ // have mgmt enabled or none of them do.
+
+ management::ManagementAgent* agent = updaterBroker.getManagementAgent();
+ if (agent) {
+ string schemaData;
+ agent->exportSchemas(schemaData);
+ ClusterConnectionProxy(session).managementSchema(schemaData);
+ }
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);