diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e85d3c89c..e77911bd10 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,14 +49,14 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), + receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), + closedFn(boost::bind(&Connection::closedImpl, this)), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), mgmtObject(0), - links(broker_.getLinks()), - lastInHandler(*this), - inChain(lastInHandler) + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject(); @@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } + + Plugin::initializeAll(*this); // Let plug-ins update extension points. } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback) out->activateOutput(); } - Connection::~Connection() { if (mgmtObject != 0) @@ -88,9 +89,9 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } - -void Connection::receivedLast(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } + +void Connection::receivedImpl(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -170,10 +171,13 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ // Physically closed, suspend open sessions. +void Connection::closed() { closedFn(); } + +void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); + // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - QPID_LOG(error, " Unhandled exception while closing session: " << - e.what()); + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); assert(0); } } |
