diff options
| author | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
| commit | 6884ded02594404ac07a590b0677738baf851672 (patch) | |
| tree | 12b5680470befb344bb0f490fb47c512d5043eb3 /cpp/src/qpid/cluster/Cluster.h | |
| parent | e72b261cb7f9c24cff62cd256c2aab4ce56e4a46 (diff) | |
| download | qpid-python-6884ded02594404ac07a590b0677738baf851672.tar.gz | |
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x thruput improvement :)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685317 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 6f5e6d9cfb..1c43bdac43 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -97,11 +97,13 @@ class Cluster : private Cpg::Handler, public RefCounted typedef std::map<Id, Member> MemberMap; typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap; - struct DeliveredFrame { + /** Message sent over the cluster. */ + struct Message { framing::AMQFrame frame; Id from; void* connection; - DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c) + Message(const framing::AMQFrame& f, const Id i, void* c) : frame(f), from(i), connection(c) {} }; + typedef PollableQueue<Message> MessageQueue; boost::function<void()> shutdownNext; @@ -126,10 +128,17 @@ class Cluster : private Cpg::Handler, public RefCounted ); /** Callback to handle delivered frames from the deliverQueue. */ - void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin, - const PollableQueue<DeliveredFrame>::iterator& end); + void deliverQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end); + /** Callback to multi-cast frames from mcastQueue */ + void mcastQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end); + + + /** Callback to dispatch CPG events. */ void dispatch(sys::DispatchHandle&); + /** Callback if CPG fd is disconnected. */ void disconnect(sys::DispatchHandle&); void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method); @@ -147,7 +156,8 @@ class Cluster : private Cpg::Handler, public RefCounted ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue<DeliveredFrame> deliverQueue; + MessageQueue deliverQueue; + MessageQueue mcastQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); |
