From 8c3baf496f9424249e2a666d79f0e3b38ba8d8fc Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 8 Jul 2008 22:58:37 +0000 Subject: HandlerChain: plug-in handler chain extension points. Replaces Handler::Chain. Updated Sessoin & Connection handler chains and Cluster. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@675017 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 3 +- cpp/src/qpid/HandlerChain.h | 97 +++++++++++++++++++++++++++++++ cpp/src/qpid/Plugin.cpp | 21 +++++++ cpp/src/qpid/Plugin.h | 28 +++++++-- cpp/src/qpid/amqp_0_10/Connection.cpp | 8 +-- cpp/src/qpid/amqp_0_10/Connection.h | 3 +- cpp/src/qpid/broker/Broker.h | 3 - cpp/src/qpid/broker/Connection.cpp | 2 +- cpp/src/qpid/broker/Connection.h | 6 +- cpp/src/qpid/broker/ConnectionManager.cpp | 41 ------------- cpp/src/qpid/broker/ConnectionManager.h | 70 ---------------------- cpp/src/qpid/broker/SessionManager.cpp | 9 +-- cpp/src/qpid/broker/SessionManager.h | 13 ----- cpp/src/qpid/broker/SessionState.cpp | 8 +-- cpp/src/qpid/broker/SessionState.h | 14 +++-- cpp/src/qpid/cluster/Cluster.cpp | 33 ++--------- cpp/src/qpid/cluster/Cluster.h | 10 ++-- cpp/src/qpid/cluster/ClusterPlugin.cpp | 13 +++-- cpp/src/qpid/framing/Handler.h | 24 +------- cpp/src/tests/cluster_test.cpp | 26 ++++++++- 20 files changed, 208 insertions(+), 224 deletions(-) create mode 100644 cpp/src/qpid/HandlerChain.h delete mode 100644 cpp/src/qpid/broker/ConnectionManager.cpp delete mode 100644 cpp/src/qpid/broker/ConnectionManager.h (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 5b4a16429a..bfebd4ae88 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -248,8 +248,6 @@ libqpidbroker_la_SOURCES = \ qpid/amqp_0_10/Connection.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerSingleton.cpp \ - qpid/broker/ConnectionManager.h \ - qpid/broker/ConnectionManager.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ qpid/broker/PersistableMessage.cpp \ @@ -354,6 +352,7 @@ nobase_include_HEADERS = \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ + qpid/HandlerChain.h \ qpid/Plugin.h \ qpid/ptr_map.h \ qpid/RangeSet.h \ diff --git a/cpp/src/qpid/HandlerChain.h b/cpp/src/qpid/HandlerChain.h new file mode 100644 index 0000000000..adeaa96536 --- /dev/null +++ b/cpp/src/qpid/HandlerChain.h @@ -0,0 +1,97 @@ +#ifndef QPID_HANDLERCHAIN_H +#define QPID_HANDLERCHAIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include + +namespace qpid { + +/** + * Chain-of-responsibility design pattern. + * + * Construct a chain of objects deriving from Base. Each implements + * Base::f by doing its own logic and then calling Base::f on the next + * handler (or not if it chooses not to.) + * + * HandlerChain acts as a smart pointer to the first object in the chain. + */ +template +class HandlerChain { + public: + /** Base class for chainable handlers */ + class Handler : public Base { + public: + Handler() : next() {} + virtual ~Handler() {} + virtual void setNext(Base* next_) { next = next_; } + + protected: + Base* next; + }; + + typedef std::auto_ptr HandlerAutoPtr; + + /**@param target is the object at the end of the chain. */ + HandlerChain(Base& target) : first(&target) {} + + /** HandlerChain owns the ChainableHandler. */ + void push(HandlerAutoPtr h) { + handlers.push_back(h); + h->setNext(first); + first = h.get(); + } + + // Smart pointer functions + Base* operator*() { return first; } + const Base* operator*() const { return first; } + Base* operator->() { return first; } + const Base* operator->() const { return first; } + operator bool() const { return first; } + + private: + boost::ptr_vector handlers; + Base* first; +}; + +/** + * A PluginHandlerChain calls Plugin::initAll(*this) on construction, + * allowing plugins to add handlers. + * + * @param Tag can be any class, use to distinguish different plugin + * chains with the same Base type. + */ +template +struct PluginHandlerChain : public HandlerChain, + public Plugin::Target +{ + PluginHandlerChain(Base& target) : HandlerChain(target) { + Plugin::initAll(*this); + } +}; + + +} // namespace qpid + +#endif /*!QPID_HANDLERCHAIN_H*/ diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp index 733d134334..b8206499ae 100644 --- a/cpp/src/qpid/Plugin.cpp +++ b/cpp/src/qpid/Plugin.cpp @@ -20,10 +20,13 @@ #include "Plugin.h" #include "qpid/Options.h" +#include +#include namespace qpid { namespace { + Plugin::Plugins& thePlugins() { // This is a single threaded singleton implementation so // it is important to be sure that the first use of this @@ -31,8 +34,17 @@ Plugin::Plugins& thePlugins() { static Plugin::Plugins plugins; return plugins; } + +void call(boost::function f) { f(); } + +} // namespace + +Plugin::Target::~Target() { + std::for_each(cleanup.begin(), cleanup.end(), &call); } +void Plugin::Target::addCleanup(const boost::function& f) { cleanup.push_back(f); } + Plugin::Plugin() { // Register myself. thePlugins().push_back(this); @@ -44,6 +56,12 @@ Options* Plugin::getOptions() { return 0; } const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); } +namespace { +template void each_plugin(const F& f) { + std::for_each(Plugin::getPlugins().begin(), Plugin::getPlugins().end(), f); +} +} + void Plugin::addOptions(Options& opts) { for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) { if ((*i)->getOptions()) @@ -51,4 +69,7 @@ void Plugin::addOptions(Options& opts) { } } +void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); } +void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); } + } // namespace qpid diff --git a/cpp/src/qpid/Plugin.h b/cpp/src/qpid/Plugin.h index 3ead770129..a53d4e5d18 100644 --- a/cpp/src/qpid/Plugin.h +++ b/cpp/src/qpid/Plugin.h @@ -40,11 +40,17 @@ class Plugin : boost::noncopyable public: /** * Base interface for targets that receive plug-ins. - * - * The Broker is a plug-in target, there might be others - * in future. + * Plug-ins can register clean-up functions to execute when + * the target is destroyed. */ - struct Target { virtual ~Target() {} }; + struct Target { + public: + virtual ~Target(); + void addCleanup(const boost::function& cleanupFunction); + + private: + std::vector > cleanup; + }; typedef std::vector Plugins; @@ -69,7 +75,9 @@ class Plugin : boost::noncopyable virtual Options* getOptions(); /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target, called before + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called before the target itself is initialized. @@ -77,7 +85,9 @@ class Plugin : boost::noncopyable virtual void earlyInitialize(Target&) = 0; /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target. Called after + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called after the target is fully initialized. @@ -89,6 +99,12 @@ class Plugin : boost::noncopyable */ static const Plugins& getPlugins(); + /** Call earlyInitialize() on all registered plugins */ + static void earlyInitAll(Target&); + + /** Call initialize() on all registered plugins */ + static void initAll(Target&); + /** For each registered plugin, add plugin.getOptions() to opts. */ static void addOptions(Options& opts); }; diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index 407fe5ebd8..ccd31c78a7 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -29,7 +29,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), - connection(broker.getConnectionManager().create(this, broker, id, _isClient)), + connection(this, broker, id, _isClient), identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { @@ -46,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) { framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection->received(frame); + connection.received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); + if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -91,7 +91,7 @@ void Connection::close() { } void Connection::closed() { - connection->closed(); + connection.closed(); } void Connection::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index c08545df0f..a3a756cefb 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -33,7 +33,6 @@ namespace qpid { namespace broker { class Broker; } namespace amqp_0_10 { -// FIXME aconway 2008-03-18: Update to 0-10. class Connection : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { @@ -41,7 +40,7 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - std::auto_ptr connection; // FIXME aconway 2008-03-18: + broker::Connection connection; std::string identifier; bool initialized; bool isClient; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9a9f502bf0..be59cef24c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -23,7 +23,6 @@ */ #include "ConnectionFactory.h" -#include "ConnectionManager.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" @@ -121,7 +120,6 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } - ConnectionManager& getConnectionManager() { return connectionManager; } management::ManagementObject* GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -159,7 +157,6 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; - ConnectionManager connectionManager; management::ManagementAgent* managementAgent; management::Broker* mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index bb99c61cdd..61384638b3 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -88,7 +88,7 @@ Connection::~Connection() links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain(frame); } +void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } void Connection::receivedLast(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 717e1a6270..c911e88200 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -43,6 +43,7 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" +#include "qpid/HandlerChain.h" #include @@ -91,8 +92,6 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); - framing::FrameHandler::Chain& getInChain() { return inChain; } - private: typedef boost::ptr_map ChannelMap; typedef std::vector::iterator queue_iterator; @@ -110,8 +109,7 @@ class Connection : public sys::ConnectionInputHandler, management::Connection* mgmtObject; LinkRegistry& links; framing::FrameHandler::MemFunRef lastInHandler; - framing::FrameHandler::Chain inChain; - + PluginHandlerChain inChain; }; }} diff --git a/cpp/src/qpid/broker/ConnectionManager.cpp b/cpp/src/qpid/broker/ConnectionManager.cpp deleted file mode 100644 index 165de5220e..0000000000 --- a/cpp/src/qpid/broker/ConnectionManager.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConnectionManager.h" -#include "Connection.h" - -namespace qpid { -namespace broker { - -std::auto_ptr -ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) { - std::auto_ptr c(new Connection(out, broker, mgmtId, isClient)); - sys::Mutex::ScopedLock l(lock); - std::for_each(observers.begin(), observers.end(), - boost::bind(&Observer::created, _1, boost::ref(*c))); - return c; -} - -void ConnectionManager::add(const boost::intrusive_ptr& observer) { - sys::Mutex::ScopedLock l(lock); - observers.push_back(observer); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionManager.h b/cpp/src/qpid/broker/ConnectionManager.h deleted file mode 100644 index a999523d0d..0000000000 --- a/cpp/src/qpid/broker/ConnectionManager.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_BROKER_CONNECTIONMANAGER_H -#define QPID_BROKER_CONNECTIONMANAGER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/RefCounted.h" -#include "qpid/sys/Mutex.h" -#include -#include -#include - -namespace qpid { - -namespace sys { -class ConnectionOutputHandler; -} - -namespace broker { - -class Broker; -class Connection; - -/** - * Manages connections and observers. - */ -class ConnectionManager { - public: - - /** - * Observer notified of ConnectionManager events. - */ - struct Observer : public RefCounted { - /** Called when a connection is attached. */ - virtual void created(Connection&) {} - }; - - /** Called to create a new Connection, applies observers. */ - std::auto_ptr create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false); - - /** Add an observer */ - void add(const boost::intrusive_ptr&); - - private: - typedef std::vector > Observers; - - sys::Mutex lock; - Observers observers; -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/ diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 69ef29c3eb..e7190fdae6 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -55,11 +55,8 @@ std::auto_ptr SessionManager::attach(SessionHandler& h, const Ses throw SessionBusyException(QPID_MSG("Session already attached: " << id)); Detached::iterator i = std::find(detached.begin(), detached.end(), id); std::auto_ptr state; - if (i == detached.end()) { + if (i == detached.end()) state.reset(new SessionState(broker, h, id, config)); - for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*state))); - } else { state.reset(detached.release(i).release()); state->attach(h); @@ -99,8 +96,4 @@ void SessionManager::eraseExpired() { } } -void SessionManager::add(const intrusive_ptr& o) { - observers.push_back(o); -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 9a4142f613..db88e7ec10 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -46,14 +46,6 @@ class SessionHandler; */ class SessionManager : private boost::noncopyable { public: - /** - * Observer notified of SessionManager events. - */ - struct Observer : public RefCounted { - /** Called when a stateless session is attached. */ - virtual void opened(SessionState&) {} - }; - SessionManager(const qpid::SessionState::Configuration&, Broker&); ~SessionManager(); @@ -67,9 +59,6 @@ class SessionManager : private boost::noncopyable { /** Forget about an attached session. Called by SessionState destructor. */ void forget(const SessionId&); - /** Add an Observer. */ - void add(const boost::intrusive_ptr&); - Broker& getBroker() const { return broker; } const qpid::SessionState::Configuration& getSessionConfig() const { return config; } @@ -77,7 +66,6 @@ class SessionManager : private boost::noncopyable { private: typedef boost::ptr_vector Detached; // Sorted in expiry order. typedef std::set Attached; - typedef std::vector > Observers; void eraseExpired(); @@ -85,7 +73,6 @@ class SessionManager : private boost::noncopyable { Detached detached; Attached attached; qpid::SessionState::Configuration config; - Observers observers; Broker& broker; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 8a17a787a2..0a122fcae8 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -224,8 +224,8 @@ void SessionState::enqueued(boost::intrusive_ptr msg) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } -void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } +void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); } void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); @@ -291,8 +291,4 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } -framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } - -framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 5d18ed161e..f6bf98d431 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,6 +23,7 @@ */ #include "qpid/SessionState.h" +#include "qpid/HandlerChain.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" @@ -58,8 +59,8 @@ class SessionHandler; class SessionManager; /** - * Broker-side session state includes sessions handler chains, which may - * themselves have state. + * Broker-side session state includes session's handler chains, which + * may themselves have state. */ class SessionState : public qpid::SessionState, public SessionContext, @@ -101,8 +102,9 @@ class SessionState : public qpid::SessionState, void readyToSend(); - framing::FrameHandler::Chain& getInChain(); - framing::FrameHandler::Chain& getOutChain(); + // Tag types to identify PluginHandlerChains. + struct InTag {}; + struct OutTag {}; private: @@ -131,7 +133,9 @@ class SessionState : public qpid::SessionState, management::Session* mgmtObject; framing::FrameHandler::MemFunRef inLastHandler; framing::FrameHandler::MemFunRef outLastHandler; - framing::FrameHandler::Chain inChain, outChain; + + qpid::PluginHandlerChain inChain; + qpid::PluginHandlerChain outChain; friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index d97a840f82..4ea77e7fbf 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" +#include "qpid/memory.h" #include #include #include @@ -36,25 +37,12 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -namespace { - -// FIXME aconway 2008-07-01: sending every frame to cluster, -// serializing all processing in cluster deliver thread. -// This will not perform at all, but provides a correct starting point. -// -// TODO: -// - Fake "Connection" for cluster: owns shadow sessions. -// - Maintain shadow sessions. -// - Apply foreign frames to shadow sessions. -// - - // Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public FrameHandler { - Connection& connection; +struct ClusterSendHandler : public HandlerChain::Handler { + Cluster::ConnectionChain& connection; Cluster& cluster; - ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} + ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} void handle(AMQFrame& f) { // FIXME aconway 2008-01-29: Refcount Connections to ensure @@ -63,16 +51,8 @@ struct ClusterSendHandler : public FrameHandler { } }; -struct ConnectionObserver : public broker::ConnectionManager::Observer { - Cluster& cluster; - ConnectionObserver(Cluster& c) : cluster(c) {} - - void created(Connection& c) { - // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); - c.getInChain().insert(sender); - } -}; +void Cluster::initialize(Cluster::ConnectionChain& cc) { + cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); } ostream& operator <<(ostream& out, const Cluster& cluster) { @@ -95,7 +75,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), url(url_), - observer(new ConnectionObserver(*this)), self(cpg.self()) { QPID_LOG(trace, "Joining cluster: " << name_); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 031baf914a..84b5ed072c 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -22,6 +22,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" +#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -47,6 +48,8 @@ namespace cluster { class Cluster : private sys::Runnable, private Cpg::Handler { public: + typedef PluginHandlerChain ConnectionChain; + /** Details of a cluster member */ struct Member { Member(const Url& url_=Url()) : url(url_) {} @@ -62,11 +65,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler */ Cluster(const std::string& name, const Url& url, broker::Broker&); + // Add cluster handlers to broker chains. + void initialize(ConnectionChain&); + virtual ~Cluster(); - // FIXME aconway 2008-01-29: - boost::intrusive_ptr getObserver() { return observer; } - /** Get the current cluster membership. */ MemberList getMembers() const; @@ -124,7 +127,6 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function callback; - boost::intrusive_ptr observer; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6d3dca84be..c4b67de141 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,24 +54,29 @@ struct ClusterOptions : public Options { }; struct ClusterPlugin : public Plugin { + typedef PluginHandlerChain ConnectionChain; ClusterOptions options; boost::optional cluster; - Options* getOptions() { return &options; } + template void init(Plugin::Target& t) { + Chain* c = dynamic_cast(&t); + if (c) cluster->initialize(*c); + } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast(&target); - // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getConnectionManager().add(cluster->getObserver()); + return; } + if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. + init(target); } }; diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index edd7f469b0..a2a8ee7bfa 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -28,7 +28,6 @@ namespace qpid { namespace framing { -/** Generic handler that can be linked into chains. */ template struct Handler { typedef T HandledType; @@ -46,23 +45,6 @@ struct Handler { /** Pointer to next handler in a linked list. */ Handler* next; - /** A Chain is a handler holding a linked list of sub-handlers. - * Chain::next is invoked after the full chain, it is not itself part of the chain. - * Handlers inserted into the chain are deleted by the Chain dtor. - */ - class Chain : public Handler { - public: - Chain(Handler& next_) : Handler(&next_), first(&next_) {} - ~Chain() { while (first != next) pop(); } - void handle(T t) { first->handle(t); } - void insert(Handler* h) { h->next = first; first = h; } - bool empty() { return first == next; } - - private: - void pop() { Handler* p=first; first=first->next; delete p; } - Handler* first; - }; - /** Adapt any void(T) functor as a Handler. * Functor(f) will copy f. * Functor(f) will only take a reference to x. @@ -84,7 +66,7 @@ struct Handler { MemFunRef(X& x, Handler* next=0) : Handler(next), target(&x) {} void handle(T t) { (target->*F)(t); } - /** Allow calling with -> syntax, compatible with Chains */ + /** Allow calling with -> syntax, like a qpid::HandlerChain */ MemFunRef* operator->() { return this; } private: @@ -103,15 +85,13 @@ struct Handler { }; /** Support for implementing an in-out handler pair as a single class. - * Public interface is Handler::Chains pair, but implementation - * overrides handleIn, handleOut functions in a single class. + * Overrides handleIn, handleOut functions in a single class. */ struct InOutHandler : protected InOutHandlerInterface { InOutHandler(Handler* nextIn=0, Handler* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} MemFunRef in; MemFunRef out; }; - }; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 65aa4d5a28..beab305f75 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -174,7 +174,7 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { } } -QPID_AUTO_TEST_CASE(testMessageReplication) { +QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); Client c0(cluster[0].getPort()); @@ -190,6 +190,28 @@ QPID_AUTO_TEST_CASE(testMessageReplication) { BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } -// TODO aconway 2008-06-25: dequeue replication, failover. +QPID_AUTO_TEST_CASE(testMessageDequeue) { + // Enqueue on one broker, dequeue on two others. + ClusterFixture cluster (3); + Client c0(cluster[0].getPort()); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.close(); + + Message msg; + + Client c1(cluster[1].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("foo", msg.getData()); + + Client c2(cluster[2].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("bar", msg.getData()); + QueueQueryResult r = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(0, r.getMessageCount()); +} + +// TODO aconway 2008-06-25: failover. QPID_AUTO_TEST_SUITE_END() -- cgit v1.2.1