summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp7
-rw-r--r--cpp/src/qpid/broker/Link.cpp12
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp9
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp27
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp30
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h6
9 files changed, 89 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 5911d916ad..7fbbf4e2c4 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
- if (!args.i_durable)
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
@@ -167,10 +166,6 @@ void Bridge::destroy()
void Bridge::setPersistenceId(uint64_t pId) const
{
- if (mgmtObject != 0 && persistenceId == 0) {
- ManagementAgent* agent = link->getBroker()->getManagementAgent();
- agent->addObject (mgmtObject, pId);
- }
persistenceId = pId;
}
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 5a50d26c8c..e1091df724 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -130,9 +131,12 @@ void Link::established ()
{
stringstream addr;
addr << host << ":" << port;
-
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+ // Don't raise the management event in a cluster, other members wont't get this call.
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_OPERATIONAL);
@@ -150,11 +154,13 @@ void Link::closed (int, std::string text)
connection = 0;
+ // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+ if (!sys::isCluster())
+ agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index ea14552cc1..82f1f0ea24 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p)
passive = p;
//will activate or passivate links on maintenance visit
}
+
+void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
+}
+
+void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index a1931920d7..4c97e4f9d8 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -31,6 +31,7 @@
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
@@ -148,6 +149,12 @@ namespace broker {
* bridges won't therefore pull or push any messages.
*/
void setPassive(bool);
+
+
+ /** Iterate over each link in the registry. Used for cluster updates. */
+ void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+ /** Iterate over each bridge in the registry. Used for cluster updates. */
+ void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
};
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5720f7fcc1..dd4882774b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1045272;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 2797fdcf02..c7689577a7 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,8 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
@@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) {
// returns true if the header is complete or already read.
bool Connection::checkProtocolHeader(const char*& data, size_t size) {
if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
+ // This is an outgoing link connection, we will receive a protocol
+ // header which needs to be decoded first
framing::ProtocolInitiation pi;
Buffer buf(const_cast<char*&>(data), size);
if (pi.decode(buf)) {
//TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
expectProtocolHeader = false;
data += pi.encodedSize();
} else {
@@ -650,5 +651,25 @@ void Connection::managementSetupState(
agent->setUuid(id);
agent->setName(vendor, product, instance);
}
+
+void Connection::config(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ string kind;
+ buf.getShortString (kind);
+ if (kind == "link") {
+ broker::Link::shared_ptr link =
+ broker::Link::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated link "
+ << link->getHost() << ":" << link->getPort());
+ }
+ else if (kind == "bridge") {
+ broker::Bridge::shared_ptr bridge =
+ broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+ QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 69b8cb1450..d90cdd898b 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -183,7 +183,9 @@ class Connection :
const std::string& vendor,
const std::string& product,
const std::string& instance);
-
+
+ void config(const std::string& encoded);
+
void setSecureConnection ( broker::SecureConnection * sc );
private:
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 59db4de526..e5d20c85e6 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -34,6 +34,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Link.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -167,7 +170,7 @@ void UpdateClient::update() {
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
ClusterConnectionProxy(session).expiryId(expiry.getId());
-
+ updateLinks();
updateManagementAgent();
session.close();
@@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) {
t.encode(buf);
return encoded;
}
+
+template <class T> std::string encode(const T& t, bool encodeKind) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf, encodeKind);
+ return encoded;
+}
} // namespace
@@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q,
ClusterConnectionProxy(session).addQueueListener(q, n);
}
+void UpdateClient::updateLinks() {
+ broker::LinkRegistry& links = updaterBroker.getLinks();
+ links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1));
+ links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1));
+}
+
+void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
+ QPID_LOG(debug, *this << " updating link "
+ << link->getHost() << ":" << link->getPort());
+ ClusterConnectionProxy(session).config(encode(*link));
+}
+
+void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {
+ QPID_LOG(debug, *this << " updating bridge " << bridge->getName());
+ ClusterConnectionProxy(session).config(encode(*bridge));
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 76621cd7ba..156fa112df 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -49,6 +49,8 @@ class DeliveryRecord;
class SessionState;
class SemanticState;
class Decoder;
+class Link;
+class Bridge;
} // namespace broker
@@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable {
void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
void updateManagementSetupState();
void updateManagementAgent();
+ void updateLinks();
+ void updateLink(const boost::shared_ptr<broker::Link>&);
+ void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+
Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
MemberId updaterId;