diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 65 |
1 files changed, 41 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index e1091df724..8010bf43e7 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,7 +30,6 @@ #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" -#include "qpid/sys/ClusterSafe.h" using namespace qpid::broker; using qpid::framing::Buffer; @@ -57,8 +56,8 @@ Link::Link(LinkRegistry* _links, string& _password, Broker* _broker, Manageable* parent) - : links(_links), store(_store), host(_host), port(_port), - transport(_transport), + : links(_links), store(_store), host(_host), port(_port), + transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), mgmtObject(0), broker(_broker), state(0), @@ -97,7 +96,8 @@ void Link::setStateLH (int newState) return; state = newState; - if (mgmtObject == 0) + + if (hideManagement()) return; switch (state) @@ -117,12 +117,12 @@ void Link::startConnectionLH () // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. setStateLH(STATE_CONNECTING); - broker->connect (host, port, transport, + broker->connect (host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { setStateLH(STATE_WAITING); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError (e.what()); } } @@ -133,8 +133,7 @@ void Link::established () addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); - // Don't raise the management event in a cluster, other members wont't get this call. - if (!sys::isCluster()) + if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); { @@ -154,12 +153,11 @@ void Link::closed (int, std::string text) connection = 0; - // Don't raise the management event in a cluster, other members wont't get this call. if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); - if (!sys::isCluster()) + if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -172,7 +170,7 @@ void Link::closed (int, std::string text) if (state != STATE_FAILED) { setStateLH(STATE_WAITING); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError (text); } @@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bridge) { { Mutex::ScopedLock mutex(lock); - + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { if ((*i).get() == bridge.get()) { created.erase(i); @@ -250,6 +248,19 @@ void Link::ioThreadProcessing() return; QPID_LOG(debug, "Link::ioThreadProcessing()"); + // check for bridge session errors and recover + if (!active.empty()) { + Bridges::iterator removed = std::remove_if( + active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1)); + for (Bridges::iterator i = removed; i != active.end(); ++i) { + Bridge::shared_ptr bridge = *i; + bridge->closed(); + bridge->cancel(*connection); + created.push_back(bridge); + } + active.erase(removed, active.end()); + } + //process any pending creates and/or cancellations if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { @@ -277,9 +288,9 @@ void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - if (connection && updateUrls) { + if (connection && updateUrls) { urls.reset(connection->getKnownHosts()); - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); updateUrls = false; } @@ -298,7 +309,7 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) + else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -309,7 +320,7 @@ void Link::reconnect(const qpid::Address& a) port = a.port; transport = a.protocol; startConnectionLH(); - if (mgmtObject != 0) { + if (!hideManagement()) { stringstream errorString; errorString << "Failed over to " << a; mgmtObject->set_lastError(errorString.str()); @@ -319,7 +330,7 @@ void Link::reconnect(const qpid::Address& a) bool Link::tryFailover() { Address next; - if (urls.next(next) && + if (urls.next(next) && (next.host != host || next.port != port || next.protocol != transport)) { links->changeAddress(Address(transport, host, port), next); QPID_LOG(debug, "Link failing over to " << host << ":" << port); @@ -329,6 +340,12 @@ bool Link::tryFailover() } } +// Management updates for a linke are inconsistent in a cluster, so they are +// suppressed. +bool Link::hideManagement() const { + return !mgmtObject || ( broker && broker->isInCluster()); +} + uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); @@ -341,7 +358,7 @@ void Link::notifyConnectionForced(const string text) Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - if (mgmtObject != 0) + if (!hideManagement()) mgmtObject->set_lastError(text); } @@ -363,7 +380,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) string authMechanism; string username; string password; - + buffer.getShortString(host); port = buffer.getShort(); buffer.getShortString(transport); @@ -375,7 +392,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer) return links.declare(host, port, transport, durable, authMechanism, username, password).first; } -void Link::encode(Buffer& buffer) const +void Link::encode(Buffer& buffer) const { buffer.putShortString(string("link")); buffer.putShortString(host); @@ -387,8 +404,8 @@ void Link::encode(Buffer& buffer) const buffer.putShortString(password); } -uint32_t Link::encodedSize() const -{ +uint32_t Link::encodedSize() const +{ return host.size() + 1 // short-string (host) + 5 // short-string ("link") + 2 // port |