diff options
author | Alan Conway <aconway@apache.org> | 2010-04-07 19:51:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-04-07 19:51:20 +0000 |
commit | db10ca2521cff96eae94d11a8acb51e8173aba3c (patch) | |
tree | 71142c67ba439e80e8148e6662e7ec4ec58bc695 /cpp/src/tests/qpid_recv.cpp | |
parent | a98f0cfe299d147366c8baa26840b5100b8dc0b9 (diff) | |
download | qpid-python-db10ca2521cff96eae94d11a8acb51e8173aba3c.tar.gz |
Extend qpid_send, qpid_recv to measure throughput and latency.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid_recv.cpp')
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 160830c826..ff19464621 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -29,6 +29,7 @@ #include <qpid/log/Options.h> #include <qpid/client/amqp0_10/FailoverUpdates.h> #include "TestOptions.h" +#include "Statistics.h" #include <iostream> #include <memory> @@ -56,9 +57,12 @@ struct Options : public qpid::Options uint ackFrequency; uint tx; uint rollbackFrequency; + bool printContent; bool printHeaders; bool failoverUpdates; qpid::log::Options log; + bool report; + uint reportEvery; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -68,13 +72,16 @@ struct Options : public qpid::Options forever(false), messages(0), ignoreDuplicates(false), - capacity(0), - ackFrequency(1), + capacity(10000), + ackFrequency(100), tx(0), rollbackFrequency(0), + printContent(true), printHeaders(false), failoverUpdates(false), - log(argv0) + log(argv0), + report(false), + reportEvery(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -84,12 +91,15 @@ struct Options : public qpid::Options ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") - ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") - ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content") + ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") + ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report", qpid::optValue(report), "Report throughput statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -162,7 +172,9 @@ int main(int argc, char ** argv) SequenceTracker sequenceTracker; Duration timeout = opts.getTimeout(); bool done = false; + Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery); while (!done && receiver.fetch(msg, timeout)) { + reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { done = true; @@ -179,7 +191,8 @@ int main(int argc, char ** argv) std::cout << "Properties: " << msg.getProperties() << std::endl; std::cout << std::endl; } - std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages + if (opts.printContent) + std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages if (opts.messages && count >= opts.messages) done = true; } } @@ -194,6 +207,7 @@ int main(int argc, char ** argv) } //opts.rejectFrequency?? } + if (opts.report) reporter.report(); if (opts.tx) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { session.rollback(); |