summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
committerAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
commitcc781622299a4de5af2fdde6bfc1e2eb42e1623a (patch)
treef2488e6e4006b441694d9d1cfc8f775aab3c850b /cpp/src/qpid/cluster/Cluster.cpp
parentf54d88f90490a2e7eaf93f5e11d788aeeb858390 (diff)
downloadqpid-python-cc781622299a4de5af2fdde6bfc1e2eb42e1623a.tar.gz
sys/PollableQueue: dispatch in batches, allow put-back.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
1 files changed, 13 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d9b5a1864..222aa07548 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,7 +32,6 @@
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterDumpOfferBody.h"
-#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -79,7 +78,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
- void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -167,14 +165,16 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-bool Cluster::sendMcast(const Event& e) {
+void Cluster::sendMcast(PollableEventQueue::Queue& values) {
try {
- return e.mcast(cpg);
+ PollableEventQueue::Queue::iterator i = values.begin();
+ while (i != values.end() && i->mcast(cpg))
+ ++i;
+ values.erase(values.begin(), i);
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast failure: " << e.what());
leave();
- return false;
}
}
@@ -241,23 +241,23 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverQueue has events to process.
-bool Cluster::delivered(const Event& e) {
+void Cluster::delivered(PollableEventQueue::Queue& events) {
try {
- Lock l(lock);
- delivered(e,l);
+ for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+ events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
leave();
}
- return true;
}
-void Cluster::delivered(const Event& e, Lock& l) {
+void Cluster::deliveredEvent(const Event& e) {
Buffer buf(e);
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big.
ClusterDispatcher dispatch(*this, e.getMemberId(), l);
if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -428,7 +428,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
if (dumper == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- dumpStart(myId, dumpee, url->str(), l);
+ dumpStart(dumpee, *url, l);
}
else { // Another offer was first.
state = READY;
@@ -448,13 +448,11 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
// FIXME aconway 2008-10-15: no longer need a separate control now
// that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
+void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
if (state == LEFT) return;
- MemberId dumpee(dumpeeInt);
- Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
- QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
deliverQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(