summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp48
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp10
5 files changed, 95 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index d416789572..f21c861149 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -104,7 +104,7 @@ public:
urlVec = urlArrayToVector(addresses);
for(size_t i = 0; i < urlVec.size(); ++i)
urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
- QPID_LOG(notice, "Remote broker has provided these failover addresses= " << urls);
+ QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
link->setUrl(urls);
}
}
@@ -253,6 +253,7 @@ void Link::established(Connection* c)
void Link::setUrl(const Url& u) {
+ QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u);
Mutex::ScopedLock mutex(lock);
url = u;
reconnectNext = 0;
@@ -687,6 +688,35 @@ bool Link::getRemoteAddress(qpid::Address& addr) const
}
+// FieldTable keys for internal state data
+namespace {
+ const std::string FAILOVER_ADDRESSES("failover-addresses");
+ const std::string FAILOVER_INDEX("failover-index");
+}
+
+void Link::getState(framing::FieldTable& state) const
+{
+ state.clear();
+ Mutex::ScopedLock mutex(lock);
+ if (!url.empty()) {
+ state.setString(FAILOVER_ADDRESSES, url.str());
+ state.setInt(FAILOVER_INDEX, reconnectNext);
+ }
+}
+
+void Link::setState(const framing::FieldTable& state)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (state.isSet(FAILOVER_ADDRESSES)) {
+ Url failovers(state.getAsString(FAILOVER_ADDRESSES));
+ setUrl(failovers);
+ }
+ if (state.isSet(FAILOVER_INDEX)) {
+ reconnectNext = state.getAsInt(FAILOVER_INDEX);
+ }
+}
+
+
const std::string Link::exchangeTypeName("qpid.LinkExchange");
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 5025aba678..a97fa48664 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -51,7 +51,7 @@ class LinkExchange;
class Link : public PersistableConfig, public management::Manageable {
private:
- sys::Mutex lock;
+ mutable sys::Mutex lock;
LinkRegistry* links;
MessageStore* store;
@@ -174,6 +174,10 @@ class Link : public PersistableConfig, public management::Manageable {
// manage the exchange owned by this link
static const std::string exchangeTypeName;
static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
+
+ // replicate internal state of this Link for clustering
+ void getState(framing::FieldTable& state) const;
+ void setState(const framing::FieldTable& state);
};
}
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 3d5a7be1c3..512e0f03cb 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -797,6 +797,54 @@ void Connection::config(const std::string& encoded) {
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
+namespace {
+ // find a Link that matches the given Address
+ class LinkFinder {
+ qpid::Address id;
+ boost::shared_ptr<broker::Link> link;
+ public:
+ LinkFinder(const qpid::Address& _id) : id(_id) {}
+ boost::shared_ptr<broker::Link> getLink() { return link; }
+ void operator() (boost::shared_ptr<broker::Link> l)
+ {
+ if (!link) {
+ qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
+ if (id == addr) {
+ link = l;
+ }
+ }
+ }
+ };
+}
+
+void Connection::internalState(const std::string& type,
+ const std::string& name,
+ const framing::FieldTable& state)
+{
+ if (type == "link") {
+ // name is the string representation of the Link's _configured_ destination address
+ Url dest;
+ try {
+ dest = name;
+ } catch(...) {
+ throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
+ }
+ assert(dest.size());
+ LinkFinder finder(dest[0]);
+ cluster.getBroker().getLinks().eachLink(boost::ref(finder));
+ if (finder.getLink()) {
+ try {
+ finder.getLink()->setState(state);
+ } catch(...) {
+ throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
+ }
+ QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+ } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));
+}
+
+
void Connection::doCatchupIoCallbacks() {
// We need to process IO callbacks during the catch-up phase in
// order to service asynchronous completions for messages
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 920c4937db..26514c76e2 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -200,6 +200,8 @@ class Connection :
const std::string& instance);
void config(const std::string& encoded);
+ void internalState(const std::string& type, const std::string& name,
+ const framing::FieldTable& state);
void setSecureConnection ( broker::SecureConnection * sc );
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 3a3582d032..20684fd8a7 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -688,7 +688,15 @@ void UpdateClient::updateLinks() {
void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
QPID_LOG(debug, *this << " updating link "
<< link->getHost() << ":" << link->getPort());
- ClusterConnectionProxy(session).config(encode(*link));
+ ClusterConnectionProxy(session).config(encode(*link)); // push the configuration
+ // now push the current state
+ framing::FieldTable state;
+ link->getState(state);
+ std::ostringstream os;
+ os << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
+ ClusterConnectionProxy(session).internalState(std::string("link"),
+ os.str(),
+ state);
}
void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {