summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_recv.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
committerAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
commitdb10ca2521cff96eae94d11a8acb51e8173aba3c (patch)
tree71142c67ba439e80e8148e6662e7ec4ec58bc695 /cpp/src/tests/qpid_recv.cpp
parenta98f0cfe299d147366c8baa26840b5100b8dc0b9 (diff)
downloadqpid-python-db10ca2521cff96eae94d11a8acb51e8173aba3c.tar.gz
Extend qpid_send, qpid_recv to measure throughput and latency.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid_recv.cpp')
-rw-r--r--cpp/src/tests/qpid_recv.cpp26
1 files changed, 20 insertions, 6 deletions
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 160830c826..ff19464621 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -29,6 +29,7 @@
#include <qpid/log/Options.h>
#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
+#include "Statistics.h"
#include <iostream>
#include <memory>
@@ -56,9 +57,12 @@ struct Options : public qpid::Options
uint ackFrequency;
uint tx;
uint rollbackFrequency;
+ bool printContent;
bool printHeaders;
bool failoverUpdates;
qpid::log::Options log;
+ bool report;
+ uint reportEvery;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -68,13 +72,16 @@ struct Options : public qpid::Options
forever(false),
messages(0),
ignoreDuplicates(false),
- capacity(0),
- ackFrequency(1),
+ capacity(10000),
+ ackFrequency(100),
tx(0),
rollbackFrequency(0),
+ printContent(true),
printHeaders(false),
failoverUpdates(false),
- log(argv0)
+ log(argv0),
+ report(false),
+ reportEvery(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -84,12 +91,15 @@ struct Options : public qpid::Options
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
- ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+ ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content")
+ ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("report", qpid::optValue(report), "Report throughput statistics")
+ ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -162,7 +172,9 @@ int main(int argc, char ** argv)
SequenceTracker sequenceTracker;
Duration timeout = opts.getTimeout();
bool done = false;
+ Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
while (!done && receiver.fetch(msg, timeout)) {
+ reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
done = true;
@@ -179,7 +191,8 @@ int main(int argc, char ** argv)
std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
- std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+ if (opts.printContent)
+ std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) done = true;
}
}
@@ -194,6 +207,7 @@ int main(int argc, char ** argv)
}
//opts.rejectFrequency??
}
+ if (opts.report) reporter.report();
if (opts.tx) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
session.rollback();