diff options
| author | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
| commit | e65b0086a2924ff04640b1350393a816249d01b3 (patch) | |
| tree | b372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/qpid/broker | |
| parent | e596837411d54a16dd3cb1e5de717664496c2bd0 (diff) | |
| download | qpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz | |
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd.
- Plugin::addFinalizer - more flexible way to shutdown plugins.
- Reworked cluster extension points using boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionState.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 10 |
6 files changed, 35 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index bffca94f95..b8204c9cf5 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -285,6 +285,7 @@ void Broker::shutdown() { // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. poller->shutdown(); + finalize(); // Finalize any plugins. } Broker::~Broker() { diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e85d3c89c..e77911bd10 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,14 +49,14 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), + receivedFn(boost::bind(&Connection::receivedImpl, this, _1)), + closedFn(boost::bind(&Connection::closedImpl, this)), adapter(*this, isLink_), isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), mgmtObject(0), - links(broker_.getLinks()), - lastInHandler(*this), - inChain(lastInHandler) + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject(); @@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink); agent->addObject(mgmtObject); } + + Plugin::initializeAll(*this); // Let plug-ins update extension points. } void Connection::requestIOProcessing(boost::function0<void> callback) @@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback) out->activateOutput(); } - Connection::~Connection() { if (mgmtObject != 0) @@ -88,9 +89,9 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } - -void Connection::receivedLast(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); } + +void Connection::receivedImpl(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { @@ -170,10 +171,13 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ // Physically closed, suspend open sessions. +void Connection::closed() { closedFn(); } + +void Connection::closedImpl(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); + // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10. while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. exclusiveQueues.erase(exclusiveQueues.begin()); } } catch(std::exception& e) { - QPID_LOG(error, " Unhandled exception while closing session: " << - e.what()); + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); assert(0); } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index c911e88200..0d646bab83 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_CONNECTION_H +#define QPID_BROKER_CONNECTION_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,8 +21,6 @@ * under the License. * */ -#ifndef _Connection_ -#define _Connection_ #include <memory> #include <sstream> @@ -43,7 +44,8 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" -#include "qpid/HandlerChain.h" +#include "qpid/Plugin.h" +#include "qpid/RefCounted.h" #include <boost/ptr_container/ptr_map.hpp> @@ -53,11 +55,11 @@ namespace broker { class LinkRegistry; class Connection : public sys::ConnectionInputHandler, - public ConnectionState + public ConnectionState, + public Plugin::Target, + public RefCounted { public: - typedef boost::shared_ptr<Connection> shared_ptr; - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); @@ -74,8 +76,8 @@ class Connection : public sys::ConnectionInputHandler, void received(framing::AMQFrame& frame); void idleOut(); void idleIn(); - void closed(); bool doOutput(); + void closed(); void closeChannel(framing::ChannelId channel); @@ -92,12 +94,16 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); + // Extension points: allow plugins to insert additional functionality. + boost::function<void(framing::AMQFrame&)> receivedFn; + boost::function<void()> closedFn; + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - // End of the received handler chain. - void receivedLast(framing::AMQFrame& frame); + void receivedImpl(framing::AMQFrame& frame); + void closedImpl(); ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; @@ -108,10 +114,8 @@ class Connection : public sys::ConnectionInputHandler, boost::function0<void> ioCallback; management::Connection* mgmtObject; LinkRegistry& links; - framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler; - PluginHandlerChain<framing::FrameHandler, Connection> inChain; }; }} -#endif +#endif /*!QPID_BROKER_CONNECTION_H*/ diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 698f8123e8..c9cf6ece8d 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -70,6 +70,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable sys::ConnectionOutputHandler& getOutput() const { return *out; } framing::ProtocolVersion getVersion() const { return version; } + void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; } + protected: framing::ProtocolVersion version; sys::ConnectionOutputHandler* out; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 3cc509c904..aa6f6b7520 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -54,11 +54,7 @@ SessionState::SessionState( adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), - mgmtObject(0), - inLastHandler(*this), - outLastHandler(*this), - inChain(inLastHandler), - outChain(outLastHandler) + mgmtObject(0) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -75,9 +71,6 @@ SessionState::SessionState( SessionState::~SessionState() { // Remove ID from active session list. - // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, - // they don't belong in the manager. For now rely on uniqueness of UUIDs. - // broker.getSessionManager().forget(getId()); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -126,7 +119,6 @@ void SessionState::activateOutput() { Mutex::ScopedLock l(lock); if (isAttached()) getConnection().outputTasks.activateOutput(); - // FIXME aconway 2008-05-22: should we hold the lock over activateOutput?? } ManagementObject* SessionState::GetManagementObject (void) const @@ -224,10 +216,7 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); } -void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); } - -void SessionState::handleInLast(AMQFrame& frame) { +void SessionState::handleIn(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether @@ -258,7 +247,7 @@ void SessionState::handleInLast(AMQFrame& frame) { } } -void SessionState::handleOutLast(AMQFrame& frame) { +void SessionState::handleOut(AMQFrame& frame) { assert(handler); handler->out(frame); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index f6bf98d431..96f2e8f512 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,7 +23,6 @@ */ #include "qpid/SessionState.h" -#include "qpid/HandlerChain.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" @@ -102,10 +101,6 @@ class SessionState : public qpid::SessionState, void readyToSend(); - // Tag types to identify PluginHandlerChains. - struct InTag {}; - struct OutTag {}; - private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); @@ -131,11 +126,6 @@ class SessionState : public qpid::SessionState, IncompleteMessageList incomplete; IncompleteMessageList::CompletionListener enqueuedOp; management::Session* mgmtObject; - framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; - framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; - - qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain; - qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain; friend class SessionManager; }; |
