diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 27 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 6 |
5 files changed, 63 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5720f7fcc1..dd4882774b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1045272; +const uint32_t Cluster::CLUSTER_VERSION = 1058747; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 2797fdcf02..c7689577a7 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,8 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -346,13 +348,12 @@ size_t Connection::decode(const char* data, size_t size) { // returns true if the header is complete or already read. bool Connection::checkProtocolHeader(const char*& data, size_t size) { if (expectProtocolHeader) { - //If this is an outgoing link, we will receive a protocol - //header which needs to be decoded first + // This is an outgoing link connection, we will receive a protocol + // header which needs to be decoded first framing::ProtocolInitiation pi; Buffer buf(const_cast<char*&>(data), size); if (pi.decode(buf)) { //TODO: check the version is correct - QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")"); expectProtocolHeader = false; data += pi.encodedSize(); } else { @@ -650,5 +651,25 @@ void Connection::managementSetupState( agent->setUuid(id); agent->setName(vendor, product, instance); } + +void Connection::config(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + string kind; + buf.getShortString (kind); + if (kind == "link") { + broker::Link::shared_ptr link = + broker::Link::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated link " + << link->getHost() << ":" << link->getPort()); + } + else if (kind == "bridge") { + broker::Bridge::shared_ptr bridge = + broker::Bridge::decode(cluster.getBroker().getLinks(), buf); + QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); + } + else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); +} + + }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 69b8cb1450..d90cdd898b 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -183,7 +183,9 @@ class Connection : const std::string& vendor, const std::string& product, const std::string& instance); - + + void config(const std::string& encoded); + void setSecureConnection ( broker::SecureConnection * sc ); private: diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 59db4de526..e5d20c85e6 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -34,6 +34,9 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Link.h" #include "qpid/broker/Message.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" @@ -167,7 +170,7 @@ void UpdateClient::update() { b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); ClusterConnectionProxy(session).expiryId(expiry.getId()); - + updateLinks(); updateManagementAgent(); session.close(); @@ -199,6 +202,14 @@ template <class T> std::string encode(const T& t) { t.encode(buf); return encoded; } + +template <class T> std::string encode(const T& t, bool encodeKind) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + t.encode(buf, encodeKind); + return encoded; +} } // namespace @@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(std::string& q, ClusterConnectionProxy(session).addQueueListener(q, n); } +void UpdateClient::updateLinks() { + broker::LinkRegistry& links = updaterBroker.getLinks(); + links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1)); + links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1)); +} + +void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { + QPID_LOG(debug, *this << " updating link " + << link->getHost() << ":" << link->getPort()); + ClusterConnectionProxy(session).config(encode(*link)); +} + +void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { + QPID_LOG(debug, *this << " updating bridge " << bridge->getName()); + ClusterConnectionProxy(session).config(encode(*bridge)); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 76621cd7ba..156fa112df 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -49,6 +49,8 @@ class DeliveryRecord; class SessionState; class SemanticState; class Decoder; +class Link; +class Bridge; } // namespace broker @@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnable { void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); void updateManagementSetupState(); void updateManagementAgent(); + void updateLinks(); + void updateLink(const boost::shared_ptr<broker::Link>&); + void updateBridge(const boost::shared_ptr<broker::Bridge>&); + Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering; MemberId updaterId; |
