diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 190 |
1 files changed, 170 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 88761533cf..1e73a60144 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -21,14 +21,18 @@ #include "Connection.h" #include "SessionState.h" #include "BrokerAdapter.h" +#include "Bridge.h" #include "SemanticHandler.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/management/ArgsLinkBind.h" +#include "qpid/management/ArgsLinkPull.h" #include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> #include <algorithm> #include <iostream> @@ -47,7 +51,43 @@ using qpid::management::Args; namespace qpid { namespace broker { -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId) : +class Connection::MgmtClient : public Connection::MgmtWrapper +{ + management::Client::shared_ptr mgmtClient; + +public: + MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtClient(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); +}; + +class Connection::MgmtLink : public Connection::MgmtWrapper +{ + typedef boost::ptr_vector<Bridge> Bridges; + + management::Link::shared_ptr mgmtLink; + Bridges created;//holds list of bridges pending creation + Bridges cancelled;//holds list of bridges pending cancellation + Bridges active;//holds active bridges + uint channelCounter; + sys::Mutex lock; + + void cancel(Bridge*); + +public: + MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); + ~MgmtLink(); + void received(framing::AMQFrame& frame); + management::ManagementObject::shared_ptr getManagementObject() const; + void closing(); + void processPending(); + void process(Connection& connection, const management::Args& args); +}; + + +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) : broker(broker_), outputTasks(*out_), out(out_), @@ -56,7 +96,11 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std client(0), stagingThreshold(broker.getStagingThreshold()), adapter(*this), - mgmtClosing(0) + mgmtClosing(0), + mgmtId(mgmtId_) +{} + +void Connection::initMgmt(bool asLink) { Manageable* parent = broker.GetVhostObject (); @@ -66,18 +110,16 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (agent.get () != 0) { - mgmtObject = management::Client::shared_ptr - (new management::Client (this, parent, mgmtId)); - agent->addObject (mgmtObject); + if (asLink) { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); + } else { + mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); + } } } } -Connection::~Connection () -{ - if (mgmtObject.get () != 0) - mgmtObject->resourceDestroy (); -} +Connection::~Connection () {} void Connection::received(framing::AMQFrame& frame){ if (mgmtClosing) @@ -88,12 +130,8 @@ void Connection::received(framing::AMQFrame& frame){ } else { getChannel(frame.getChannel()).in(frame); } - - if (mgmtObject.get () != 0) - { - mgmtObject->inc_framesFromClient (); - mgmtObject->inc_bytesFromClient (frame.size ()); - } + + if (mgmtWrapper.get()) mgmtWrapper->received(frame); } void Connection::close( @@ -107,6 +145,7 @@ void Connection::close( void Connection::initiated(const framing::ProtocolInitiation& header) { version = ProtocolVersion(header.getMajor(), header.getMinor()); adapter.init(header); + initMgmt(); } void Connection::idleOut(){} @@ -133,8 +172,12 @@ void Connection::closed(){ // Physically closed, suspend open sessions. } bool Connection::doOutput() -{ +{ try{ + //process any pending mgmt commands: + if (mgmtWrapper.get()) mgmtWrapper->processPending(); + + //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.what(), 0, 0); @@ -159,11 +202,11 @@ SessionHandler& Connection::getChannel(ChannelId id) { ManagementObject::shared_ptr Connection::GetManagementObject (void) const { - return dynamic_pointer_cast<ManagementObject> (mgmtObject); + return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); } Manageable::status_t Connection::ManagementMethod (uint32_t methodId, - Args& /*args*/) + Args& args) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -173,7 +216,13 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, { case management::Client::METHOD_CLOSE : mgmtClosing = 1; - mgmtObject->set_closing (1); + if (mgmtWrapper.get()) mgmtWrapper->closing(); + status = Manageable::STATUS_OK; + break; + case management::Link::METHOD_BRIDGE : + //queue this up and request chance to do output (i.e. get connections thread of control): + mgmtWrapper->process(*this, args); + out->activateOutput(); status = Manageable::STATUS_OK; break; } @@ -192,5 +241,106 @@ const string& Connection::getUserId() const return userId; } +Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) + : channelCounter(1) +{ + mgmtLink = management::Link::shared_ptr + (new management::Link(conn, parent, mgmtId)); + agent->addObject (mgmtLink); +} + +Connection::MgmtLink::~MgmtLink() +{ + if (mgmtLink.get () != 0) + mgmtLink->resourceDestroy (); +} + +void Connection::MgmtLink::received(framing::AMQFrame& frame) +{ + if (mgmtLink.get () != 0) + { + mgmtLink->inc_framesFromPeer (); + mgmtLink->inc_bytesFromPeer (frame.size ()); + } +} + +management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtLink); +} + +void Connection::MgmtLink::closing() +{ + if (mgmtLink) mgmtLink->set_closing (1); +} + +void Connection::MgmtLink::processPending() +{ + //process any pending creates + if (!created.empty()) { + for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { + i->create(); + } + active.transfer(active.end(), created.begin(), created.end(), created); + } + if (!cancelled.empty()) { + //process any pending cancellations + for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { + i->cancel(); + } + cancelled.clear(); + } +} + +void Connection::MgmtLink::process(Connection& connection, const management::Args& args) +{ + created.push_back(new Bridge(channelCounter++, connection, + boost::bind(&MgmtLink::cancel, this, _1), + dynamic_cast<const management::ArgsLinkBridge&>(args))); +} + +void Connection::MgmtLink::cancel(Bridge* b) +{ + //need to take this out the active map and add it to the cancelled map + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if (&(*i) == b) { + cancelled.transfer(cancelled.end(), i, active); + break; + } + } +} + +Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) +{ + mgmtClient = management::Client::shared_ptr + (new management::Client (conn, parent, mgmtId)); + agent->addObject (mgmtClient); +} + +Connection::MgmtClient::~MgmtClient() +{ + if (mgmtClient.get () != 0) + mgmtClient->resourceDestroy (); +} + +void Connection::MgmtClient::received(framing::AMQFrame& frame) +{ + if (mgmtClient.get () != 0) + { + mgmtClient->inc_framesFromClient (); + mgmtClient->inc_bytesFromClient (frame.size ()); + } +} + +management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const +{ + return dynamic_pointer_cast<ManagementObject>(mgmtClient); +} + +void Connection::MgmtClient::closing() +{ + if (mgmtClient) mgmtClient->set_closing (1); +} + }} |
