summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
-rw-r--r--cpp/src/qpid/cluster/Cluster.h62
1 files changed, 30 insertions, 32 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index feeb68fd4b..f962f4c72f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -26,6 +26,7 @@
#include "ConnectionMap.h"
#include "FailoverExchange.h"
#include "Quorum.h"
+#include "Multicaster.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -55,8 +56,10 @@ namespace cluster {
class Connection;
/**
- * Connection to the cluster.
+ * Connection to the cluster
*
+ * Threading notes: 3 thread categories: connection, deliver, dump.
+ *
*/
class Cluster : private Cpg::Handler, public management::Manageable {
public:
@@ -70,29 +73,26 @@ class Cluster : private Cpg::Handler, public management::Manageable {
virtual ~Cluster();
- // Connection map
+ // Connection map - called in connection threads.
void insert(const ConnectionPtr&);
void erase(ConnectionId);
- // Send to the cluster
- void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
- void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
-
- // URLs of current cluster members.
+ // URLs of current cluster members - called in connection threads.
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
- // Leave the cluster
+ // Leave the cluster - called in any thread.
void leave();
- // Dump completedJo
+ // Dump completed - called in dump thread
void dumpInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
+ Multicaster& getMulticast() { return mcast; }
boost::function<bool ()> isQuorate;
- void checkQuorum();
+ void checkQuorum(); // called in connection threads.
size_t getReadMax() { return readMax; }
@@ -109,22 +109,17 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// The parameter makes it hard to forget since you have to have an instance of
// a Lock to call the unlocked functions.
- void mcastControl(const framing::AMQBody& controlBody);
- void mcast(const Event& e);
-
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;
- void sendMcast(PollableEventQueue::Queue& );
-
- // Called via CPG, deliverQueue or DumpClient threads.
+ // Make an offer if we can - called in deliver thread.
void tryMakeOffer(const MemberId&, Lock&);
// Called in main thread in ~Broker.
void brokerShutdown();
// Cluster controls implement XML methods from cluster.xml.
- // May be called in CPG thread via deliver() OR in deliverQueue thread.
+ // Called in deliver thread.
//
void dumpRequest(const MemberId&, const std::string&, Lock&);
void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
@@ -134,6 +129,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void delivered(PollableEventQueue::Queue&); // deliverQueue callback
void deliveredEvent(const Event&);
+ // Helper, called in deliver thread.
void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
// CPG callbacks, called in CPG IO thread.
@@ -177,25 +173,31 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void setClusterId(const framing::Uuid&);
- mutable sys::Monitor lock;
-
+ // Immutable members set on construction, never changed.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
const std::string name;
const Url myUrl;
const MemberId myId;
-
- ConnectionMap connections;
+ const size_t readMax;
+ framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableEventQueue deliverQueue, mcastQueue;
- PlainEventQueue mcastStallQueue;
- uint32_t mcastId;
- framing::Uuid clusterId;
+
+ // Thread safe members
+ Multicaster mcast;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+ PollableEventQueue deliverQueue;
+ ConnectionMap connections;
+ boost::shared_ptr<FailoverExchange> failoverExchange;
+ Quorum quorum;
+
+ // Remaining members are protected by lock.
+ mutable sys::Monitor lock;
+ // Local cluster state, cluster map
enum {
INIT, ///< Initial state, no CPG messages received.
NEWBIE, ///< Sent dump request, waiting for dump offer.
@@ -206,17 +208,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
DUMPER, ///< Offer accepted, sending a state dump.
LEFT ///< Final state, left the cluster.
} state;
-
ClusterMap map;
- sys::Thread dumpThread;
- boost::optional<ClusterMap> dumpedMap;
-
size_t lastSize;
bool lastBroker;
- boost::shared_ptr<FailoverExchange> failoverExchange;
- Quorum quorum;
- size_t readMax;
+ // Dump related
+ sys::Thread dumpThread;
+ boost::optional<ClusterMap> dumpedMap;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;