summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2013-01-22 21:35:09 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2013-01-22 21:35:09 +0000
commit66d0e713e1ec31e1ef1b686ea7e21191e52ba2d4 (patch)
tree4e3aba17a3ba9114ae5c897d8f37ffd8a643478a /qpid/cpp/src
parent18de0170eedfd2d1725f7d73de43c63481c5b5b0 (diff)
downloadqpid-python-66d0e713e1ec31e1ef1b686ea7e21191e52ba2d4.tar.gz
QPID-4546: delete links regardless of connection state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437187 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp120
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h2
2 files changed, 77 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index dfa7d4c3ab..70d0f68427 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -148,7 +148,6 @@ Link::Link(const string& _name,
persistenceId(0), broker(_broker), state(0),
visitCount(0),
currentInterval(1),
- closing(false),
reconnectNext(0), // Index of next address for reconnecting in url.
nextFreeChannel(1),
freeChannels(1, framing::CHANNEL_MAX),
@@ -213,6 +212,7 @@ void Link::setStateLH (int newState)
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
case STATE_FAILED : mgmtObject->set_state("Failed"); break;
case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
+ case STATE_CLOSING : mgmtObject->set_state("Closing"); break;
}
}
@@ -242,19 +242,20 @@ void Link::established(Connection* c)
if (agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- bool isClosing = false;
+ bool isClosing = true;
{
Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- connection = c;
- isClosing = closing;
+ if (state != STATE_CLOSING) {
+ isClosing = false;
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
if (isClosing)
destroy();
- else // Process any IO tasks bridges added before established.
- c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -279,9 +280,10 @@ class DetachedCallback : public SessionHandler::ErrorListener {
};
}
-void Link::opened() {
+void Link::opened()
+{
Mutex::ScopedLock mutex(lock);
- if (!connection) return;
+ if (!connection || state != STATE_OPERATIONAL) return;
if (connection->GetManagementObject()) {
mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
@@ -338,34 +340,43 @@ void Link::opened() {
}
}
+
+// called when connection attempt fails (see startConnectionLH)
void Link::closed(int, std::string text)
{
- Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
- connection = 0;
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ connection = 0;
- mgmtObject->set_connectionRef(qpid::management::ObjectId());
- if (state == STATE_OPERATIONAL && agent) {
- stringstream addr;
- addr << host << ":" << port;
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
+ stringstream addr;
+ addr << host << ":" << port;
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
- if (state != STATE_FAILED)
- {
- setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (state == STATE_CLOSING) {
+ isClosing = true;
+ } else if (state != STATE_FAILED) {
+ setStateLH(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ }
}
+ if (isClosing) destroy();
}
-// Called in connection IO thread, cleans up the connection before destroying Link
+// Cleans up the connection before destroying Link. Must be called in connection thread
+// if the connection is active. Caller Note well: may call "delete this"!
void Link::destroy ()
{
Bridges toDelete;
@@ -395,7 +406,9 @@ void Link::destroy ()
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->close();
toDelete.clear();
- listener(this); // notify LinkRegistry that this Link has been destroyed
+ // notify LinkRegistry that this Link has been destroyed. Will result in "delete
+ // this" if LinkRegistry is holding the last shared pointer to *this
+ listener(this);
}
void Link::add(Bridge::shared_ptr bridge)
@@ -437,7 +450,7 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL || closing)
+ if (state != STATE_OPERATIONAL)
return;
// check for bridge session errors and recover
@@ -474,7 +487,6 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -490,10 +502,11 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL &&
- (!active.empty() || !created.empty() || !cancellations.empty()) &&
- connection && connection->isOpen())
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ else if (state == STATE_OPERATIONAL) {
+ if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
+ connection && connection->isOpen())
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::reconnectLH(const Address& a)
@@ -564,9 +577,17 @@ void Link::returnChannel(framing::ChannelId c)
void Link::notifyConnectionForced(const string text)
{
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_FAILED);
- mgmtObject->set_lastError(text);
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ if (state == STATE_CLOSING) {
+ isClosing = true;
+ } else {
+ setStateLH(STATE_FAILED);
+ mgmtObject->set_lastError(text);
+ }
+ }
+ if (isClosing) destroy();
}
void Link::setPersistenceId(uint64_t id) const
@@ -656,14 +677,25 @@ ManagementObject::shared_ptr Link::GetManagementObject(void) const
void Link::close() {
QPID_LOG(debug, "Link::close(), link=" << name );
- Mutex::ScopedLock mutex(lock);
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ bool destroy_now = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ if (state != STATE_CLOSING) {
+ int old_state = state;
+ setStateLH(STATE_CLOSING);
+ if (connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ } else if (old_state == STATE_CONNECTING) {
+ // cannot destroy Link now since a connection request is outstanding.
+ // destroy the link after we get a response (see Link::established,
+ // Link::closed, Link::notifyConnectionForced, etc).
+ } else {
+ destroy_now = true;
+ }
}
}
+ if (destroy_now) destroy();
}
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index f2f672b507..01ddc68d97 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -74,7 +74,6 @@ class Link : public PersistableConfig, public management::Manageable {
int state;
uint32_t visitCount;
uint32_t currentInterval;
- bool closing;
Url url; // URL can contain many addresses.
size_t reconnectNext; // Index for next re-connect attempt
@@ -98,6 +97,7 @@ class Link : public PersistableConfig, public management::Manageable {
static const int STATE_OPERATIONAL = 3;
static const int STATE_FAILED = 4;
static const int STATE_CLOSED = 5;
+ static const int STATE_CLOSING = 6; // Waiting for outstanding connect to complete first
static const uint32_t MAX_INTERVAL = 32;