diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Link.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 259 |
1 files changed, 131 insertions, 128 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 9727040c9b..bb5d5cb678 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -30,8 +30,10 @@ #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/amqp_types.h" #include "qpid/broker/AclModule.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/UrlArray.h" namespace qpid { @@ -148,12 +150,12 @@ Link::Link(const string& _name, host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), - persistenceId(0), mgmtObject(0), broker(_broker), state(0), + persistenceId(0), broker(_broker), state(0), visitCount(0), currentInterval(1), - closing(false), reconnectNext(0), // Index of next address for reconnecting in url. - channelCounter(1), + nextFreeChannel(1), + freeChannels(1, framing::CHANNEL_MAX), connection(0), agent(0), listener(l), @@ -166,19 +168,15 @@ Link::Link(const string& _name, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject = _qmf::Link::shared_ptr(new _qmf::Link(agent, this, parent, name, durable)); mgmtObject->set_host(host); mgmtObject->set_port(port); mgmtObject->set_transport(transport); agent->addObject(mgmtObject, 0, durable); } } - if (links->isPassive()) { - setStateLH(STATE_PASSIVE); - } else { - setStateLH(STATE_WAITING); - startConnectionLH(); - } + setStateLH(STATE_WAITING); + startConnectionLH(); broker->getTimer().add(timerTask); if (failover) { @@ -212,9 +210,6 @@ void Link::setStateLH (int newState) state = newState; - if (hideManagement()) - return; - switch (state) { case STATE_WAITING : mgmtObject->set_state("Waiting"); break; @@ -222,7 +217,7 @@ void Link::setStateLH (int newState) case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; case STATE_FAILED : mgmtObject->set_state("Failed"); break; case STATE_CLOSED : mgmtObject->set_state("Closed"); break; - case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; + case STATE_CLOSING : mgmtObject->set_state("Closing"); break; } } @@ -233,40 +228,39 @@ void Link::startConnectionLH () // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. setStateLH(STATE_CONNECTING); - broker->connect (host, boost::lexical_cast<std::string>(port), transport, + broker->connect (name, host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); - QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); + QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port); } catch(const std::exception& e) { QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " << e.what()); setStateLH(STATE_WAITING); - if (!hideManagement()) - mgmtObject->set_lastError (e.what()); + mgmtObject->set_lastError (e.what()); } } void Link::established(Connection* c) { - if (state == STATE_PASSIVE) return; stringstream addr; addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); - if (!hideManagement() && agent) + if (agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - bool isClosing = false; + bool isClosing = true; { Mutex::ScopedLock mutex(lock); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - connection = c; - isClosing = closing; + if (state != STATE_CLOSING) { + isClosing = false; + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + connection = c; + c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } if (isClosing) destroy(); - else // Process any IO tasks bridges added before established. - c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -291,11 +285,12 @@ class DetachedCallback : public SessionHandler::ErrorListener { }; } -void Link::opened() { +void Link::opened() +{ Mutex::ScopedLock mutex(lock); - if (!connection) return; + if (!connection || state != STATE_OPERATIONAL) return; - if (!hideManagement() && connection->GetManagementObject()) { + if (connection->GetManagementObject()) { mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); } @@ -350,37 +345,43 @@ void Link::opened() { } } + +// called when connection attempt fails (see startConnectionLH) void Link::closed(int, std::string text) { - Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); - connection = 0; + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + + connection = 0; - if (!hideManagement()) { mgmtObject->set_connectionRef(qpid::management::ObjectId()); if (state == STATE_OPERATIONAL && agent) { stringstream addr; addr << host << ":" << port; agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } - } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->closed(); - created.push_back(*i); - } - active.clear(); + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + (*i)->closed(); + created.push_back(*i); + } + active.clear(); - if (state != STATE_FAILED && state != STATE_PASSIVE) - { - setStateLH(STATE_WAITING); - if (!hideManagement()) + if (state == STATE_CLOSING) { + isClosing = true; + } else if (state != STATE_FAILED) { + setStateLH(STATE_WAITING); mgmtObject->set_lastError (text); + } } + if (isClosing) destroy(); } -// Called in connection IO thread, cleans up the connection before destroying Link +// Cleans up the connection before destroying Link. Must be called in connection thread +// if the connection is active. Caller Note well: may call "delete this"! void Link::destroy () { Bridges toDelete; @@ -410,7 +411,9 @@ void Link::destroy () for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) (*i)->close(); toDelete.clear(); - listener(this); // notify LinkRegistry that this Link has been destroyed + // notify LinkRegistry that this Link has been destroyed. Will result in "delete + // this" if LinkRegistry is holding the last shared pointer to *this + listener(this); } void Link::add(Bridge::shared_ptr bridge) @@ -452,7 +455,7 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); - if (state != STATE_OPERATIONAL || closing) + if (state != STATE_OPERATIONAL) return; // check for bridge session errors and recover @@ -489,9 +492,9 @@ void Link::ioThreadProcessing() void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - if (closing) return; - if (state == STATE_WAITING) - { + + switch (state) { + case STATE_WAITING: visitCount++; if (visitCount >= currentInterval) { @@ -504,11 +507,17 @@ void Link::maintenanceVisit () startConnectionLH(); } } + break; + + case STATE_OPERATIONAL: + if ((!active.empty() || !created.empty() || !cancellations.empty()) && + connection && connection->isOpen()) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + break; + + default: // no-op for all other states + break; } - else if (state == STATE_OPERATIONAL && - (!active.empty() || !created.empty() || !cancellations.empty()) && - connection && connection->isOpen()) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::reconnectLH(const Address& a) @@ -517,14 +526,13 @@ void Link::reconnectLH(const Address& a) port = a.port; transport = a.protocol; - if (!hideManagement()) { - stringstream errorString; - errorString << "Failing over to " << a; - mgmtObject->set_lastError(errorString.str()); - mgmtObject->set_host(host); - mgmtObject->set_port(port); - mgmtObject->set_transport(transport); - } + stringstream errorString; + errorString << "Failing over to " << a; + mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); + startConnectionLH(); } @@ -541,26 +549,56 @@ bool Link::tryFailoverLH() { return false; } -// Management updates for a link are inconsistent in a cluster, so they are -// suppressed. -bool Link::hideManagement() const { - return !mgmtObject || ( broker && broker->isInCluster()); +// Allocate channel from link free pool +framing::ChannelId Link::nextChannel() +{ + Mutex::ScopedLock mutex(lock); + if (!freeChannels.empty()) { + // A free channel exists. + for (framing::ChannelId i = 1; i <= framing::CHANNEL_MAX; i++) + { + // extract proposed free channel + framing::ChannelId c = nextFreeChannel; + // calculate next free channel + if (framing::CHANNEL_MAX == nextFreeChannel) + nextFreeChannel = 1; + else + nextFreeChannel += 1; + // if proposed channel is free, use it + if (freeChannels.contains(c)) + { + freeChannels -= c; + QPID_LOG(debug, "Link " << name << " allocates channel: " << c); + return c; + } + } + assert (false); + } + + throw Exception(Msg() << "Link " << name << " channel pool is empty"); } -uint Link::nextChannel() +// Return channel to link free pool +void Link::returnChannel(framing::ChannelId c) { Mutex::ScopedLock mutex(lock); - if (channelCounter >= framing::CHANNEL_MAX) - channelCounter = 1; - return channelCounter++; + QPID_LOG(debug, "Link " << name << " frees channel: " << c); + freeChannels += c; } void Link::notifyConnectionForced(const string text) { - Mutex::ScopedLock mutex(lock); - setStateLH(STATE_FAILED); - if (!hideManagement()) - mgmtObject->set_lastError(text); + bool isClosing = false; + { + Mutex::ScopedLock mutex(lock); + if (state == STATE_CLOSING) { + isClosing = true; + } else { + setStateLH(STATE_FAILED); + mgmtObject->set_lastError(text); + } + } + if (isClosing) destroy(); } void Link::setPersistenceId(uint64_t id) const @@ -643,21 +681,32 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject* Link::GetManagementObject (void) const +ManagementObject::shared_ptr Link::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } void Link::close() { QPID_LOG(debug, "Link::close(), link=" << name ); - Mutex::ScopedLock mutex(lock); - if (!closing) { - closing = true; - if (state != STATE_CONNECTING && connection) { - //connection can only be closed on the connections own IO processing thread - connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + bool destroy_now = false; + { + Mutex::ScopedLock mutex(lock); + if (state != STATE_CLOSING) { + int old_state = state; + setStateLH(STATE_CLOSING); + if (connection) { + //connection can only be closed on the connections own IO processing thread + connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + } else if (old_state == STATE_CONNECTING) { + // cannot destroy Link now since a connection request is outstanding. + // destroy the link after we get a response (see Link::established, + // Link::closed, Link::notifyConnectionForced, etc). + } else { + destroy_now = true; + } } } + if (destroy_now) destroy(); } @@ -701,22 +750,6 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_UNKNOWN_METHOD; } -void Link::setPassive(bool passive) -{ - Mutex::ScopedLock mutex(lock); - if (passive) { - setStateLH(STATE_PASSIVE); - } else { - if (state == STATE_PASSIVE) { - setStateLH(STATE_WAITING); - } else { - QPID_LOG(warning, "Ignoring attempt to activate non-passive link " - << host << ":" << port); - } - } -} - - /** utility to clean up connection resources correctly */ void Link::closeConnection( const std::string& reason) { @@ -752,28 +785,6 @@ namespace { const std::string FAILOVER_INDEX("failover-index"); } -void Link::getState(framing::FieldTable& state) const -{ - state.clear(); - Mutex::ScopedLock mutex(lock); - if (!url.empty()) { - state.setString(FAILOVER_ADDRESSES, url.str()); - state.setInt(FAILOVER_INDEX, reconnectNext); - } -} - -void Link::setState(const framing::FieldTable& state) -{ - Mutex::ScopedLock mutex(lock); - if (state.isSet(FAILOVER_ADDRESSES)) { - Url failovers(state.getAsString(FAILOVER_ADDRESSES)); - setUrl(failovers); - } - if (state.isSet(FAILOVER_INDEX)) { - reconnectNext = state.getAsInt(FAILOVER_INDEX); - } -} - std::string Link::createName(const std::string& transport, const std::string& host, uint16_t port) @@ -784,14 +795,6 @@ std::string Link::createName(const std::string& transport, return linkName.str(); } - -bool Link::pendingConnection(const std::string& _host, uint16_t _port) const -{ - Mutex::ScopedLock mutex(lock); - return (isConnecting() && _port == port && _host == host); -} - - const std::string Link::exchangeTypeName("qpid.LinkExchange"); }} // namespace qpid::broker |