summary | refs | log | tree | commit | diff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2008-09-15 19:39:22 +0000
committer: Alan Conway <aconway@apache.org> 2008-09-15 19:39:22 +0000
commit: e60518c80a7ee6e96719a365d84b777aee59df4f (patch)
tree: ba4d2cc340b6497265df9624fb0385241a03b463 /cpp/src/qpid/cluster
parent: 6099da5735246f255eb62be535a2f462c7d3bab9 (diff)
download: qpid-python-e60518c80a7ee6e96719a365d84b777aee59df4f.tar.gz
Cluster member stalling, cluster map updates and unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@695593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- cpp/src/qpid/cluster/Cluster.cpp 231
-rw-r--r-- cpp/src/qpid/cluster/Cluster.h 40
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.cpp 101
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.h 43
-rw-r--r-- cpp/src/qpid/cluster/Connection.cpp 4
-rw-r--r-- cpp/src/qpid/cluster/DumpClient.cpp 17
-rw-r--r-- cpp/src/qpid/cluster/DumpClient.h 11
-rw-r--r-- cpp/src/qpid/cluster/OutputInterceptor.cpp 3
-rw-r--r-- cpp/src/qpid/cluster/types.h 7
9 files changed, 292 insertions, 165 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9db2a61a82..c441686def 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -25,7 +25,10 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterDumpErrorBody.h"
+#include "qpid/framing/ClusterMapBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,19 +53,14 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
- void ready() { cluster.ready(member); }
-
- void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
- assert(0); // Not passed to cluster, used to start a brain dump over TCP.
- }
-
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
- virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
- // FIXME aconway 2008-09-12: TODO
+ void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
+ void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); }
+ void ready(const std::string& u) { cluster.ready(member, u); }
+ virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ cluster.mapInit(members, dumpees, dumps);
}
-
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -80,17 +78,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
state(DISCARD)
{
- QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
- cpg.join(name);
-
- connectionEventQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
+ cpg.join(name);
+
}
-Cluster::~Cluster() {
- QPID_LOG(debug, "~Cluster()");
-}
+Cluster::~Cluster() {}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -102,60 +97,47 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-// FIXME aconway 2008-09-10: leave is currently not called,
-// It should be called if we are shut down by a cluster admin command.
+// FIXME aconway 2008-09-10: call leave from cluster admin command.
// Any other type of exit is caught in disconnect().
//
void Cluster::leave() {
- QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
+ QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
- // Cluster will shut down in configChange when the cluster knows we've left.
-}
-
-template <class T> void decodePtr(Buffer& buf, T*& ptr) {
- uint64_t value = buf.getLongLong();
- ptr = reinterpret_cast<T*>(value);
-}
-
-template <class T> void encodePtr(Buffer& buf, T* ptr) {
- uint64_t value = reinterpret_cast<uint64_t>(ptr);
- buf.putLongLong(value);
+ // Defer shut down to the final configChange when the group knows we've left.
}
-void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- Event e(CONTROL, connection, frame.size());
+void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+ QPID_LOG(trace, "MCAST [" << self << "]: " << body);
+ AMQFrame f(body);
+ Event e(CONTROL, ConnectionId(self, cptr), f.size());
Buffer buf(e);
- frame.encode(buf);
+ f.encode(buf);
mcastEvent(e);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
Event e(DATA, connection, size);
memcpy(e.getData(), data, size);
mcastEvent(e);
}
void Cluster::mcastEvent(const Event& e) {
- QPID_LOG(trace, "Multicasting: " << e);
e.mcast(name, cpg);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return urls.size();
+ return map.memberCount();
}
std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- std::vector<Url> result(urls.size());
- std::transform(urls.begin(), urls.end(), result.begin(),
- boost::bind(&UrlMap::value_type::second, _1));
- return result;
-}
+ return map.memberUrls();
+}
+// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ Mutex::ScopedLock l(lock);
if (id.getMember() == self)
return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
ConnectionMap::iterator i = connections.find(id);
@@ -180,17 +162,19 @@ void Cluster::deliver(
try {
MemberId from(nodeid, pid);
Event e = Event::delivered(from, msg, msg_len);
- QPID_LOG(trace, "Cluster deliver: " << e);
-
// Process cluster controls immediately
if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
Buffer buf(e);
AMQFrame frame;
- while (frame.decode(buf))
+ while (frame.decode(buf)) {
+ QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
- throw Exception("Invalid cluster control");
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
}
- else { // Process connection controls & data via the connectionEventQueue.
+ else {
+ // Process connection controls & data via the connectionEventQueue
+ // unless we are in the DISCARD state, in which case ignore.
if (state != DISCARD) {
e.setConnection(getConnection(e.getConnectionId()));
connectionEventQueue.push(e);
@@ -227,15 +211,15 @@ ostream& operator<<(ostream& o, const AddrList& a) {
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined "; break;
- case CPG_REASON_LEAVE: reasonString = " left ";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down ";break;
- case CPG_REASON_NODEUP: reasonString = " node-up ";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down ";break;
+ case CPG_REASON_JOIN: reasonString = " joined"; break;
+ case CPG_REASON_LEAVE: reasonString = " left";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
- o << member << reasonString;
+ o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
}
return o;
}
@@ -247,23 +231,28 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
- << AddrList(joined, nJoined) << AddrList(left, nLeft));
-
- if (nJoined) // Notfiy new members of my URL.
- mcastFrame(
- AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
- ConnectionId(self,0));
-
+ // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
+ QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
+ QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
if (find(left, left+nLeft, self) != left+nLeft) {
// We have left the group, this is the final config change.
- QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
- broker.shutdown();
+ QPID_LOG(notice, self << " left cluster " << name.str());
+ broker.shutdown();
}
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
- // Add new members when their URL notice arraives.
- lock.notifyAll(); // Threads waiting for membership changes.
+ if (state == DISCARD) {
+ if (nCurrent == 1 && *current == self) {
+ QPID_LOG(notice, self << " first in cluster.");
+ map.ready(self, url);
+ ready(); // First in cluster.
+ }
+ else if (find(joined, joined+nJoined, self) != joined+nJoined) {
+ QPID_LOG(notice, self << " requesting state dump."); // Just joined
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ }
+ }
+ for (int i = 0; i < nLeft; ++i)
+ map.leave(left[i]);
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -275,24 +264,59 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
// FIXME aconway 2008-09-11: this should be logged as critical,
// when we provide admin option to shut down cluster and let
// members leave cleanly.
- QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
-void Cluster::urlNotice(const MemberId& m, const string& url) {
- //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also.
- //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
- //from DISCARD->STALL and from STALL->READY via map.
+// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place.
+// Only one at a time to simplify things?
+void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ if (self == m) {
+ switch (state) {
+ case DISCARD: state = CATCHUP; stall(); break;
+ case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map.
+ default: assert(0);
+ };
+ }
+ else if (self == map.dumpRequest(m, url)) {
+ assert(state == READY);
+ QPID_LOG(info, self << " dumping to " << url);
+ // state = DUMPING;
+ // stall();
+ // FIXME aconway 2008-09-15: need to stall map?
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump.
+ }
+}
+
+void Cluster::ready(const MemberId& m, const string& urlStr) {
+ Mutex::ScopedLock l(lock);
+ Url url(urlStr);
+ map.ready(m, url);
+}
+
+broker::Broker& Cluster::getBroker(){ return broker; }
+
+void Cluster::stall() {
+ Mutex::ScopedLock l(lock);
+ // Stop processing connection events. We still process config changes
+ // and cluster controls in deliver()
+ connectionEventQueue.stop();
- QPID_LOG(info, "Cluster member " << m << " has URL " << url);
- // My brain dump is up to this point, stall till it is complete.
- if (m == self && state == DISCARD)
- state = STALL;
- urls.insert(UrlMap::value_type(m,Url(url)));
+ // FIXME aconway 2008-09-11: Flow control, we should slow down or
+ // stop reading from local connections while stalled to avoid an
+ // unbounded queue.
}
-void Cluster::ready(const MemberId& ) {
- // FIXME aconway 2008-09-08: TODO
+void Cluster::ready() {
+ // Called with lock held
+ QPID_LOG(info, self << " ready with URL " << url);
+ state = READY;
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ connectionEventQueue.start(poller);
+ // FIXME aconway 2008-09-15: stall/unstall map?
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -301,26 +325,51 @@ void Cluster::ready(const MemberId& ) {
// callbacks will be invoked.
//
void Cluster::shutdown() {
- QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-broker::Broker& Cluster::getBroker(){ return broker; }
+/** Received from cluster */
+void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) {
+ QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (state == DUMPING && map.dumps(self) == 0)
+ ready();
+}
-void Cluster::stall() {
- // Stop processing connection events. We still process config changes
- // and cluster controls in deliver()
+/** Called in local dump thread */
+void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) {
+ assert(state == DUMPING);
+ QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg);
+ mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0);
+ Mutex::ScopedLock l(lock);
+ map.dumpError(dumpee);
+ if (map.dumps(self) == 0) // Unstall immediately.
+ ready();
+}
- // FIXME aconway 2008-09-11: Flow control, we should slow down or
- // stop reading from local connections while stalled to avoid an
- // unbounded queue.
- connectionEventQueue.stop();
+void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2008-09-15: faking out dump here.
+ switch (state) {
+ case DISCARD:
+ map.init(members, dumpees, dumps);
+ state = HAVE_DUMP;
+ break;
+ case CATCHUP:
+ map.init(members, dumpees, dumps);
+ ready();
+ break;
+ default:
+ break;
+ }
}
-void Cluster::unStall() {
- connectionEventQueue.start(poller);
+void Cluster::dumpTo(const Url& ) {
+ // FIXME aconway 2008-09-12: DumpClient
}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 5187cb08e7..24db07b32b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,19 +19,19 @@
*
*/
-#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Event.h"
-#include "qpid/sys/PollableQueue.h"
-#include "qpid/cluster/NoOpConnectionOutputHandler.h"
+#include "Cpg.h"
+#include "Event.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "ClusterMap.h"
#include "qpid/broker/Broker.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
-#include <map>
#include <vector>
namespace qpid {
@@ -68,33 +68,38 @@ class Cluster : private Cpg::Handler
bool empty() const { return size() == 0; }
/** Send to the cluster */
- void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
+ void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcastEvent(const Event& e);
/** Leave the cluster */
void leave();
- void urlNotice(const MemberId&, const std::string& url);
- void ready(const MemberId&);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void dumpError(const MemberId& dumper, const MemberId& dumpee);
+ void ready(const MemberId&, const std::string& url);
+ void mapInit(const framing::FieldTable& members,
+ const framing::FieldTable& dumpees,
+ const framing::FieldTable& dumps);
MemberId getSelf() const { return self; }
void stall();
- void unStall();
+ void ready();
void shutdown();
broker::Broker& getBroker();
private:
- typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
enum State {
- DISCARD, // Initially discard connection events up to my own join message.
- READY, // Normal processing.
- STALL // Stalled while a new member joins.
+ DISCARD, // Discard updates up to catchup point.
+ HAVE_DUMP, // Received state dump, waiting for catchup point.
+ CATCHUP, // Stalled at catchup point, waiting for dump.
+ DUMPING, // Stalled while sending a state dump.
+ READY // Normal processing.
};
void connectionEvent(const Event&);
@@ -126,23 +131,22 @@ class Cluster : private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
+ void dumpTo(const Url&);
+ void dumpError(const MemberId&, const Url&, const char* msg);
+
mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
Url url;
- UrlMap urls;
+ ClusterMap map;
MemberId self;
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
EventQueue connectionEventQueue;
State state;
-
- friend std::ostream& operator <<(std::ostream&, const Cluster&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
- friend std::ostream& operator <<(std::ostream&, const UrlMap&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index b0c45ad625..24c3ed5552 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -24,36 +24,38 @@
#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
+#include <iterator>
+#include <ostream>
namespace qpid {
using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : stalled(false) {}
-MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
- if (isMember(id)) return MemberId(); // Ignore notices from established members.
- if (isDumpee(id)) {
- // Dumpee caught up, graduate to member with new URL and remove dumper from list.
- dumpees.erase(id);
- members[id] = url;
- }
- else if (members.empty()) {
- // First in cluster, congratulations!
- members[id] = url;
+MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url));
+ return MemberId();
}
- else {
- // New member needs brain dump.
- MemberId dumper = nextDumper();
- Dumpee& d = dumpees[id];
- d.url = url;
- d.dumper = dumper;
- return dumper;
+ MemberId dumper = nextDumper();
+ Dumpee& d = dumpees[id];
+ d.url = url;
+ d.dumper = dumper;
+ return dumper;
+}
+
+void ClusterMap::ready(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url));
+ return;
}
- return MemberId();
+ dumpees.erase(id);
+ members[id] = url;
}
+
MemberId ClusterMap::nextDumper() const {
// Choose the first member in member-id order of the group that
// has the least number of dumps-in-progress.
@@ -73,6 +75,11 @@ MemberId ClusterMap::nextDumper() const {
}
void ClusterMap::leave(const MemberId& id) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::leave, this, id));
+ return;
+ }
+
if (isDumpee(id))
dumpees.erase(id);
if (isMember(id)) {
@@ -95,7 +102,13 @@ int ClusterMap::dumps(const MemberId& id) const {
return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
}
-void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); }
+void ClusterMap::dumpError(const MemberId& dumpee) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee));
+ return;
+ }
+ dumpees.erase(dumpee);
+}
framing::ClusterMapBody ClusterMap::toControl() const {
framing::ClusterMapBody b;
@@ -108,15 +121,55 @@ framing::ClusterMapBody ClusterMap::toControl() const {
return b;
}
-void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) {
*this = ClusterMap(); // Reset any current contents.
FieldTable::ValueMap::const_iterator i;
- for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i)
+ for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
members[i->first] = Url(i->second->get<std::string>());
- for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i)
+ for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i)
dumpees[i->first].url = Url(i->second->get<std::string>());
- for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i)
+ for (i = ftDumps.begin(); i != ftDumps.end(); ++i)
dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
}
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+ init(b.getMembers(), b.getDumpees(), b.getDumps());
+}
+
+std::vector<Url> ClusterMap::memberUrls() const {
+ std::vector<Url> result(members.size());
+ std::transform(members.begin(), members.end(), result.begin(),
+ boost::bind(&MemberMap::value_type::second, _1));
+ return result;
+}
+
+void ClusterMap::stall() { stalled = true; }
+
+namespace {
+template <class F> void call(const F& f) { f(); }
+}
+
+void ClusterMap::unstall() {
+ stalled = false;
+ std::for_each(stallq.begin(), stallq.end(),
+ boost::bind(&boost::function<void()>::operator(), _1));
+ stallq.clear();
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) {
+ return o << mv.first << "=" << mv.second;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) {
+ return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
+ std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n ");
+ std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n ");
+ std::copy(m.members.begin(), m.members.end(), im);
+ std::copy(m.dumpees.begin(), m.dumpees.end(), id);
+ return o;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 7695ebeabb..04323c5905 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -25,9 +25,11 @@
#include "types.h"
#include "qpid/framing/ClusterMapBody.h"
#include "qpid/Url.h"
-#include <boost/optional.hpp>
+#include <boost/function.hpp>
#include <vector>
+#include <deque>
#include <map>
+#include <iosfwd>
namespace qpid {
namespace cluster {
@@ -41,21 +43,22 @@ class ClusterMap
{
public:
ClusterMap();
+
+ MemberId dumpRequest(const MemberId& from, const Url& url);
+
+ void dumpError(const MemberId&);
+
+ void ready(const MemberId& from, const Url& url);
- /** Update map for url-notice event.
- *@param from Member that sent the notice.
- *@param url URL for from.
- *@return MemberId of member that should dump to URL, or a null
- * MemberId() if no dump is needed.
- */
- MemberId urlNotice(const MemberId& from, const Url& url);
-
- /** Dump failed notice */
- void dumpFailed(const MemberId&);
-
- /** Update map for leave event */
+ /** Update map for cpg leave event */
void leave(const MemberId&);
+ /** Instead of updating the map, queue the updates for unstall */
+ void stall();
+
+ /** Apply queued updates */
+ void unstall();
+
/** Number of unfinished dumps for member. */
int dumps(const MemberId&) const;
@@ -63,13 +66,20 @@ class ClusterMap
framing::ClusterMapBody toControl() const;
/** Initialize map contents from a cluster control body. */
+ void init(const framing::FieldTable& members,
+ const framing::FieldTable& dumpees,
+ const framing::FieldTable& dumps);
+
void fromControl(const framing::ClusterMapBody&);
size_t memberCount() const { return members.size(); }
size_t dumpeeCount() const { return dumpees.size(); }
+
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); }
+ std::vector<Url> memberUrls() const;
+
private:
struct Dumpee { Url url; MemberId dumper; };
typedef std::map<MemberId, Url> MemberMap;
@@ -80,7 +90,14 @@ class ClusterMap
MemberMap members;
DumpeeMap dumpees;
+ bool stalled;
+ std::deque<boost::function<void()> > stallq;
+
+ friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv);
};
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 00d3901886..6cc21633d3 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -79,7 +79,7 @@ void Connection::closed() {
// handler will be deleted.
//
connection.setOutputHandler(&discardHandler);
- cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
catch (const std::exception& e) {
@@ -93,7 +93,6 @@ void Connection::deliverClose () {
}
size_t Connection::decode(const char* buffer, size_t size) {
- QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size);
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
// FIXME aconway 2008-09-01: deserialize?
@@ -101,7 +100,6 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
void Connection::deliverBuffer(Buffer& buf) {
- QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available());
++deliverSeq;
while (decoder.decode(buf))
deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 5b92552209..f20ceb2ab6 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
@@ -43,7 +44,9 @@ using namespace framing::message;
using namespace client;
-DumpClient::DumpClient(const Url& url) {
+DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f)
+ : donor(b), failed(f)
+{
connection.open(url);
session = connection.newSession();
}
@@ -57,8 +60,7 @@ DumpClient::~DumpClient() {
static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
-void DumpClient::dump(Broker& donor) {
- // TODO aconway 2008-09-08: Caller must handle exceptions
+void DumpClient::dump() {
// FIXME aconway 2008-09-08: send cluster map frame first.
donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
@@ -67,6 +69,15 @@ void DumpClient::dump(Broker& donor) {
session.sync();
}
+void DumpClient::run() {
+ try {
+ dump();
+ } catch (const Exception& e) {
+ failed(e.what());
+ }
+ delete this;
+}
+
void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
session.exchangeDeclare(
ex->getName(), ex->getType(),
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 447fd1abef..1c49b417d7 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -29,6 +29,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -51,12 +52,12 @@ namespace cluster {
/**
* A client that dumps the contents of a local broker to a remote one using AMQP.
*/
-class DumpClient {
+class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& receiver);
+ DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail);
~DumpClient();
-
- void dump(broker::Broker& donor);
+ void dump();
+ void run(); // Will delete this when finished.
private:
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
@@ -67,6 +68,8 @@ class DumpClient {
private:
client::Connection connection;
client::AsyncSession session;
+ broker::Broker& donor;
+ boost::function<void(const char*)> failed;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 82b0d3f077..3212d34775 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -98,8 +98,7 @@ void OutputInterceptor::sendDoOutput() {
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
- framing::ProtocolVersion(), request)), parent.getId());
+ parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), &parent);
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index d62ad62b49..f48ba2db30 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -62,13 +62,6 @@ struct ConnectionId : public std::pair<MemberId, Connection*> {
Connection* getConnectionPtr() const { return second; }
};
-/** State of a cluster member */
-enum State {
- DISCARD, // Initially discard connection events up to my own join message.
- STALL, // All members stall while a new member joins.
- READY // Normal processing.
-};
-
std::ostream& operator<<(std::ostream&, const ConnectionId&);
}} // namespace qpid::cluster