summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp65
1 files changed, 41 insertions, 24 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index e1091df724..8010bf43e7 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,7 +30,6 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -57,8 +56,8 @@ Link::Link(LinkRegistry* _links,
string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store), host(_host), port(_port),
+ transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -97,7 +96,8 @@ void Link::setStateLH (int newState)
return;
state = newState;
- if (mgmtObject == 0)
+
+ if (hideManagement())
return;
switch (state)
@@ -117,12 +117,12 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, port, transport,
+ broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError (e.what());
}
}
@@ -133,8 +133,7 @@ void Link::established ()
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- // Don't raise the management event in a cluster, other members wont't get this call.
- if (!sys::isCluster())
+ if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
{
@@ -154,12 +153,11 @@ void Link::closed (int, std::string text)
connection = 0;
- // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- if (!sys::isCluster())
+ if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -172,7 +170,7 @@ void Link::closed (int, std::string text)
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError (text);
}
@@ -221,7 +219,7 @@ void Link::cancel(Bridge::shared_ptr bridge)
{
{
Mutex::ScopedLock mutex(lock);
-
+
for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
if ((*i).get() == bridge.get()) {
created.erase(i);
@@ -250,6 +248,19 @@ void Link::ioThreadProcessing()
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
+ // check for bridge session errors and recover
+ if (!active.empty()) {
+ Bridges::iterator removed = std::remove_if(
+ active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ for (Bridges::iterator i = removed; i != active.end(); ++i) {
+ Bridge::shared_ptr bridge = *i;
+ bridge->closed();
+ bridge->cancel(*connection);
+ created.push_back(bridge);
+ }
+ active.erase(removed, active.end());
+ }
+
//process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -277,9 +288,9 @@ void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (connection && updateUrls) {
+ if (connection && updateUrls) {
urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
updateUrls = false;
}
@@ -298,7 +309,7 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -309,7 +320,7 @@ void Link::reconnect(const qpid::Address& a)
port = a.port;
transport = a.protocol;
startConnectionLH();
- if (mgmtObject != 0) {
+ if (!hideManagement()) {
stringstream errorString;
errorString << "Failed over to " << a;
mgmtObject->set_lastError(errorString.str());
@@ -319,7 +330,7 @@ void Link::reconnect(const qpid::Address& a)
bool Link::tryFailover()
{
Address next;
- if (urls.next(next) &&
+ if (urls.next(next) &&
(next.host != host || next.port != port || next.protocol != transport)) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -329,6 +340,12 @@ bool Link::tryFailover()
}
}
+// Management updates for a linke are inconsistent in a cluster, so they are
+// suppressed.
+bool Link::hideManagement() const {
+ return !mgmtObject || ( broker && broker->isInCluster());
+}
+
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
@@ -341,7 +358,7 @@ void Link::notifyConnectionForced(const string text)
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- if (mgmtObject != 0)
+ if (!hideManagement())
mgmtObject->set_lastError(text);
}
@@ -363,7 +380,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
string authMechanism;
string username;
string password;
-
+
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -375,7 +392,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
return links.declare(host, port, transport, durable, authMechanism, username, password).first;
}
-void Link::encode(Buffer& buffer) const
+void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
buffer.putShortString(host);
@@ -387,8 +404,8 @@ void Link::encode(Buffer& buffer) const
buffer.putShortString(password);
}
-uint32_t Link::encodedSize() const
-{
+uint32_t Link::encodedSize() const
+{
return host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port