diff options
| author | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-08-12 21:03:43 +0000 |
| commit | 6884ded02594404ac07a590b0677738baf851672 (patch) | |
| tree | 12b5680470befb344bb0f490fb47c512d5043eb3 /cpp/src/qpid/cluster/Cluster.cpp | |
| parent | e72b261cb7f9c24cff62cd256c2aab4ce56e4a46 (diff) | |
| download | qpid-python-6884ded02594404ac07a590b0677738baf851672.tar.gz | |
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x thruput improvement :)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@685317 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 52 |
1 files changed, 35 insertions, 17 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 37b126f5a9..84edfa201d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -68,7 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2)) + deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)), + mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2)) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); QPID_LOG(trace, "Joining cluster: " << name_); @@ -83,6 +84,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); deliverQueue.start(poller); + mcastQueue.start(poller); } Cluster::~Cluster() {} @@ -124,12 +126,26 @@ template <class T> void encodePtr(Buffer& buf, T* ptr) { void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. - Buffer buf(buffer); - frame.encode(buf); - encodePtr(buf, connection); - iovec iov = { buffer, buf.getPosition() }; - cpg.mcast(name, &iov, 1); + mcastQueue.push(Message(frame, self, connection)); +} + +void Cluster::mcastQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end) +{ + // Static is OK because there is only one cluster allowed per + // process and only one thread in mcastQueueCb at a time. + static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. + MessageQueue::iterator i = begin; + while (i != end) { + Buffer buf(buffer, sizeof(buffer)); + while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) { + i->frame.encode(buf); + encodePtr(buf, i->connection); + ++i; + } + iovec iov = { buffer, buf.getPosition() }; + cpg.mcast(name, &iov, 1); + } } void Cluster::notify() { @@ -181,12 +197,14 @@ void Cluster::deliver( Id from(nodeid, pid); try { Buffer buf(static_cast<char*>(msg), msg_len); - AMQFrame frame; - if (!frame.decode(buf)) // Not enough data. - throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling. - void* connection; - decodePtr(buf, connection); - deliverQueue.push(DeliveredFrame(frame, from, connection)); + while (buf.available() > 0) { + AMQFrame frame; + if (!frame.decode(buf)) // Not enough data. + throw Exception("Received incomplete cluster event."); + void* connection; + decodePtr(buf, connection); + deliverQueue.push(Message(frame, from, connection)); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -196,10 +214,10 @@ void Cluster::deliver( } } -void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin, - const PollableQueue<DeliveredFrame>::iterator& end) +void Cluster::deliverQueueCb(const MessageQueue::iterator& begin, + const MessageQueue::iterator& end) { - for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) { + for (MessageQueue::iterator i = begin; i != end; ++i) { AMQFrame& frame(i->frame); Id from(i->from); ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection); @@ -220,7 +238,7 @@ void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. - QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what()); + QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what()); assert(0); throw; } |
