summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-18 20:43:41 +0000
committerAlan Conway <aconway@apache.org>2011-01-18 20:43:41 +0000
commitbe6297381c3272178a43940ccf81986073e5ad9f (patch)
treed71771c2cf490b27bf068ef2644f98ced45426d6 /cpp/src/qpid
parent7db454bc1eae3744c676fe9e8ddd6e999cee13f1 (diff)
downloadqpid-python-be6297381c3272178a43940ccf81986073e5ad9f.tar.gz
QPID-2982 Bug 669452 - Creating a route and using management tools can crash cluster members.
Cluster update did not include federation link and bridge objects. Fixed update to include them. Management linkUp and linkDown events were generated only on the broker receiving the link. Suppressed these events in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1060568 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp7
-rw-r--r--cpp/src/qpid/broker/Link.cpp12
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp9
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp27
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp30
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h6
9 files changed, 89 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 5911d916ad..7fbbf4e2c4 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
- if (!args.i_durable)
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
@@ -167,10 +166,6 @@ void Bridge::destroy()
void Bridge::setPersistenceId(uint64_t pId) const
{
- if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = link->getBroker()->getManagementAgent();
- agent->addObject (mgmtObject, pId);
- }
persistenceId = pId;
}
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 5a50d26c8c..e1091df724 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -130,9 +131,12 @@ void Link::established ()
{
stringstream addr;
addr << host << ":" << port;
-
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+ // Don't raise the management event in a cluster, other members wont't get this call.
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_OPERATIONAL);
@@ -150,11 +154,13 @@ void Link::closed (int, std::string text)
connection = 0;
+ // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index ea14552cc1..82f1f0ea24 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p)
passive = p;
//will activate or passivate links on maintenance visit
}
+
+void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
+}
+
+void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index a1931920d7..4c97e4f9d8 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -31,6 +31,7 @@
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
@@ -148,6 +149,12 @@ namespace broker {
* bridges won't therefore pull or push any messages.
*/
void setPassive(bool);
+
+
+ /** Iterate over each link in the registry. Used for cluster updates. */
+ void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+ /** Iterate over each bridge in the registry. Used for cluster updates. */
+ void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
};
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5720f7fcc1..dd4882774b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1045272;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 2797fdcf02..c7689577a7 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,8 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) {
// returns true if the header is complete or already read.
bool Connection::checkProtocolHeader(const char*& data, size_t size) {
if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
+ // This is an outgoing link connection, we will receive a protocol
+ // header which needs to be decoded first
framing::ProtocolInitiation pi;
Buffer buf(const_cast<char*&>(data), size);
if (pi.decode(buf)) {
//TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
expectProtocolHeader = false;
data += pi.encodedSize();
} else {
@@ -650,5 +651,25 @@ void Connection::managementSetupState(
agent->setUuid(id);
agent->setName(vendor, product, instance);
}
+
+void Connection::config(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ string kind;
+ buf.getShortString (kind);
+ if (kind == "link") {
+ broker::Link::shared_ptr link =
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated link "
+ << link->getHost() << ":" << link->getPort());
+ }
+ else if (kind == "bridge") {
+ broker::Bridge::shared_ptr bridge =
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 69b8cb1450..d90cdd898b 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -183,7 +183,9 @@ class Connection :
const std::string& vendor,
const std::string& product,
const std::string& instance);
-
+
+ void config(const std::string& encoded);
+
void setSecureConnection ( broker::SecureConnection * sc );
private:
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 59db4de526..e5d20c85e6 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -34,6 +34,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -167,7 +170,7 @@ void UpdateClient::update() {
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
-
+ updateLinks();
updateManagementAgent();
session.close();
@@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) {
t.encode(buf);
return encoded;
}
+
+template <class T> std::string encode(const T& t, bool encodeKind) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf, encodeKind);
+ return encoded;
+}
} // namespace
@@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q,
ClusterConnectionProxy(session).addQueueListener(q, n);
}
+void UpdateClient::updateLinks() {
+ broker::LinkRegistry& links = updaterBroker.getLinks();
+ links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1));
+ links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1));
+}
+
+void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
+ QPID_LOG(debug, *this << " updating link "
+ << link->getHost() << ":" << link->getPort());
+ ClusterConnectionProxy(session).config(encode(*link));
+}
+
+void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {
+ QPID_LOG(debug, *this << " updating bridge " << bridge->getName());
+ ClusterConnectionProxy(session).config(encode(*bridge));
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 76621cd7ba..156fa112df 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -49,6 +49,8 @@ class DeliveryRecord;
class SessionState;
class SemanticState;
class Decoder;
+class Link;
+class Bridge;
} // namespace broker
@@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable {
void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
void updateManagementSetupState();
void updateManagementAgent();
+ void updateLinks();
+ void updateLink(const boost::shared_ptr<broker::Link>&);
+ void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+
Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;