diff options
| author | Alan Conway <aconway@apache.org> | 2008-12-11 20:33:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-12-11 20:33:26 +0000 |
| commit | cc781622299a4de5af2fdde6bfc1e2eb42e1623a (patch) | |
| tree | f2488e6e4006b441694d9d1cfc8f775aab3c850b /cpp | |
| parent | f54d88f90490a2e7eaf93f5e11d788aeeb858390 (diff) | |
| download | qpid-python-cc781622299a4de5af2fdde6bfc1e2eb42e1623a.tar.gz | |
sys/PollableQueue: dispatch in batches, allow put-back.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@725802 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 35 | ||||
| -rw-r--r-- | cpp/xml/cluster.xml | 8 |
4 files changed, 35 insertions, 45 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8d9b5a1864..222aa07548 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,7 +32,6 @@ #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterDumpOfferBody.h" -#include "qpid/framing/ClusterDumpStartBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" @@ -79,7 +78,6 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); } - void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -167,14 +165,16 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con void Cluster::mcast(const Event& e) { mcastQueue.push(e); } -bool Cluster::sendMcast(const Event& e) { +void Cluster::sendMcast(PollableEventQueue::Queue& values) { try { - return e.mcast(cpg); + PollableEventQueue::Queue::iterator i = values.begin(); + while (i != values.end() && i->mcast(cpg)) + ++i; + values.erase(values.begin(), i); } catch (const std::exception& e) { QPID_LOG(critical, "Multicast failure: " << e.what()); leave(); - return false; } } @@ -241,23 +241,23 @@ void Cluster::deliver(const Event& e, Lock&) { } // Entry point: called when deliverQueue has events to process. -bool Cluster::delivered(const Event& e) { +void Cluster::delivered(PollableEventQueue::Queue& events) { try { - Lock l(lock); - delivered(e,l); + for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); + events.clear(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); leave(); } - return true; } -void Cluster::delivered(const Event& e, Lock& l) { +void Cluster::deliveredEvent(const Event& e) { Buffer buf(e); AMQFrame frame; if (e.isCluster()) { while (frame.decode(buf)) { QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); + Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big. ClusterDispatcher dispatch(*this, e.getMemberId(), l); if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -428,7 +428,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& if (dumper == myId) { assert(state == OFFER); if (url) { // My offer was first. - dumpStart(myId, dumpee, url->str(), l); + dumpStart(dumpee, *url, l); } else { // Another offer was first. state = READY; @@ -448,13 +448,11 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& // FIXME aconway 2008-10-15: no longer need a separate control now // that the dump control is in the deliver queue. -void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) { +void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { if (state == LEFT) return; - MemberId dumpee(dumpeeInt); - Url url(urlStr); assert(state == OFFER); state = DUMPER; - QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr); + QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); deliverQueue.stop(); if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index e172a0f180..feeb68fd4b 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -115,7 +115,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(Lock&); std::vector<Url> getUrls(Lock&) const; - bool sendMcast(const Event& e); + void sendMcast(PollableEventQueue::Queue& ); // Called via CPG, deliverQueue or DumpClient threads. void tryMakeOffer(const MemberId&, Lock&); @@ -128,12 +128,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { // void dumpRequest(const MemberId&, const std::string&, Lock&); void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); - void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); - bool delivered(const Event&); // deliverQueue callback - void delivered(const Event&, Lock&); // unlocked version + void delivered(PollableEventQueue::Queue&); // deliverQueue callback + void deliveredEvent(const Event&); + + void dumpStart(const MemberId& dumpee, const Url& url, Lock&); // CPG callbacks, called in CPG IO thread. void dispatch(sys::DispatchHandle&); // Dispatch CPG events. diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 953d198fb0..7f11cc35a9 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -44,13 +44,13 @@ class Poller; template <class T> class PollableQueue { public: + typedef std::deque<T> Queue; + /** - * Callback to process an item from the queue. - * - * @return If true the item is removed from the queue else it - * remains on the queue and the queue is stopped. + * Callback to process a batch of items from the queue. + * @param values to process, any items remaining after call are put back on the queue. */ - typedef boost::function<bool (const T&)> Callback; + typedef boost::function<void (Queue& values)> Callback; /** When the queue is selected by the poller, values are passed to callback cb. */ PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); @@ -73,7 +73,6 @@ class PollableQueue { bool empty() { ScopedLock l(lock); return queue.empty(); } private: - typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -84,7 +83,7 @@ class PollableQueue { boost::shared_ptr<sys::Poller> poller; PollableCondition condition; DispatchHandle handle; - Queue queue; + Queue queue, batch; Thread dispatcher; bool stopped; }; @@ -117,21 +116,19 @@ template <class T> void PollableQueue<T>::push(const T& t) { } template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Prevent concurrent push - assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + ScopedLock l(lock); + assert(dispatcher.id() == 0); dispatcher = Thread::current(); while (!stopped && !queue.empty()) { - T value = queue.front(); - queue.pop_front(); - bool ok = false; - { // unlock to allow concurrent push or call to stop() in callback. - ScopedUnlock u(lock); - // FIXME aconway 2008-12-02: not exception safe if callback throws. - ok = callback(value); + assert(batch.empty()); + batch.swap(queue); + { + ScopedUnlock u(lock); // Allow concurrent push to queue. + callback(batch); } - if (!ok) { // callback cannot process value, put it back. - queue.push_front(value); - stopped=true; + if (!batch.empty()) { + queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items. + batch.clear(); } } dispatcher = Thread(); diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index b76ae538e3..19d9f7ea56 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -36,13 +36,7 @@ <field name="cluster-id" type="uuid"/> </control> - <control name = "dump-start" code="0x3" label="Used internally by dumper to mark stall point."> - <field name="dumpee" type="uint64"/> - <field name="url" type="str16"/> - </control> - - - <control name="ready" code="0x10" label="New member is ready."> +Min <control name="ready" code="0x10" label="New member is ready."> <field name="url" type="str16"/> </control> |
