diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 132 |
1 files changed, 46 insertions, 86 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index c441686def..7fb2e5ad58 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -26,9 +26,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/framing/ClusterDumpErrorBody.h" -#include "qpid/framing/ClusterMapBody.h" +#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -56,11 +54,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } - void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); } - void ready(const std::string& u) { cluster.ready(member, u); } - virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { - cluster.mapInit(members, dumpees, dumps); - } + void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); } }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -76,7 +70,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::disconnect, this, _1) // disconnect ), connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), - state(DISCARD) + state(START) { QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); @@ -127,7 +121,7 @@ void Cluster::mcastEvent(const Event& e) { size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return map.memberCount(); + return map.size(); } std::vector<Url> Cluster::getUrls() const { @@ -229,7 +223,7 @@ void Cluster::configChange( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int nJoined) { // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); @@ -240,19 +234,17 @@ void Cluster::configChange( broker.shutdown(); } Mutex::ScopedLock l(lock); - if (state == DISCARD) { - if (nCurrent == 1 && *current == self) { - QPID_LOG(notice, self << " first in cluster."); - map.ready(self, url); - ready(); // First in cluster. - } - else if (find(joined, joined+nJoined, self) != joined+nJoined) { - QPID_LOG(notice, self << " requesting state dump."); // Just joined - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - } + map.configChange(current, nCurrent); + if (state == START && nCurrent == 1) { // First in cluster + assert(*current == self); + assert(map.empty()); + QPID_LOG(notice, self << " first in cluster."); + map.insert(self, url); + ready(); + } + else if (nJoined && self == map.first()) { // Send an update to new members. + mcastControl(map.toControl(), 0); } - for (int i = 0; i < nLeft; ++i) - map.leave(left[i]); } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -268,33 +260,42 @@ void Cluster::disconnect(sys::DispatchHandle& ) { broker.shutdown(); } -// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place. -// Only one at a time to simplify things? -void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { +void Cluster::update(const FieldTable& members, bool dumping) { Mutex::ScopedLock l(lock); - Url url(urlStr); - if (self == m) { - switch (state) { - case DISCARD: state = CATCHUP; stall(); break; - case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map. - default: assert(0); - }; - } - else if (self == map.dumpRequest(m, url)) { - assert(state == READY); - QPID_LOG(info, self << " dumping to " << url); - // state = DUMPING; - // stall(); - // FIXME aconway 2008-09-15: need to stall map? - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump. + map.update(members, dumping); + QPID_LOG(info, "Cluster update:\n " << map); + if (state == START && dumping == false) { + state = DISCARD; + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); } } -void Cluster::ready(const MemberId& m, const string& urlStr) { +void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { Mutex::ScopedLock l(lock); - Url url(urlStr); - map.ready(m, url); + bool wasDumping = map.isDumping(); + map.setDumping(true); + if (!wasDumping) { + if (self == m) { // My turn + assert(state == DISCARD); + // FIXME aconway 2008-09-15: RECEIVE DUMP + // state = CATCHUP; + // stall(); + // When received + map.insert(self, url); + mcastControl(map.toControl(), 0); + ready(); + } + else if (state == READY && self == map.first()) { // Give the dump. + QPID_LOG(info, self << " dumping to " << url); + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + // state = DUMPING; + // stall(); + (void)urlStr; + // When dump complete: + map.setDumping(false); + mcastControl(map.toControl(), 0); + } + } } broker::Broker& Cluster::getBroker(){ return broker; } @@ -314,7 +315,6 @@ void Cluster::ready() { // Called with lock held QPID_LOG(info, self << " ready with URL " << url); state = READY; - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); connectionEventQueue.start(poller); // FIXME aconway 2008-09-15: stall/unstall map? } @@ -331,45 +331,5 @@ void Cluster::shutdown() { delete this; } -/** Received from cluster */ -void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) { - QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee); - Mutex::ScopedLock l(lock); - map.dumpError(dumpee); - if (state == DUMPING && map.dumps(self) == 0) - ready(); -} - -/** Called in local dump thread */ -void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) { - assert(state == DUMPING); - QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg); - mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0); - Mutex::ScopedLock l(lock); - map.dumpError(dumpee); - if (map.dumps(self) == 0) // Unstall immediately. - ready(); -} - -void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { - Mutex::ScopedLock l(lock); - // FIXME aconway 2008-09-15: faking out dump here. - switch (state) { - case DISCARD: - map.init(members, dumpees, dumps); - state = HAVE_DUMP; - break; - case CATCHUP: - map.init(members, dumpees, dumps); - ready(); - break; - default: - break; - } -} - -void Cluster::dumpTo(const Url& ) { - // FIXME aconway 2008-09-12: DumpClient -} }} // namespace qpid::cluster |
