diff options
| author | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
| commit | e65b0086a2924ff04640b1350393a816249d01b3 (patch) | |
| tree | b372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/qpid/cluster | |
| parent | e596837411d54a16dd3cb1e5de717664496c2bd0 (diff) | |
| download | qpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz | |
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd.
- Plugin::addFinalizer - more flexible way to shutdown plugins.
- Reworked cluster extension points using boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 195 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 74 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.cpp | 82 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionInterceptor.h | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 1 |
7 files changed, 331 insertions, 159 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 4ea77e7fbf..3b7f32e822 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,15 +17,19 @@ */ #include "Cluster.h" +#include "ConnectionInterceptor.h" + #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/framing/ClusterConnectionCloseBody.h" #include "qpid/log/Statement.h" #include "qpid/memory.h" +#include "qpid/shared_ptr.h" #include <boost/bind.hpp> -#include <boost/scoped_array.hpp> +#include <boost/cast.hpp> #include <algorithm> #include <iterator> #include <map> @@ -37,24 +41,6 @@ using namespace qpid::sys; using namespace std; using broker::Connection; -// Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler { - Cluster::ConnectionChain& connection; - Cluster& cluster; - - ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} - - void handle(AMQFrame& f) { - // FIXME aconway 2008-01-29: Refcount Connections to ensure - // Connection not destroyed till message is self delivered. - cluster.send(f, &connection, next); // Indirectly send to next via cluster. - } -}; - -void Cluster::initialize(Cluster::ConnectionChain& cc) { - cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); -} - ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "-" << cluster.self; } @@ -69,14 +55,14 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { return out; } -// FIXME aconway 2008-07-02: create a Connection for the cluster. Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(b), cpg(*this), + broker(&b), name(name_), url(url_), self(cpg.self()) { + broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); @@ -90,15 +76,27 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : } Cluster::~Cluster() { - QPID_LOG(trace, *this << " Leaving cluster."); - try { - cpg.leave(name); - cpg.shutdown(); - dispatcher.join(); - } - catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << *this << ": " - << e.what()); + cpg.shutdown(); + dispatcher.join(); +} + +// local connection initializes plugins +void Cluster::initialize(broker::Connection& c) { + bool isLocal = &c.getOutput() != &shadowOut; + if (isLocal) + new ConnectionInterceptor(c, *this); +} + +void Cluster::leave() { + if (!broker.get()) return; // Already left + QPID_LOG(info, QPID_MSG("Leaving cluster " << *this)); + // Must not be called in the dispatch thread. + assert(Thread::current().id() != dispatcher.id()); + cpg.leave(name); + // Wait till final config-change is delivered and broker is released. + { + Mutex::ScopedLock l(lock); + while(broker.get()) lock.wait(); } } @@ -112,22 +110,20 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { buf.putLongLong(value); } -void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) { +void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - // TODO aconway 2008-07-03: More efficient buffer management. + // FIXME aconway 2008-07-03: More efficient buffer management. // Cache coded form of decoded frames for re-encoding? Buffer buf(buffer); - assert(frame.size() + 128 < sizeof(buffer)); + assert(frame.size() + 64 < sizeof(buffer)); frame.encode(buf); encodePtr(buf, connection); - encodePtr(buf, next); iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { - AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())); - send(frame, 0, 0); + send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0); } size_t Cluster::size() const { @@ -143,19 +139,17 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -boost::shared_ptr<broker::Connection> -Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) { - // FIXME aconway 2008-07-02: locking - called by deliver in - // cluster thread so no locks but may need to revisit as model - // changes. - ShadowConnectionId id(member, connectionPtr); - boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id]; - if (!ptr) { +ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) { + ShadowConnectionId id(member, remotePtr); + ShadowConnectionMap::iterator i = shadowConnectionMap.find(id); + if (i == shadowConnectionMap.end()) { // A new shadow connection. std::ostringstream os; - os << name << ":" << member << ":" << std::hex << connectionPtr; - ptr.reset(new broker::Connection(&shadowOut, broker, os.str())); + os << name << ":" << member << ":" << remotePtr; + broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str()); + ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id)); + i = shadowConnectionMap.insert(value).first; } - return ptr; + return i->second; } void Cluster::deliver( @@ -171,78 +165,75 @@ void Cluster::deliver( Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; frame.decode(buf); - void* connectionId; - decodePtr(buf, connectionId); + ConnectionInterceptor* connection; + decodePtr(buf, connection); + QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame); - - if (connectionId == 0) // A cluster control frame. - handleClusterFrame(from, frame); - else if (from == self) { // My own frame, carries a next pointer. - FrameHandler* next; - decodePtr(buf, next); - next->handle(frame); - } - else { // Foreign frame, forward to shadow connection. - // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr. - boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId); - shadow->received(frame); + if (!broker.get()) { + QPID_LOG(warning, "Ignoring late DLVR, already left the cluster."); + return; } + + if (connection && from != self) // Look up shadow for remote connections + connection = getShadowConnection(from, connection); + + if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) + handleMethod(from, connection, *frame.getMethod()); + else + connection->deliver(frame); } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(error, "Error handling frame from cluster " << e.what()); + QPID_LOG(critical, "Error in cluster delivery: " << e.what()); + assert(0); + throw; } } -bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, - Duration timeout) const -{ - AbsTime deadline(now(), timeout); - Mutex::ScopedLock l(lock); - while (!predicate(*this) && lock.wait(deadline)) - ; - return (predicate(*this)); -} - -// Handle cluster control frame . -void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { - // TODO aconway 2007-06-20: use visitor pattern here. - ClusterNotifyBody* notifyIn= - dynamic_cast<ClusterNotifyBody*>(frame.getBody()); - assert(notifyIn); - MemberList list; - { - Mutex::ScopedLock l(lock); - members[from].url=notifyIn->getUrl(); - lock.notifyAll(); - QPID_LOG(debug, "Cluster join: " << members); +// Handle cluster methods +// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism. +void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) { + assert(method.amqpClassId() == CLUSTER_CLASS_ID); + switch (method.amqpMethodId()) { + case CLUSTER_NOTIFY_METHOD_ID: { + ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method); + Mutex::ScopedLock l(lock); + members[from].url=notify.getUrl(); + lock.notifyAll(); + break; + } + case CLUSTER_CONNECTION_CLOSE_METHOD_ID: { + if (!connection->isLocal()) + shadowConnectionMap.erase(connection->getShadowId()); + connection->deliverClosed(); + break; + } + default: + assert(0); } } void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, - cpg_address */*current*/, int /*nCurrent*/, + cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int nJoined) { - bool newMembers=false; - MemberList updated; - { - Mutex::ScopedLock l(lock); - if (nLeft) { - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - QPID_LOG(debug, "Cluster leave: " << members); - lock.notifyAll(); - } - newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); - // We don't record members joining here, we record them when - // we get their ClusterNotify message. + Mutex::ScopedLock l(lock); + for (int i = 0; i < nLeft; ++i) + members.erase(left[i]); + for(int j = 0; j < nCurrent; ++j) + members[current[j]].id = current[j]; + QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):" + << members); + assert(members.size() == size_t(nCurrent)); + if (members.find(self) == members.end()) { + QPID_LOG(debug, "Left cluster " << *this); + broker = 0; // Release broker reference. } - if (newMembers) // Notify new members of my presence. - notify(); + + lock.notifyAll(); // Threads waiting for membership changes. } void Cluster::run() { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 84b5ed072c..7147b1ac05 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -22,14 +22,14 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" -#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/log/Logger.h" #include "qpid/Url.h" - +#include "qpid/RefCounted.h" #include <boost/optional.hpp> #include <boost/function.hpp> @@ -41,19 +41,21 @@ namespace qpid { namespace cluster { +class ConnectionInterceptor; + /** * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : private sys::Runnable, private Cpg::Handler +class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted { public: - typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; /** Details of a cluster member */ struct Member { - Member(const Url& url_=Url()) : url(url_) {} - Url url; ///< Broker address. + Cpg::Id id; + Url url; }; typedef std::vector<Member> MemberList; @@ -65,11 +67,11 @@ class Cluster : private sys::Runnable, private Cpg::Handler */ Cluster(const std::string& name, const Url& url, broker::Broker&); - // Add cluster handlers to broker chains. - void initialize(ConnectionChain&); - virtual ~Cluster(); + /** Initialize interceptors for a new connection */ + void initialize(broker::Connection&); + /** Get the current cluster membership. */ MemberList getMembers() const; @@ -78,22 +80,22 @@ class Cluster : private sys::Runnable, private Cpg::Handler bool empty() const { return size() == 0; } - /** Wait for predicate(*this) to be true, up to timeout. - *@return True if predicate became true, false if timed out. - *Note the predicate may not be true after wait returns, - *all the caller can say is it was true at some earlier point. - */ - bool wait(boost::function<bool(const Cluster&)> predicate, - sys::Duration timeout=sys::TIME_INFINITE) const; - /** Send frame to the cluster */ - void send(framing::AMQFrame&, void* connection, framing::FrameHandler*); + void send(const framing::AMQFrame&, ConnectionInterceptor*); + + /** Leave the cluster */ + void leave(); + + // Cluster frame handing functions + void notify(const std::string& url); + void connectionClose(); private: typedef Cpg::Id Id; typedef std::map<Id, Member> MemberMap; - typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId; - typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap; + typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; + + boost::function<void()> shutdownNext; void notify(); ///< Notify cluster of my details. @@ -114,19 +116,18 @@ class Cluster : private sys::Runnable, private Cpg::Handler ); void run(); - - void handleClusterFrame(Id from, framing::AMQFrame&); - boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*); + void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); + + ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*); - mutable sys::Monitor lock; - broker::Broker& broker; + mutable sys::Monitor lock; // Protect access to members. Cpg cpg; + boost::intrusive_ptr<broker::Broker> broker; Cpg::Name name; Url url; MemberMap members; sys::Thread dispatcher; - boost::function<void()> callback; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index c4b67de141..a2c66e3790 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -15,8 +15,8 @@ * limitations under the License. * */ -#include <boost/program_options/value_semantic.hpp> +#include "ConnectionInterceptor.h" #include "qpid/broker/Broker.h" @@ -25,61 +25,81 @@ #include "qpid/Options.h" #include "qpid/shared_ptr.h" -#include <boost/optional.hpp> #include <boost/utility/in_place_factory.hpp> - namespace qpid { namespace cluster { using namespace std; -struct ClusterOptions : public Options { +struct ClusterValues { string name; string url; - ClusterOptions() : Options("Cluster Options") { + Url getUrl(uint16_t port) const { + if (url.empty()) return Url::getIpAddressesUrl(port); + return Url(url); + } +}; + +/** Note separating options from values to work around boost version differences. + * Old boost takes a reference to options objects, but new boost makes a copy. + * New boost allows a shared_ptr but that's not compatible with old boost. + */ +struct ClusterOptions : public Options { + ClusterValues& values; + + ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) { addOptions() - ("cluster-name", optValue(name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(url,"URL"), + ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join") + ("cluster-url", optValue(values.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") ; } - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } }; struct ClusterPlugin : public Plugin { - typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain; + ClusterValues values; ClusterOptions options; - boost::optional<Cluster> cluster; + boost::intrusive_ptr<Cluster> cluster; + + ClusterPlugin() : options(values) {} + + Options* getOptions() { return &options; } - template <class Chain> void init(Plugin::Target& t) { - Chain* c = dynamic_cast<Chain*>(&t); - if (c) cluster->initialize(*c); + void init(broker::Broker& b) { + if (values.name.empty()) return; // Only if --cluster-name option was specified. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process."); + cluster = new Cluster(values.name, values.getUrl(b.getPort()), b); + b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); + } + + template <class T> void init(T& t) { + if (cluster) cluster->initialize(t); + } + + template <class T> bool init(Plugin::Target& target) { + T* t = dynamic_cast<T*>(&target); + if (t) init(*t); + return t; } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && !options.name.empty()) { - if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); - cluster = boost::in_place(options.name, - options.getUrl(broker->getPort()), - boost::ref(*broker)); - return; - } - if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. - init<ConnectionChain>(target); + if (init<broker::Broker>(target)) return; + if (!cluster) return; // Remaining plugins only valid if cluster initialized. + if (init<broker::Connection>(target)) return; } + + void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. + +// For test purposes. +boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.cpp b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp new file mode 100644 index 0000000000..5283ba9b1a --- /dev/null +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionInterceptor.h" +#include "qpid/framing/ClusterConnectionCloseBody.h" +#include "qpid/framing/AMQFrame.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; } + +ConnectionInterceptor::ConnectionInterceptor( + broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_) + : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_) +{ + connection->addFinalizer(boost::bind(operator delete, this)); + // Attach my functions to Connection extension points. + shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1)); + shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this)); +} + +ConnectionInterceptor::~ConnectionInterceptor() { + assert(isClosed); + assert(connection == 0); +} + +void ConnectionInterceptor::received(framing::AMQFrame& f) { + if (isClosed) return; + cluster.send(f, this); +} + +void ConnectionInterceptor::deliver(framing::AMQFrame& f) { + receivedNext(f); +} + +void ConnectionInterceptor::closed() { + if (isClosed) return; + try { + // Called when the local network connection is closed. We still + // need to process any outstanding cluster frames for this + // connection to ensure our sessions are up-to-date. We defer + // closing the Connection object till deliverClosed(), but replace + // its output handler with a null handler since the network output + // handler will be deleted. + // + connection->setOutputHandler(&discardHandler); + cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this); + isClosed = true; + } + catch (const std::exception& e) { + QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); + } +} + +void ConnectionInterceptor::deliverClosed() { + closedNext(); + // Drop reference so connection will be deleted, which in turn + // will delete this via finalizer added in ctor. + connection = 0; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionInterceptor.h b/cpp/src/qpid/cluster/ConnectionInterceptor.h new file mode 100644 index 0000000000..d499acb832 --- /dev/null +++ b/cpp/src/qpid/cluster/ConnectionInterceptor.h @@ -0,0 +1,77 @@ +#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H +#define QPID_CLUSTER_CONNECTIONPLUGIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Cluster.h" +#include "qpid/broker/Connection.h" +#include "qpid/sys/ConnectionOutputHandler.h" + +namespace qpid { +namespace framing { class AMQFrame; } +namespace cluster { + +/** + * Plug-in associated with broker::Connections, both local and shadow. + */ +class ConnectionInterceptor { + public: + ConnectionInterceptor(broker::Connection&, Cluster&, + Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0)); + ~ConnectionInterceptor(); + + // Called on self-delivery + void deliver(framing::AMQFrame& f); + + // Called on self-delivery of my own cluster.connection-close + void deliverClosed(); + + Cluster::ShadowConnectionId getShadowId() const { return shadowId; } + + bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); } + + private: + struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler { + void close() {} + void send(framing::AMQFrame&) {} + void doOutput() {} + void activateOutput() {} + }; + + // Functions to add to Connection extension points. + void received(framing::AMQFrame&); + void closed(); + + boost::function<void(framing::AMQFrame&)> receivedNext; + boost::function<void()> closedNext; + + boost::intrusive_ptr<broker::Connection> connection; + Cluster& cluster; + NullConnectionHandler discardHandler; + bool isClosed; + Cluster::ShadowConnectionId shadowId; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/ + diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 3118e11e57..674781ac06 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -62,7 +62,7 @@ void Cpg::globalConfigChange( cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } -Cpg::Cpg(Handler& h) : handler(h) { +Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); check(cpg_context_set(handle, this), "Cannot set CPG context"); @@ -78,10 +78,10 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - if (handle) { - cpg_context_set(handle, 0); + if (!isShutdown) { + QPID_LOG(debug,"Shutting down CPG"); + isShutdown=true; check(cpg_finalize(handle), "Error in shutdown of CPG"); - handle = 0; } } diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index d3142efcb6..c89bf3e121 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -165,6 +165,7 @@ class Cpg : public Dispatchable { cpg_handle_t handle; Handler& handler; + bool isShutdown; }; std::ostream& operator <<(std::ostream& out, const cpg_name& name); |
