summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_send.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
committerAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
commitdb10ca2521cff96eae94d11a8acb51e8173aba3c (patch)
tree71142c67ba439e80e8148e6662e7ec4ec58bc695 /cpp/src/tests/qpid_send.cpp
parenta98f0cfe299d147366c8baa26840b5100b8dc0b9 (diff)
downloadqpid-python-db10ca2521cff96eae94d11a8acb51e8173aba3c.tar.gz
Extend qpid_send, qpid_recv to measure throughput and latency.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid_send.cpp')
-rw-r--r--cpp/src/tests/qpid_send.cpp79
1 files changed, 68 insertions, 11 deletions
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index f828e6077c..1e9711d206 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -25,7 +25,10 @@
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/client/amqp0_10/FailoverUpdates.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Monitor.h>
#include "TestOptions.h"
+#include "Statistics.h"
#include <fstream>
#include <iostream>
@@ -34,7 +37,6 @@
using namespace qpid::messaging;
using namespace qpid::types;
using qpid::client::amqp0_10::FailoverUpdates;
-
typedef std::vector<std::string> string_vector;
using namespace std;
@@ -64,20 +66,26 @@ struct Options : public qpid::Options
uint capacity;
bool failoverUpdates;
qpid::log::Options log;
+ bool report;
+ uint reportEvery;
+ uint rate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- count(1),
+ count(0),
sendEos(0),
durable(false),
ttl(0),
tx(0),
rollbackFrequency(0),
- capacity(0),
+ capacity(1000),
failoverUpdates(false),
- log(argv0)
+ log(argv0),
+ report(false),
+ reportEvery(0),
+ rate(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -87,17 +95,20 @@ struct Options : public qpid::Options
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
- ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin")
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("report", qpid::optValue(report), "Report throughput statistics")
+ ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages")
+ ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -177,6 +188,29 @@ const string EOS("eos");
using namespace qpid::tests;
+class ContentGenerator {
+ public:
+ virtual bool getContent(std::string& content) = 0;
+};
+
+class GetlineContentGenerator : public ContentGenerator {
+ public:
+ virtual bool getContent(std::string& content) { return getline(std::cin, content); }
+};
+
+class FixedContentGenerator : public ContentGenerator {
+ public:
+ FixedContentGenerator(std::string s) : content(s) {}
+ virtual bool getContent(std::string& contentOut) {
+ contentOut = content;
+ return true;
+ }
+ private:
+ std::string content;
+};
+
+
+
int main(int argc, char ** argv)
{
Options opts;
@@ -200,18 +234,41 @@ int main(int argc, char ** argv)
std::string content;
uint sent = 0;
uint txCount = 0;
- while (getline(std::cin, content)) {
+ Reporter<Throughput> reporter(std::cout, opts.reportEvery);
+
+ std::auto_ptr<ContentGenerator> contentGen;
+ if (!opts.content.empty())
+ contentGen.reset(new FixedContentGenerator(opts.content));
+ else
+ contentGen.reset(new GetlineContentGenerator);
+
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate;
+
+ while (contentGen->getContent(content)) {
msg.setContent(content);
msg.getProperties()["sn"] = ++sent;
+ msg.getProperties()["ts"] = int64_t(
+ qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now()));
sender.send(msg);
+ reporter.message(msg);
if (opts.tx && (sent % opts.tx == 0)) {
- if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ if (opts.rollbackFrequency &&
+ (++txCount % opts.rollbackFrequency == 0))
session.rollback();
- } else {
+ else
session.commit();
- }
- }
+ }
+ if (opts.count && sent >= opts.count) break;
+ if (opts.rate) {
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0)
+ qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
}
+ if (opts.report) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {
msg.getProperties()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar