summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp144
1 files changed, 70 insertions, 74 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 463193a346..ea3d3547f5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -47,44 +47,26 @@ using qpid::management::Args;
namespace qpid {
namespace broker {
-class Connection::MgmtClient : public Connection::MgmtWrapper
-{
- management::Client::shared_ptr mgmtClient;
-
-public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming);
- ~MgmtClient();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
-};
-
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
- adapter(*this, isLink),
+ adapter(*this, isLink_),
+ isLink(isLink_),
mgmtClosing(false),
- mgmtId(mgmtId_)
-{
- initMgmt();
-}
-
-void Connection::initMgmt(bool asLink)
+ mgmtId(mgmtId_),
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject ();
+ if (isLink)
+ links.notifyConnection (mgmtId, this);
+
if (parent != 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
- {
- if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
- } else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
- }
- }
+ mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
+ agent->addObject (mgmtObject);
}
}
@@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback)
}
-Connection::~Connection () {}
+Connection::~Connection ()
+{
+ if (mgmtObject.get() != 0)
+ mgmtObject->resourceDestroy();
+ if (isLink)
+ links.notifyClosed (mgmtId);
+}
void Connection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
-
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+
+ if (isLink)
+ recordFromServer(frame);
+ else
+ recordFromClient(frame);
+}
+
+void Connection::recordFromServer (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesToClient ();
+ mgmtObject->inc_bytesToClient (frame.size ());
+ }
+}
+
+void Connection::recordFromClient (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesFromClient ();
+ mgmtObject->inc_bytesFromClient (frame.size ());
+ }
+}
+
+string Connection::getAuthMechanism()
+{
+ if (!isLink)
+ return string("ANONYMOUS");
+
+ return links.getAuthMechanism(mgmtId);
+}
+
+string Connection::getAuthCredentials()
+{
+ if (!isLink)
+ return string();
+
+ return links.getAuthCredentials(mgmtId);
+}
+
+void Connection::notifyConnectionForced(const string& text)
+{
+ if (isLink)
+ links.notifyConnectionForced(mgmtId, text);
}
void Connection::close(
@@ -125,7 +153,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
- ptr_map_ptr(channels.begin())->handleDetach();
+ ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -147,10 +175,12 @@ bool Connection::doOutput()
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
ioCallback = 0;
- if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
- //then do other output as needed:
- return outputTasks.doOutput();
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+ else
+ //then do other output as needed:
+ return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
}catch(std::exception& e){
@@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) {
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+ return dynamic_pointer_cast<ManagementObject>(mgmtObject);
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
@@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
case management::Client::METHOD_CLOSE :
mgmtClosing = true;
- if (mgmtWrapper.get()) mgmtWrapper->closing();
+ if (mgmtObject.get()) mgmtObject->set_closing(1);
out->activateOutput();
status = Manageable::STATUS_OK;
break;
@@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
return status;
}
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
- ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming)
-{
- mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId, incoming));
- agent->addObject (mgmtClient);
-}
-
-Connection::MgmtClient::~MgmtClient()
-{
- if (mgmtClient.get () != 0)
- mgmtClient->resourceDestroy ();
-}
-
-void Connection::MgmtClient::received(framing::AMQFrame& frame)
-{
- if (mgmtClient.get () != 0)
- {
- mgmtClient->inc_framesFromClient ();
- mgmtClient->inc_bytesFromClient (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void Connection::MgmtClient::closing()
-{
- if (mgmtClient) mgmtClient->set_closing (1);
-}
-
}}