diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-01-22 21:35:09 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2013-01-22 21:35:09 +0000 |
| commit | 66d0e713e1ec31e1ef1b686ea7e21191e52ba2d4 (patch) | |
| tree | 4e3aba17a3ba9114ae5c897d8f37ffd8a643478a /qpid/cpp/src | |
| parent | 18de0170eedfd2d1725f7d73de43c63481c5b5b0 (diff) | |
| download | qpid-python-66d0e713e1ec31e1ef1b686ea7e21191e52ba2d4.tar.gz | |
QPID-4546: delete links regardless of connection state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437187 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 120 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 2 |
2 files changed, 77 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index dfa7d4c3ab..70d0f68427 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -148,7 +148,6 @@ Link::Link(const string& _name, persistenceId(0), broker(_broker), state(0), visitCount(0), currentInterval(1), - closing(false), reconnectNext(0), // Index of next address for reconnecting in url. nextFreeChannel(1), freeChannels(1, framing::CHANNEL_MAX), @@ -213,6 +212,7 @@ void Link::setStateLH (int newState) case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; case STATE_FAILED : mgmtObject->set_state("Failed"); break; case STATE_CLOSED : mgmtObject->set_state("Closed"); break; + case STATE_CLOSING : mgmtObject->set_state("Closing"); break; } } @@ -242,19 +242,20 @@ void Link::established(Connection* c) if (agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - bool isClosing = false; + bool isClosing = true; { Mutex::ScopedLock mutex(lock); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - connection = c; - isClosing = closing; + if (state != STATE_CLOSING) { + isClosing = false; + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + connection = c; + c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } if (isClosing) destroy(); - else // Process any IO tasks bridges added before established. - c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -279,9 +280,10 @@ class DetachedCallback : public SessionHandler::ErrorListener { }; } -void Link::opened() { +void Link::opened() +{ Mutex::ScopedLock mutex(lock); - if (!connection) return; + if (!connection || state != STATE_OPERATIONAL) return; if (connection->GetManagementObject()) { mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); @@ -338,34 +340,43 @@ void Link::opened() { } } + +// called when connection attempt fails (see startConnectionLH) void Link::closed(int, std::string text) { - Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); - connection = 0; + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + + connection = 0; - mgmtObject->set_connectionRef(qpid::management::ObjectId()); - if (state == STATE_OPERATIONAL && agent) { - stringstream addr; - addr << host << ":" << port; + mgmtObject->set_connectionRef(qpid::management::ObjectId()); + if (state == STATE_OPERATIONAL && agent) { + stringstream addr; + addr << host << ":" << port; agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); - } + } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->closed(); - created.push_back(*i); - } - active.clear(); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + created.push_back(*i); + } + active.clear(); - if (state != STATE_FAILED) - { - setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (state == STATE_CLOSING) { + isClosing = true; + } else if (state != STATE_FAILED) { + setStateLH(STATE_WAITING); + mgmtObject->set_lastError (text); + } } + if (isClosing) destroy(); } -// Called in connection IO thread, cleans up the connection before destroying Link +// Cleans up the connection before destroying Link. Must be called in connection thread +// if the connection is active. Caller Note well: may call "delete this"! void Link::destroy () { Bridges toDelete; @@ -395,7 +406,9 @@ void Link::destroy () for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->close(); toDelete.clear(); - listener(this); // notify LinkRegistry that this Link has been destroyed + // notify LinkRegistry that this Link has been destroyed. Will result in "delete + // this" if LinkRegistry is holding the last shared pointer to *this + listener(this); } void Link::add(Bridge::shared_ptr bridge) @@ -437,7 +450,7 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); - if (state != STATE_OPERATIONAL || closing) + if (state != STATE_OPERATIONAL) return; // check for bridge session errors and recover @@ -474,7 +487,6 @@ void Link::ioThreadProcessing() void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - if (closing) return; if (state == STATE_WAITING) { visitCount++; @@ -490,10 +502,11 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && - (!active.empty() || !created.empty() || !cancellations.empty()) && - connection && connection->isOpen()) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + else if (state == STATE_OPERATIONAL) { + if ((!active.empty() || !created.empty() || !cancellations.empty()) && + connection && connection->isOpen()) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } void Link::reconnectLH(const Address& a) @@ -564,9 +577,17 @@ void Link::returnChannel(framing::ChannelId c) void Link::notifyConnectionForced(const string text) { - Mutex::ScopedLock mutex(lock); - setStateLH(STATE_FAILED); - mgmtObject->set_lastError(text); + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + if (state == STATE_CLOSING) { + isClosing = true; + } else { + setStateLH(STATE_FAILED); + mgmtObject->set_lastError(text); + } + } + if (isClosing) destroy(); } void Link::setPersistenceId(uint64_t id) const @@ -656,14 +677,25 @@ ManagementObject::shared_ptr Link::GetManagementObject(void) const void Link::close() { QPID_LOG(debug, "Link::close(), link=" << name ); - Mutex::ScopedLock mutex(lock); - if (!closing) { - closing = true; - if (state != STATE_CONNECTING && connection) { - //connection can only be closed on the connections own IO processing thread - connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + bool destroy_now = false; + { + Mutex::ScopedLock mutex(lock); + if (state != STATE_CLOSING) { + int old_state = state; + setStateLH(STATE_CLOSING); + if (connection) { + //connection can only be closed on the connections own IO processing thread + connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + } else if (old_state == STATE_CONNECTING) { + // cannot destroy Link now since a connection request is outstanding. + // destroy the link after we get a response (see Link::established, + // Link::closed, Link::notifyConnectionForced, etc). + } else { + destroy_now = true; + } } } + if (destroy_now) destroy(); } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index f2f672b507..01ddc68d97 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -74,7 +74,6 @@ class Link : public PersistableConfig, public management::Manageable { int state; uint32_t visitCount; uint32_t currentInterval; - bool closing; Url url; // URL can contain many addresses. size_t reconnectNext; // Index for next re-connect attempt @@ -98,6 +97,7 @@ class Link : public PersistableConfig, public management::Manageable { static const int STATE_OPERATIONAL = 3; static const int STATE_FAILED = 4; static const int STATE_CLOSED = 5; + static const int STATE_CLOSING = 6; // Waiting for outstanding connect to complete first static const uint32_t MAX_INTERVAL = 32; |
