From dfd13447e4e26c14ea8c71cd7bdbea886f4f7d4b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 14 Dec 2010 21:30:55 +0000 Subject: Add end-to-end flow control to qpid-send, qpid-receive and qpid-cpp-benchmark. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049286 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/qpid-receive.cpp | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'cpp/src/tests/qpid-receive.cpp') diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index a85d882a0f..9b84306605 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -191,6 +191,9 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + Address replyToAddress; + Sender replyToSender; + while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { @@ -223,12 +226,21 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (msg.getReplyTo()) { // Echo message back to reply-to address. + if (msg.getReplyTo() != replyToAddress) { + replyToSender = session.createSender(msg.getReplyTo()); + replyToSender.setCapacity(opts.capacity); + replyToAddress = msg.getReplyTo(); + } + replyToSender.send(msg); + } if (opts.receiveRate) { qpid::sys::AbsTime waitTill(start, count*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - //opts.rejectFrequency?? + // Clear out message properties & content for next iteration. + msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } if (opts.reportTotal) reporter.report(); if (opts.tx) { -- cgit v1.2.1