summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
committerAlan Conway <aconway@apache.org>2008-12-11 20:33:26 +0000
commitcc781622299a4de5af2fdde6bfc1e2eb42e1623a (patch)
treef2488e6e4006b441694d9d1cfc8f775aab3c850b /cpp
parentf54d88f90490a2e7eaf93f5e11d788aeeb858390 (diff)
downloadqpid-python-cc781622299a4de5af2fdde6bfc1e2eb42e1623a.tar.gz
sys/PollableQueue: dispatch in batches, allow put-back.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
-rw-r--r--cpp/src/qpid/cluster/Cluster.h9
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h35
-rw-r--r--cpp/xml/cluster.xml8
4 files changed, 35 insertions, 45 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d9b5a1864..222aa07548 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,7 +32,6 @@
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterDumpOfferBody.h"
-#include "qpid/framing/ClusterDumpStartBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -79,7 +78,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
- void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -167,14 +165,16 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-bool Cluster::sendMcast(const Event& e) {
+void Cluster::sendMcast(PollableEventQueue::Queue& values) {
try {
- return e.mcast(cpg);
+ PollableEventQueue::Queue::iterator i = values.begin();
+ while (i != values.end() && i->mcast(cpg))
+ ++i;
+ values.erase(values.begin(), i);
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast failure: " << e.what());
leave();
- return false;
}
}
@@ -241,23 +241,23 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverQueue has events to process.
-bool Cluster::delivered(const Event& e) {
+void Cluster::delivered(PollableEventQueue::Queue& events) {
try {
- Lock l(lock);
- delivered(e,l);
+ for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+ events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
leave();
}
- return true;
}
-void Cluster::delivered(const Event& e, Lock& l) {
+void Cluster::deliveredEvent(const Event& e) {
Buffer buf(e);
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big.
ClusterDispatcher dispatch(*this, e.getMemberId(), l);
if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -428,7 +428,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
if (dumper == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- dumpStart(myId, dumpee, url->str(), l);
+ dumpStart(dumpee, *url, l);
}
else { // Another offer was first.
state = READY;
@@ -448,13 +448,11 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
// FIXME aconway 2008-10-15: no longer need a separate control now
// that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
+void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
if (state == LEFT) return;
- MemberId dumpee(dumpeeInt);
- Url url(urlStr);
assert(state == OFFER);
state = DUMPER;
- QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
+ QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
deliverQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e172a0f180..feeb68fd4b 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -115,7 +115,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;
- bool sendMcast(const Event& e);
+ void sendMcast(PollableEventQueue::Queue& );
// Called via CPG, deliverQueue or DumpClient threads.
void tryMakeOffer(const MemberId&, Lock&);
@@ -128,12 +128,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
//
void dumpRequest(const MemberId&, const std::string&, Lock&);
void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
- void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- bool delivered(const Event&); // deliverQueue callback
- void delivered(const Event&, Lock&); // unlocked version
+ void delivered(PollableEventQueue::Queue&); // deliverQueue callback
+ void deliveredEvent(const Event&);
+
+ void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
// CPG callbacks, called in CPG IO thread.
void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 953d198fb0..7f11cc35a9 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -44,13 +44,13 @@ class Poller;
template <class T>
class PollableQueue {
public:
+ typedef std::deque<T> Queue;
+
/**
- * Callback to process an item from the queue.
- *
- * @return If true the item is removed from the queue else it
- * remains on the queue and the queue is stopped.
+ * Callback to process a batch of items from the queue.
+ * @param values to process, any items remaining after call are put back on the queue.
*/
- typedef boost::function<bool (const T&)> Callback;
+ typedef boost::function<void (Queue& values)> Callback;
/** When the queue is selected by the poller, values are passed to callback cb. */
PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -73,7 +73,6 @@ class PollableQueue {
bool empty() { ScopedLock l(lock); return queue.empty(); }
private:
- typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
typedef sys::Monitor::ScopedUnlock ScopedUnlock;
@@ -84,7 +83,7 @@ class PollableQueue {
boost::shared_ptr<sys::Poller> poller;
PollableCondition condition;
DispatchHandle handle;
- Queue queue;
+ Queue queue, batch;
Thread dispatcher;
bool stopped;
};
@@ -117,21 +116,19 @@ template <class T> void PollableQueue<T>::push(const T& t) {
}
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Prevent concurrent push
- assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+ ScopedLock l(lock);
+ assert(dispatcher.id() == 0);
dispatcher = Thread::current();
while (!stopped && !queue.empty()) {
- T value = queue.front();
- queue.pop_front();
- bool ok = false;
- { // unlock to allow concurrent push or call to stop() in callback.
- ScopedUnlock u(lock);
- // FIXME aconway 2008-12-02: not exception safe if callback throws.
- ok = callback(value);
+ assert(batch.empty());
+ batch.swap(queue);
+ {
+ ScopedUnlock u(lock); // Allow concurrent push to queue.
+ callback(batch);
}
- if (!ok) { // callback cannot process value, put it back.
- queue.push_front(value);
- stopped=true;
+ if (!batch.empty()) {
+ queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
+ batch.clear();
}
}
dispatcher = Thread();
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index b76ae538e3..19d9f7ea56 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -36,13 +36,7 @@
<field name="cluster-id" type="uuid"/>
</control>
- <control name = "dump-start" code="0x3" label="Used internally by dumper to mark stall point.">
- <field name="dumpee" type="uint64"/>
- <field name="url" type="str16"/>
- </control>
-
-
- <control name="ready" code="0x10" label="New member is ready.">
+Min <control name="ready" code="0x10" label="New member is ready.">
<field name="url" type="str16"/>
</control>