summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
committerAlan Conway <aconway@apache.org>2008-07-17 00:03:50 +0000
commite65b0086a2924ff04640b1350393a816249d01b3 (patch)
treeb372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/qpid/broker
parente596837411d54a16dd3cb1e5de717664496c2bd0 (diff)
downloadqpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd. - Plugin::addFinalizer - more flexible way to shutdown plugins. - Reworked cluster extension points using boost::function. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Connection.cpp23
-rw-r--r--cpp/src/qpid/broker/Connection.h28
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h10
6 files changed, 35 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index bffca94f95..b8204c9cf5 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -285,6 +285,7 @@ void Broker::shutdown() {
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
poller->shutdown();
+ finalize(); // Finalize any plugins.
}
Broker::~Broker() {
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5e85d3c89c..e77911bd10 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -49,14 +49,14 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
+ receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
+ closedFn(boost::bind(&Connection::closedImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
mgmtObject(0),
- links(broker_.getLinks()),
- lastInHandler(*this),
- inChain(lastInHandler)
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject();
@@ -71,6 +71,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
agent->addObject(mgmtObject);
}
+
+ Plugin::initializeAll(*this); // Let plug-ins update extension points.
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -79,7 +81,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
out->activateOutput();
}
-
Connection::~Connection()
{
if (mgmtObject != 0)
@@ -88,9 +89,9 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
-
-void Connection::receivedLast(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
+
+void Connection::receivedImpl(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -170,10 +171,13 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){ // Physically closed, suspend open sessions.
+void Connection::closed() { closedFn(); }
+
+void Connection::closedImpl(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
+ // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -183,8 +187,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
exclusiveQueues.erase(exclusiveQueues.begin());
}
} catch(std::exception& e) {
- QPID_LOG(error, " Unhandled exception while closing session: " <<
- e.what());
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
assert(0);
}
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index c911e88200..0d646bab83 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_CONNECTION_H
+#define QPID_BROKER_CONNECTION_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,8 +21,6 @@
* under the License.
*
*/
-#ifndef _Connection_
-#define _Connection_
#include <memory>
#include <sstream>
@@ -43,7 +44,8 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Connection.h"
-#include "qpid/HandlerChain.h"
+#include "qpid/Plugin.h"
+#include "qpid/RefCounted.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -53,11 +55,11 @@ namespace broker {
class LinkRegistry;
class Connection : public sys::ConnectionInputHandler,
- public ConnectionState
+ public ConnectionState,
+ public Plugin::Target,
+ public RefCounted
{
public:
- typedef boost::shared_ptr<Connection> shared_ptr;
-
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -74,8 +76,8 @@ class Connection : public sys::ConnectionInputHandler,
void received(framing::AMQFrame& frame);
void idleOut();
void idleIn();
- void closed();
bool doOutput();
+ void closed();
void closeChannel(framing::ChannelId channel);
@@ -92,12 +94,16 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ // Extension points: allow plugins to insert additional functionality.
+ boost::function<void(framing::AMQFrame&)> receivedFn;
+ boost::function<void()> closedFn;
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- // End of the received handler chain.
- void receivedLast(framing::AMQFrame& frame);
+ void receivedImpl(framing::AMQFrame& frame);
+ void closedImpl();
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
@@ -108,10 +114,8 @@ class Connection : public sys::ConnectionInputHandler,
boost::function0<void> ioCallback;
management::Connection* mgmtObject;
LinkRegistry& links;
- framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
- PluginHandlerChain<framing::FrameHandler, Connection> inChain;
};
}}
-#endif
+#endif /*!QPID_BROKER_CONNECTION_H*/
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 698f8123e8..c9cf6ece8d 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -70,6 +70,8 @@ class ConnectionState : public ConnectionToken, public management::Manageable
sys::ConnectionOutputHandler& getOutput() const { return *out; }
framing::ProtocolVersion getVersion() const { return version; }
+ void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+
protected:
framing::ProtocolVersion version;
sys::ConnectionOutputHandler* out;
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 3cc509c904..aa6f6b7520 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -54,11 +54,7 @@ SessionState::SessionState(
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
- mgmtObject(0),
- inLastHandler(*this),
- outLastHandler(*this),
- inChain(inLastHandler),
- outChain(outLastHandler)
+ mgmtObject(0)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -75,9 +71,6 @@ SessionState::SessionState(
SessionState::~SessionState() {
// Remove ID from active session list.
- // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
- // they don't belong in the manager. For now rely on uniqueness of UUIDs.
- //
broker.getSessionManager().forget(getId());
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -126,7 +119,6 @@ void SessionState::activateOutput() {
Mutex::ScopedLock l(lock);
if (isAttached())
getConnection().outputTasks.activateOutput();
- // FIXME aconway 2008-05-22: should we hold the lock over activateOutput??
}
ManagementObject* SessionState::GetManagementObject (void) const
@@ -224,10 +216,7 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); }
-void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); }
-
-void SessionState::handleInLast(AMQFrame& frame) {
+void SessionState::handleIn(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
@@ -258,7 +247,7 @@ void SessionState::handleInLast(AMQFrame& frame) {
}
}
-void SessionState::handleOutLast(AMQFrame& frame) {
+void SessionState::handleOut(AMQFrame& frame) {
assert(handler);
handler->out(frame);
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index f6bf98d431..96f2e8f512 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -23,7 +23,6 @@
*/
#include "qpid/SessionState.h"
-#include "qpid/HandlerChain.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Mutex.h"
@@ -102,10 +101,6 @@ class SessionState : public qpid::SessionState,
void readyToSend();
- // Tag types to identify PluginHandlerChains.
- struct InTag {};
- struct OutTag {};
-
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
@@ -131,11 +126,6 @@ class SessionState : public qpid::SessionState,
IncompleteMessageList incomplete;
IncompleteMessageList::CompletionListener enqueuedOp;
management::Session* mgmtObject;
- framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler;
- framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler;
-
- qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain;
- qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain;
friend class SessionManager;
};