diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
| commit | ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch) | |
| tree | d8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/cluster/Cluster.h | |
| parent | 2141967346b884e592a42353ae596d37eb90fe7b (diff) | |
| download | qpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz | |
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions.
client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang.
src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 171 |
1 files changed, 93 insertions, 78 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a8c916a99b..d1cf4b752f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,18 +23,18 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" -#include "JoiningHandler.h" -#include "MemberHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" -#include "qpid/Url.h" +#include "qpid/sys/LockPtr.h" #include "qpid/management/Manageable.h" +#include "qpid/Url.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" #include <boost/intrusive_ptr.hpp> #include <boost/bind.hpp> +#include <boost/optional.hpp> #include <algorithm> #include <vector> @@ -47,12 +47,13 @@ class Connection; /** * Connection to the cluster. - * Keeps cluster membership data. + * */ -class Cluster : private Cpg::Handler, public management::Manageable -{ +class Cluster : private Cpg::Handler, public management::Manageable { public: - + typedef boost::intrusive_ptr<Connection> ConnectionPtr; + typedef std::vector<ConnectionPtr> Connections; + /** * Join a cluster. * @param name of the cluster. @@ -62,58 +63,68 @@ class Cluster : private Cpg::Handler, public management::Manageable virtual ~Cluster(); - // FIXME aconway 2008-09-26: thread safety - void insert(const boost::intrusive_ptr<Connection>&); + // Connection map + void insert(const ConnectionPtr&); void erase(ConnectionId); - void dumpComplete(); - - /** Get the URLs of current cluster members. */ - std::vector<Url> getUrls() const; - - /** Number of members in the cluster. */ - size_t size() const; - - bool empty() const { return size() == 0; } - /** Send to the cluster */ - void mcastControl(const framing::AMQBody& controlBody, Connection* cptr); + // Send to the cluster + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); - void mcastEvent(const Event& e); - - /** Leave the cluster */ - void leave(); + void mcast(const Event& e); - MemberId getSelf() const { return self; } - MemberId getId() const { return self; } + // URLs of current cluster members. + std::vector<Url> getUrls() const; - void ready(); - void stall(); - void unstall(); + // Leave the cluster + void leave(); - void brokerShutdown(); + // Dump completedJo + void dumpInDone(const ClusterMap&); - broker::Broker& getBroker(); + MemberId getId() const; + broker::Broker& getBroker() const; - template <class F> void eachConnection(const F& f) { - for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i) - f(i->second); - } - private: + typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; + typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; + typedef sys::Monitor::ScopedLock Lock; + typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; - typedef sys::PollableQueue<Event> EventQueue; - enum State { - START, // Start state, no cluster update received yet. - DISCARD, // Discard updates up to dump start point. - CATCHUP, // Stalled at catchup point, waiting for dump. - DUMPING, // Stalled while sending a state dump. - READY // Normal processing. - }; - - void connectionEvent(const Event&); - - /** CPG deliver callback. */ - void deliver( + typedef sys::PollableQueue<Event> PollableEventQueue; + typedef std::deque<Event> PlainEventQueue; + + // Unlocked versions of public functions + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&); + void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); + void mcast(const Event& e, Lock&); + void leave(Lock&); + std::vector<Url> getUrls(Lock&) const; + + // Called via CPG, deliverQueue or DumpClient threads. + void tryMakeOffer(const MemberId&, Lock&); + + // Called in CPG, connection IO and DumpClient threads. + void unstall(Lock&); + + // Called in main thread in ~Broker. + void brokerShutdown(); + + // Cluster controls implement XML methods from cluster.xml. + // May be called in CPG thread via deliver() OR in deliverQueue thread. + // + void dumpRequest(const MemberId&, const std::string&, Lock&); + void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&); + void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&); + void ready(const MemberId&, const std::string&, Lock&); + void shutdown(const MemberId&, Lock&); + void process(const Event&); // deliverQueue callback + void process(const Event&, Lock&); // unlocked version + + // CPG callbacks, called in CPG IO thread. + void dispatch(sys::DispatchHandle&); // Dispatch CPG events. + void disconnect(sys::DispatchHandle&); // PG was disconnected + + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, uint32_t /*nodeid*/, @@ -121,8 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable void* /*msg*/, int /*msg_len*/); - /** CPG config change callback */ - void configChange( + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, struct cpg_name */*group*/, struct cpg_address */*members*/, int /*nMembers*/, @@ -130,45 +140,50 @@ class Cluster : private Cpg::Handler, public management::Manageable struct cpg_address */*joined*/, int /*nJoined*/ ); - /** Callback to dispatch CPG events. */ - void dispatch(sys::DispatchHandle&); - /** Callback if CPG fd is disconnected. */ - void disconnect(sys::DispatchHandle&); + boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&); + Connections getConnections(Lock&); - boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); - - virtual qpid::management::ManagementObject* GetManagementObject(void) const; + virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void stopClusterNode(void); - void stopFullCluster(void); - void updateMemberStats(void); + void stopClusterNode(); + void stopFullCluster(); + void updateMemberStats(Lock&); + + // Called in connection IO threads . + void checkDumpIn(Lock&); + + // Called in DumpClient thread. + void dumpOutDone(); + void dumpOutError(const std::exception&); + void dumpOutDone(Lock&); + + mutable sys::Monitor lock; - mutable sys::Monitor lock; // Protect access to members. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; - Cpg::Name name; - Url url; - ClusterMap map; - MemberId self; + const Cpg::Name name; + const Url myUrl; + const MemberId memberId; + ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - EventQueue connectionEventQueue; - State state; - qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + PollableEventQueue deliverQueue; + PlainEventQueue mcastQueue; + uint32_t mcastId; - // Handlers for different states. - ClusterHandler* handler; - JoiningHandler joiningHandler; - MemberHandler memberHandler; + qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle - uint32_t mcastId; - size_t lastSize; + enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state; + ClusterMap map; + sys::Thread dumpThread; + boost::optional<ClusterMap> dumpedMap; + + size_t lastSize; - friend class ClusterHandler; - friend class JoiningHandler; - friend class MemberHandler; + friend std::ostream& operator<<(std::ostream&, const Cluster&); + friend class ClusterDispatcher; }; }} // namespace qpid::cluster |
