summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp190
1 files changed, 170 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 88761533cf..1e73a60144 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -21,14 +21,18 @@
#include "Connection.h"
#include "SessionState.h"
#include "BrokerAdapter.h"
+#include "Bridge.h"
#include "SemanticHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ArgsLinkBind.h"
+#include "qpid/management/ArgsLinkPull.h"
#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
#include <algorithm>
#include <iostream>
@@ -47,7 +51,43 @@ using qpid::management::Args;
namespace qpid {
namespace broker {
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) :
+class Connection::MgmtClient : public Connection::MgmtWrapper
+{
+ management::Client::shared_ptr mgmtClient;
+
+public:
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtClient();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+};
+
+class Connection::MgmtLink : public Connection::MgmtWrapper
+{
+ typedef boost::ptr_vector<Bridge> Bridges;
+
+ management::Link::shared_ptr mgmtLink;
+ Bridges created;//holds list of bridges pending creation
+ Bridges cancelled;//holds list of bridges pending cancellation
+ Bridges active;//holds active bridges
+ uint channelCounter;
+ sys::Mutex lock;
+
+ void cancel(Bridge*);
+
+public:
+ MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ ~MgmtLink();
+ void received(framing::AMQFrame& frame);
+ management::ManagementObject::shared_ptr getManagementObject() const;
+ void closing();
+ void processPending();
+ void process(Connection& connection, const management::Args& args);
+};
+
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
broker(broker_),
outputTasks(*out_),
out(out_),
@@ -56,7 +96,11 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
client(0),
stagingThreshold(broker.getStagingThreshold()),
adapter(*this),
- mgmtClosing(0)
+ mgmtClosing(0),
+ mgmtId(mgmtId_)
+{}
+
+void Connection::initMgmt(bool asLink)
{
Manageable* parent = broker.GetVhostObject ();
@@ -66,18 +110,16 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (agent.get () != 0)
{
- mgmtObject = management::Client::shared_ptr
- (new management::Client (this, parent, mgmtId));
- agent->addObject (mgmtObject);
+ if (asLink) {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ } else {
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ }
}
}
}
-Connection::~Connection ()
-{
- if (mgmtObject.get () != 0)
- mgmtObject->resourceDestroy ();
-}
+Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
if (mgmtClosing)
@@ -88,12 +130,8 @@ void Connection::received(framing::AMQFrame& frame){
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtObject.get () != 0)
- {
- mgmtObject->inc_framesFromClient ();
- mgmtObject->inc_bytesFromClient (frame.size ());
- }
+
+ if (mgmtWrapper.get()) mgmtWrapper->received(frame);
}
void Connection::close(
@@ -107,6 +145,7 @@ void Connection::close(
void Connection::initiated(const framing::ProtocolInitiation& header) {
version = ProtocolVersion(header.getMajor(), header.getMinor());
adapter.init(header);
+ initMgmt();
}
void Connection::idleOut(){}
@@ -133,8 +172,12 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
}
bool Connection::doOutput()
-{
+{
try{
+ //process any pending mgmt commands:
+ if (mgmtWrapper.get()) mgmtWrapper->processPending();
+
+ //then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
@@ -159,11 +202,11 @@ SessionHandler& Connection::getChannel(ChannelId id) {
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+ return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& /*args*/)
+ Args& args)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -173,7 +216,13 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
{
case management::Client::METHOD_CLOSE :
mgmtClosing = 1;
- mgmtObject->set_closing (1);
+ if (mgmtWrapper.get()) mgmtWrapper->closing();
+ status = Manageable::STATUS_OK;
+ break;
+ case management::Link::METHOD_BRIDGE :
+ //queue this up and request chance to do output (i.e. get connections thread of control):
+ mgmtWrapper->process(*this, args);
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
}
@@ -192,5 +241,106 @@ const string& Connection::getUserId() const
return userId;
}
+Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+ : channelCounter(1)
+{
+ mgmtLink = management::Link::shared_ptr
+ (new management::Link(conn, parent, mgmtId));
+ agent->addObject (mgmtLink);
+}
+
+Connection::MgmtLink::~MgmtLink()
+{
+ if (mgmtLink.get () != 0)
+ mgmtLink->resourceDestroy ();
+}
+
+void Connection::MgmtLink::received(framing::AMQFrame& frame)
+{
+ if (mgmtLink.get () != 0)
+ {
+ mgmtLink->inc_framesFromPeer ();
+ mgmtLink->inc_bytesFromPeer (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtLink);
+}
+
+void Connection::MgmtLink::closing()
+{
+ if (mgmtLink) mgmtLink->set_closing (1);
+}
+
+void Connection::MgmtLink::processPending()
+{
+ //process any pending creates
+ if (!created.empty()) {
+ for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
+ i->create();
+ }
+ active.transfer(active.end(), created.begin(), created.end(), created);
+ }
+ if (!cancelled.empty()) {
+ //process any pending cancellations
+ for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
+ i->cancel();
+ }
+ cancelled.clear();
+ }
+}
+
+void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
+{
+ created.push_back(new Bridge(channelCounter++, connection,
+ boost::bind(&MgmtLink::cancel, this, _1),
+ dynamic_cast<const management::ArgsLinkBridge&>(args)));
+}
+
+void Connection::MgmtLink::cancel(Bridge* b)
+{
+ //need to take this out the active map and add it to the cancelled map
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if (&(*i) == b) {
+ cancelled.transfer(cancelled.end(), i, active);
+ break;
+ }
+ }
+}
+
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+{
+ mgmtClient = management::Client::shared_ptr
+ (new management::Client (conn, parent, mgmtId));
+ agent->addObject (mgmtClient);
+}
+
+Connection::MgmtClient::~MgmtClient()
+{
+ if (mgmtClient.get () != 0)
+ mgmtClient->resourceDestroy ();
+}
+
+void Connection::MgmtClient::received(framing::AMQFrame& frame)
+{
+ if (mgmtClient.get () != 0)
+ {
+ mgmtClient->inc_framesFromClient ();
+ mgmtClient->inc_bytesFromClient (frame.size ());
+ }
+}
+
+management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
+{
+ return dynamic_pointer_cast<ManagementObject>(mgmtClient);
+}
+
+void Connection::MgmtClient::closing()
+{
+ if (mgmtClient) mgmtClient->set_closing (1);
+}
+
}}