summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
-rw-r--r--cpp/src/qpid/cluster/Cluster.h9
2 files changed, 18 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d9b5a1864..222aa07548 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,7 +32,6 @@
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterDumpOfferBody.h"
-#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -79,7 +78,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
- void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -167,14 +165,16 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-bool Cluster::sendMcast(const Event& e) {
+void Cluster::sendMcast(PollableEventQueue::Queue& values) {
try {
- return e.mcast(cpg);
+ PollableEventQueue::Queue::iterator i = values.begin();
+ while (i != values.end() && i->mcast(cpg))
+ ++i;
+ values.erase(values.begin(), i);
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast failure: " << e.what());
leave();
- return false;
}
}
@@ -241,23 +241,23 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverQueue has events to process.
-bool Cluster::delivered(const Event& e) {
+void Cluster::delivered(PollableEventQueue::Queue& events) {
try {
- Lock l(lock);
- delivered(e,l);
+ for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+ events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
leave();
}
- return true;
}
-void Cluster::delivered(const Event& e, Lock& l) {
+void Cluster::deliveredEvent(const Event& e) {
Buffer buf(e);
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big.
ClusterDispatcher dispatch(*this, e.getMemberId(), l);
if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -428,7 +428,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
if (dumper == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- dumpStart(myId, dumpee, url->str(), l);
+ dumpStart(dumpee, *url, l);
}
else { // Another offer was first.
state = READY;
@@ -448,13 +448,11 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
// FIXME aconway 2008-10-15: no longer need a separate control now
// that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
+void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
if (state == LEFT) return;
- MemberId dumpee(dumpeeInt);
- Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
- QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
deliverQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e172a0f180..feeb68fd4b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -115,7 +115,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;
- bool sendMcast(const Event& e);
+ void sendMcast(PollableEventQueue::Queue& );
// Called via CPG, deliverQueue or DumpClient threads.
void tryMakeOffer(const MemberId&, Lock&);
@@ -128,12 +128,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
//
void dumpRequest(const MemberId&, const std::string&, Lock&);
void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
- void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- bool delivered(const Event&); // deliverQueue callback
- void delivered(const Event&, Lock&); // unlocked version
+ void delivered(PollableEventQueue::Queue&); // deliverQueue callback
+ void deliveredEvent(const Event&);
+
+ void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
// CPG callbacks, called in CPG IO thread.
void dispatch(sys::DispatchHandle&); // Dispatch CPG events.