summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp132
1 files changed, 46 insertions, 86 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index c441686def..7fb2e5ad58 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -26,9 +26,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
-#include "qpid/framing/ClusterDumpErrorBody.h"
-#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -56,11 +54,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
- void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); }
- void ready(const std::string& u) { cluster.ready(member, u); }
- virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
- cluster.mapInit(members, dumpees, dumps);
- }
+ void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); }
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -76,7 +70,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
- state(DISCARD)
+ state(START)
{
QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -127,7 +121,7 @@ void Cluster::mcastEvent(const Event& e) {
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return map.memberCount();
+ return map.size();
}
std::vector<Url> Cluster::getUrls() const {
@@ -229,7 +223,7 @@ void Cluster::configChange(
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int nJoined)
{
// FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
@@ -240,19 +234,17 @@ void Cluster::configChange(
broker.shutdown();
}
Mutex::ScopedLock l(lock);
- if (state == DISCARD) {
- if (nCurrent == 1 && *current == self) {
- QPID_LOG(notice, self << " first in cluster.");
- map.ready(self, url);
- ready(); // First in cluster.
- }
- else if (find(joined, joined+nJoined, self) != joined+nJoined) {
- QPID_LOG(notice, self << " requesting state dump."); // Just joined
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
- }
+ map.configChange(current, nCurrent);
+ if (state == START && nCurrent == 1) { // First in cluster
+ assert(*current == self);
+ assert(map.empty());
+ QPID_LOG(notice, self << " first in cluster.");
+ map.insert(self, url);
+ ready();
+ }
+ else if (nJoined && self == map.first()) { // Send an update to new members.
+ mcastControl(map.toControl(), 0);
}
- for (int i = 0; i < nLeft; ++i)
- map.leave(left[i]);
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -268,33 +260,42 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
broker.shutdown();
}
-// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place.
-// Only one at a time to simplify things?
-void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+void Cluster::update(const FieldTable& members, bool dumping) {
Mutex::ScopedLock l(lock);
- Url url(urlStr);
- if (self == m) {
- switch (state) {
- case DISCARD: state = CATCHUP; stall(); break;
- case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map.
- default: assert(0);
- };
- }
- else if (self == map.dumpRequest(m, url)) {
- assert(state == READY);
- QPID_LOG(info, self << " dumping to " << url);
- // state = DUMPING;
- // stall();
- // FIXME aconway 2008-09-15: need to stall map?
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump.
+ map.update(members, dumping);
+ QPID_LOG(info, "Cluster update:\n " << map);
+ if (state == START && dumping == false) {
+ state = DISCARD;
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
}
}
-void Cluster::ready(const MemberId& m, const string& urlStr) {
+void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
Mutex::ScopedLock l(lock);
- Url url(urlStr);
- map.ready(m, url);
+ bool wasDumping = map.isDumping();
+ map.setDumping(true);
+ if (!wasDumping) {
+ if (self == m) { // My turn
+ assert(state == DISCARD);
+ // FIXME aconway 2008-09-15: RECEIVE DUMP
+ // state = CATCHUP;
+ // stall();
+ // When received
+ map.insert(self, url);
+ mcastControl(map.toControl(), 0);
+ ready();
+ }
+ else if (state == READY && self == map.first()) { // Give the dump.
+ QPID_LOG(info, self << " dumping to " << url);
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ // state = DUMPING;
+ // stall();
+ (void)urlStr;
+ // When dump complete:
+ map.setDumping(false);
+ mcastControl(map.toControl(), 0);
+ }
+ }
}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -314,7 +315,6 @@ void Cluster::ready() {
// Called with lock held
QPID_LOG(info, self << " ready with URL " << url);
state = READY;
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
connectionEventQueue.start(poller);
// FIXME aconway 2008-09-15: stall/unstall map?
}
@@ -331,45 +331,5 @@ void Cluster::shutdown() {
delete this;
}
-/** Received from cluster */
-void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) {
- QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee);
- Mutex::ScopedLock l(lock);
- map.dumpError(dumpee);
- if (state == DUMPING && map.dumps(self) == 0)
- ready();
-}
-
-/** Called in local dump thread */
-void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) {
- assert(state == DUMPING);
- QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg);
- mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0);
- Mutex::ScopedLock l(lock);
- map.dumpError(dumpee);
- if (map.dumps(self) == 0) // Unstall immediately.
- ready();
-}
-
-void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
- Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-09-15: faking out dump here.
- switch (state) {
- case DISCARD:
- map.init(members, dumpees, dumps);
- state = HAVE_DUMP;
- break;
- case CATCHUP:
- map.init(members, dumpees, dumps);
- ready();
- break;
- default:
- break;
- }
-}
-
-void Cluster::dumpTo(const Url& ) {
- // FIXME aconway 2008-09-12: DumpClient
-}
}} // namespace qpid::cluster