summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
committerAlan Conway <aconway@apache.org>2008-10-03 20:56:38 +0000
commitff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch)
treed8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src/qpid/cluster/Cluster.h
parent2141967346b884e592a42353ae596d37eb90fe7b (diff)
downloadqpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions. client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang. src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
-rw-r--r--cpp/src/qpid/cluster/Cluster.h171
1 files changed, 93 insertions, 78 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index a8c916a99b..d1cf4b752f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,18 +23,18 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
-#include "JoiningHandler.h"
-#include "MemberHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/Url.h"
+#include "qpid/sys/LockPtr.h"
#include "qpid/management/Manageable.h"
+#include "qpid/Url.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/optional.hpp>
#include <algorithm>
#include <vector>
@@ -47,12 +47,13 @@ class Connection;
/**
* Connection to the cluster.
- * Keeps cluster membership data.
+ *
*/
-class Cluster : private Cpg::Handler, public management::Manageable
-{
+class Cluster : private Cpg::Handler, public management::Manageable {
public:
-
+ typedef boost::intrusive_ptr<Connection> ConnectionPtr;
+ typedef std::vector<ConnectionPtr> Connections;
+
/**
* Join a cluster.
* @param name of the cluster.
@@ -62,58 +63,68 @@ class Cluster : private Cpg::Handler, public management::Manageable
virtual ~Cluster();
- // FIXME aconway 2008-09-26: thread safety
- void insert(const boost::intrusive_ptr<Connection>&);
+ // Connection map
+ void insert(const ConnectionPtr&);
void erase(ConnectionId);
- void dumpComplete();
-
- /** Get the URLs of current cluster members. */
- std::vector<Url> getUrls() const;
-
- /** Number of members in the cluster. */
- size_t size() const;
-
- bool empty() const { return size() == 0; }
- /** Send to the cluster */
- void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
+ // Send to the cluster
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
- void mcastEvent(const Event& e);
-
- /** Leave the cluster */
- void leave();
+ void mcast(const Event& e);
- MemberId getSelf() const { return self; }
- MemberId getId() const { return self; }
+ // URLs of current cluster members.
+ std::vector<Url> getUrls() const;
- void ready();
- void stall();
- void unstall();
+ // Leave the cluster
+ void leave();
- void brokerShutdown();
+ // Dump completedJo
+ void dumpInDone(const ClusterMap&);
- broker::Broker& getBroker();
+ MemberId getId() const;
+ broker::Broker& getBroker() const;
- template <class F> void eachConnection(const F& f) {
- for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i)
- f(i->second);
- }
-
private:
+ typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
+ typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
+ typedef sys::Monitor::ScopedLock Lock;
+
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
- typedef sys::PollableQueue<Event> EventQueue;
- enum State {
- START, // Start state, no cluster update received yet.
- DISCARD, // Discard updates up to dump start point.
- CATCHUP, // Stalled at catchup point, waiting for dump.
- DUMPING, // Stalled while sending a state dump.
- READY // Normal processing.
- };
-
- void connectionEvent(const Event&);
-
- /** CPG deliver callback. */
- void deliver(
+ typedef sys::PollableQueue<Event> PollableEventQueue;
+ typedef std::deque<Event> PlainEventQueue;
+
+ // Unlocked versions of public functions
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&);
+ void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
+ void mcast(const Event& e, Lock&);
+ void leave(Lock&);
+ std::vector<Url> getUrls(Lock&) const;
+
+ // Called via CPG, deliverQueue or DumpClient threads.
+ void tryMakeOffer(const MemberId&, Lock&);
+
+ // Called in CPG, connection IO and DumpClient threads.
+ void unstall(Lock&);
+
+ // Called in main thread in ~Broker.
+ void brokerShutdown();
+
+ // Cluster controls implement XML methods from cluster.xml.
+ // May be called in CPG thread via deliver() OR in deliverQueue thread.
+ //
+ void dumpRequest(const MemberId&, const std::string&, Lock&);
+ void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&);
+ void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
+ void ready(const MemberId&, const std::string&, Lock&);
+ void shutdown(const MemberId&, Lock&);
+ void process(const Event&); // deliverQueue callback
+ void process(const Event&, Lock&); // unlocked version
+
+ // CPG callbacks, called in CPG IO thread.
+ void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
+ void disconnect(sys::DispatchHandle&); // PG was disconnected
+
+ void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -121,8 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable
void* /*msg*/,
int /*msg_len*/);
- /** CPG config change callback */
- void configChange(
+ void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
@@ -130,45 +140,50 @@ class Cluster : private Cpg::Handler, public management::Manageable
struct cpg_address */*joined*/, int /*nJoined*/
);
- /** Callback to dispatch CPG events. */
- void dispatch(sys::DispatchHandle&);
- /** Callback if CPG fd is disconnected. */
- void disconnect(sys::DispatchHandle&);
+ boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&);
+ Connections getConnections(Lock&);
- boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
-
- virtual qpid::management::ManagementObject* GetManagementObject(void) const;
+ virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
- void stopClusterNode(void);
- void stopFullCluster(void);
- void updateMemberStats(void);
+ void stopClusterNode();
+ void stopFullCluster();
+ void updateMemberStats(Lock&);
+
+ // Called in connection IO threads .
+ void checkDumpIn(Lock&);
+
+ // Called in DumpClient thread.
+ void dumpOutDone();
+ void dumpOutError(const std::exception&);
+ void dumpOutDone(Lock&);
+
+ mutable sys::Monitor lock;
- mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
- Cpg::Name name;
- Url url;
- ClusterMap map;
- MemberId self;
+ const Cpg::Name name;
+ const Url myUrl;
+ const MemberId memberId;
+
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- EventQueue connectionEventQueue;
- State state;
- qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+ PollableEventQueue deliverQueue;
+ PlainEventQueue mcastQueue;
+ uint32_t mcastId;
- // Handlers for different states.
- ClusterHandler* handler;
- JoiningHandler joiningHandler;
- MemberHandler memberHandler;
+ qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
- uint32_t mcastId;
- size_t lastSize;
+ enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state;
+ ClusterMap map;
+ sys::Thread dumpThread;
+ boost::optional<ClusterMap> dumpedMap;
+
+ size_t lastSize;
- friend class ClusterHandler;
- friend class JoiningHandler;
- friend class MemberHandler;
+ friend std::ostream& operator<<(std::ostream&, const Cluster&);
+ friend class ClusterDispatcher;
};
}} // namespace qpid::cluster