diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/broker/Connection.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 57 |
1 files changed, 23 insertions, 34 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 460799280e..0b3059d26c 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -156,16 +156,7 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); - - if (frame.getChannel() == 0 && frame.getMethod()) { - adapter.handle(frame); - } else { - if (adapter.isOpen()) - getChannel(frame.getChannel()).in(frame); - else - close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received."); - } - + adapter.handle(frame); if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else @@ -278,8 +269,7 @@ void Connection::setUserId(const string& userId) ConnectionState::setUserId(userId); // In a cluster, the cluster code will raise the connect event // when the connection is replicated to the cluster. - if (!sys::isCluster()) - raiseConnectEvent(); + if (!broker.isInCluster()) raiseConnectEvent(); } void Connection::raiseConnectEvent() { @@ -289,11 +279,11 @@ void Connection::raiseConnectEvent() { } } -void Connection::setFederationLink(bool b) +void Connection::setUserProxyAuth(bool b) { - ConnectionState::setFederationLink(b); + ConnectionState::setUserProxyAuth(b); if (mgmtObject != 0) - mgmtObject->set_federationLink(b); + mgmtObject->set_userProxyAuth(b); } void Connection::close(connection::CloseCode code, const string& text) @@ -332,31 +322,30 @@ void Connection::closed(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); - while (!exclusiveQueues.empty()) { - boost::shared_ptr<Queue> q(exclusiveQueues.front()); - q->releaseExclusiveOwnership(); - if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q); - } - exclusiveQueues.erase(exclusiveQueues.begin()); - } } catch(std::exception& e) { QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); assert(0); } } +void Connection::doIoCallbacks() { + { + ScopedLock<Mutex> l(ioCallbackLock); + // Although IO callbacks execute in the connection thread context, they are + // not cluster safe because they are queued for execution in non-IO threads. + ClusterUnsafeScope cus; + while (!ioCallbacks.empty()) { + boost::function0<void> cb = ioCallbacks.front(); + ioCallbacks.pop(); + ScopedUnlock<Mutex> ul(ioCallbackLock); + cb(); // Lend the IO thread for management processing + } + } +} + bool Connection::doOutput() { try { - { - ScopedLock<Mutex> l(ioCallbackLock); - while (!ioCallbacks.empty()) { - boost::function0<void> cb = ioCallbacks.front(); - ioCallbacks.pop(); - ScopedUnlock<Mutex> ul(ioCallbackLock); - cb(); // Lend the IO thread for management processing - } - } + doIoCallbacks(); if (mgmtClosing) { closed(); close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); @@ -476,8 +465,8 @@ void Connection::OutboundFrameTracker::abort() { next->abort(); } void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) -{ - next->send(f); +{ + next->send(f); con.sent(f); } void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) |