diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 144 |
1 files changed, 70 insertions, 74 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 463193a346..ea3d3547f5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -47,44 +47,26 @@ using qpid::management::Args; namespace qpid { namespace broker { -class Connection::MgmtClient : public Connection::MgmtWrapper -{ - management::Client::shared_ptr mgmtClient; - -public: - MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming); - ~MgmtClient(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); -}; - -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), - adapter(*this, isLink), + adapter(*this, isLink_), + isLink(isLink_), mgmtClosing(false), - mgmtId(mgmtId_) -{ - initMgmt(); -} - -void Connection::initMgmt(bool asLink) + mgmtId(mgmtId_), + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject (); + if (isLink) + links.notifyConnection (mgmtId, this); + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) - { - if (asLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false)); - } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true)); - } - } + mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); + agent->addObject (mgmtObject); } } @@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback) } -Connection::~Connection () {} +Connection::~Connection () +{ + if (mgmtObject.get() != 0) + mgmtObject->resourceDestroy(); + if (isLink) + links.notifyClosed (mgmtId); +} void Connection::received(framing::AMQFrame& frame){ - if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); - if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } - - if (mgmtWrapper.get()) mgmtWrapper->received(frame); + + if (isLink) + recordFromServer(frame); + else + recordFromClient(frame); +} + +void Connection::recordFromServer (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesToClient (); + mgmtObject->inc_bytesToClient (frame.size ()); + } +} + +void Connection::recordFromClient (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } +} + +string Connection::getAuthMechanism() +{ + if (!isLink) + return string("ANONYMOUS"); + + return links.getAuthMechanism(mgmtId); +} + +string Connection::getAuthCredentials() +{ + if (!isLink) + return string(); + + return links.getAuthCredentials(mgmtId); +} + +void Connection::notifyConnectionForced(const string& text) +{ + if (isLink) + links.notifyConnectionForced(mgmtId, text); } void Connection::close( @@ -125,7 +153,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) - ptr_map_ptr(channels.begin())->handleDetach(); + ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -147,10 +175,12 @@ bool Connection::doOutput() if (ioCallback) ioCallback(); // Lend the IO thread for management processing ioCallback = 0; - if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); - //then do other output as needed: - return outputTasks.doOutput(); + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + else + //then do other output as needed: + return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.what(), 0, 0); }catch(std::exception& e){ @@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) { ManagementObject::shared_ptr Connection::GetManagementObject (void) const { - return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); + return dynamic_pointer_cast<ManagementObject>(mgmtObject); } Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) @@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) { case management::Client::METHOD_CLOSE : mgmtClosing = true; - if (mgmtWrapper.get()) mgmtWrapper->closing(); + if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); status = Manageable::STATUS_OK; break; @@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) return status; } -Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, - ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming) -{ - mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId, incoming)); - agent->addObject (mgmtClient); -} - -Connection::MgmtClient::~MgmtClient() -{ - if (mgmtClient.get () != 0) - mgmtClient->resourceDestroy (); -} - -void Connection::MgmtClient::received(framing::AMQFrame& frame) -{ - if (mgmtClient.get () != 0) - { - mgmtClient->inc_framesFromClient (); - mgmtClient->inc_bytesFromClient (frame.size ()); - } -} - -management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtClient); -} - -void Connection::MgmtClient::closing() -{ - if (mgmtClient) mgmtClient->set_closing (1); -} - }} |
