diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 62 |
1 files changed, 30 insertions, 32 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index feeb68fd4b..f962f4c72f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -26,6 +26,7 @@ #include "ConnectionMap.h" #include "FailoverExchange.h" #include "Quorum.h" +#include "Multicaster.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" @@ -55,8 +56,10 @@ namespace cluster { class Connection; /** - * Connection to the cluster. + * Connection to the cluster * + * Threading notes: 3 thread categories: connection, deliver, dump. + * */ class Cluster : private Cpg::Handler, public management::Manageable { public: @@ -70,29 +73,26 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual ~Cluster(); - // Connection map + // Connection map - called in connection threads. void insert(const ConnectionPtr&); void erase(ConnectionId); - // Send to the cluster - void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id); - void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); - - // URLs of current cluster members. + // URLs of current cluster members - called in connection threads. std::vector<Url> getUrls() const; boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } - // Leave the cluster + // Leave the cluster - called in any thread. void leave(); - // Dump completedJo + // Dump completed - called in dump thread void dumpInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; + Multicaster& getMulticast() { return mcast; } boost::function<bool ()> isQuorate; - void checkQuorum(); + void checkQuorum(); // called in connection threads. size_t getReadMax() { return readMax; } @@ -109,22 +109,17 @@ class Cluster : private Cpg::Handler, public management::Manageable { // The parameter makes it hard to forget since you have to have an instance of // a Lock to call the unlocked functions. - void mcastControl(const framing::AMQBody& controlBody); - void mcast(const Event& e); - void leave(Lock&); std::vector<Url> getUrls(Lock&) const; - void sendMcast(PollableEventQueue::Queue& ); - - // Called via CPG, deliverQueue or DumpClient threads. + // Make an offer if we can - called in deliver thread. void tryMakeOffer(const MemberId&, Lock&); // Called in main thread in ~Broker. void brokerShutdown(); // Cluster controls implement XML methods from cluster.xml. - // May be called in CPG thread via deliver() OR in deliverQueue thread. + // Called in deliver thread. // void dumpRequest(const MemberId&, const std::string&, Lock&); void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); @@ -134,6 +129,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void delivered(PollableEventQueue::Queue&); // deliverQueue callback void deliveredEvent(const Event&); + // Helper, called in deliver thread. void dumpStart(const MemberId& dumpee, const Url& url, Lock&); // CPG callbacks, called in CPG IO thread. @@ -177,25 +173,31 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setClusterId(const framing::Uuid&); - mutable sys::Monitor lock; - + // Immutable members set on construction, never changed. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; const std::string name; const Url myUrl; const MemberId myId; - - ConnectionMap connections; + const size_t readMax; + framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableEventQueue deliverQueue, mcastQueue; - PlainEventQueue mcastStallQueue; - uint32_t mcastId; - framing::Uuid clusterId; + + // Thread safe members + Multicaster mcast; qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + PollableEventQueue deliverQueue; + ConnectionMap connections; + boost::shared_ptr<FailoverExchange> failoverExchange; + Quorum quorum; + + // Remaining members are protected by lock. + mutable sys::Monitor lock; + // Local cluster state, cluster map enum { INIT, ///< Initial state, no CPG messages received. NEWBIE, ///< Sent dump request, waiting for dump offer. @@ -206,17 +208,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { DUMPER, ///< Offer accepted, sending a state dump. LEFT ///< Final state, left the cluster. } state; - ClusterMap map; - sys::Thread dumpThread; - boost::optional<ClusterMap> dumpedMap; - size_t lastSize; bool lastBroker; - boost::shared_ptr<FailoverExchange> failoverExchange; - Quorum quorum; - size_t readMax; + // Dump related + sys::Thread dumpThread; + boost::optional<ClusterMap> dumpedMap; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; |
