summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
committerGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
commit0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch)
treed478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/broker/Connection.cpp
parent4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff)
downloadqpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections) 2) Improved handling of federation links: a) Links can be created even if the remote broker is not reachable b) If links are lost, re-establishment will occur using an exponential back-off algorithm 3) Durability of exchanges is now viewable through management 4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins. 5) General configuration storage capability has been added to the store/recover interface. This is used for federation links. 6) Management object-ids for durable objects are now themselves durable. (Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp130
1 files changed, 19 insertions, 111 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1994c4fdf5..d156b4a914 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -52,37 +52,14 @@ class Connection::MgmtClient : public Connection::MgmtWrapper
management::Client::shared_ptr mgmtClient;
public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
+ MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming);
~MgmtClient();
void received(framing::AMQFrame& frame);
management::ManagementObject::shared_ptr getManagementObject() const;
void closing();
};
-class Connection::MgmtLink : public Connection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(Connection& connection, const management::Args& args);
-};
-
-
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
@@ -103,14 +80,21 @@ void Connection::initMgmt(bool asLink)
if (agent.get () != 0)
{
if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
} else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
+ mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
}
}
}
}
+void Connection::requestIOProcessing (boost::function0<void> callback)
+{
+ ioCallback = callback;
+ out->activateOutput();
+}
+
+
Connection::~Connection () {}
void Connection::received(framing::AMQFrame& frame){
@@ -160,8 +144,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::doOutput()
{
try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (ioCallback)
+ ioCallback(); // Lend the IO thread for management processing
+ ioCallback = 0;
if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
@@ -192,8 +177,7 @@ ManagementObject::shared_ptr Connection::GetManagementObject (void) const
return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
- Args& args)
+Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -207,93 +191,17 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
out->activateOutput();
status = Manageable::STATUS_OK;
break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
}
return status;
}
-Connection::MgmtLink::MgmtLink(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-Connection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void Connection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void Connection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void Connection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void Connection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
+Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
+ ManagementAgent::shared_ptr agent,
+ const std::string& mgmtId, bool incoming)
{
mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
+ (new management::Client (conn, parent, mgmtId, incoming));
agent->addObject (mgmtClient);
}