diff options
Diffstat (limited to 'cpp/src/tests/qpid_send.cpp')
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 79 |
1 files changed, 68 insertions, 11 deletions
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index f828e6077c..1e9711d206 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -25,7 +25,10 @@ #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <qpid/client/amqp0_10/FailoverUpdates.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Monitor.h> #include "TestOptions.h" +#include "Statistics.h" #include <fstream> #include <iostream> @@ -34,7 +37,6 @@ using namespace qpid::messaging; using namespace qpid::types; using qpid::client::amqp0_10::FailoverUpdates; - typedef std::vector<std::string> string_vector; using namespace std; @@ -64,20 +66,26 @@ struct Options : public qpid::Options uint capacity; bool failoverUpdates; qpid::log::Options log; + bool report; + uint reportEvery; + uint rate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), - count(1), + count(0), sendEos(0), durable(false), ttl(0), tx(0), rollbackFrequency(0), - capacity(0), + capacity(1000), failoverUpdates(false), - log(argv0) + log(argv0), + report(false), + reportEvery(0), + rate(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -87,17 +95,20 @@ struct Options : public qpid::Options ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") - ("content", qpid::optValue(content, "CONTENT"), "specify textual content") + ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin") ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") + ("report", qpid::optValue(report), "Report throughput statistics") + ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") + ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -177,6 +188,29 @@ const string EOS("eos"); using namespace qpid::tests; +class ContentGenerator { + public: + virtual bool getContent(std::string& content) = 0; +}; + +class GetlineContentGenerator : public ContentGenerator { + public: + virtual bool getContent(std::string& content) { return getline(std::cin, content); } +}; + +class FixedContentGenerator : public ContentGenerator { + public: + FixedContentGenerator(std::string s) : content(s) {} + virtual bool getContent(std::string& contentOut) { + contentOut = content; + return true; + } + private: + std::string content; +}; + + + int main(int argc, char ** argv) { Options opts; @@ -200,18 +234,41 @@ int main(int argc, char ** argv) std::string content; uint sent = 0; uint txCount = 0; - while (getline(std::cin, content)) { + Reporter<Throughput> reporter(std::cout, opts.reportEvery); + + std::auto_ptr<ContentGenerator> contentGen; + if (!opts.content.empty()) + contentGen.reset(new FixedContentGenerator(opts.content)); + else + contentGen.reset(new GetlineContentGenerator); + + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate; + + while (contentGen->getContent(content)) { msg.setContent(content); msg.getProperties()["sn"] = ++sent; + msg.getProperties()["ts"] = int64_t( + qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now())); sender.send(msg); + reporter.message(msg); if (opts.tx && (sent % opts.tx == 0)) { - if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { + if (opts.rollbackFrequency && + (++txCount % opts.rollbackFrequency == 0)) session.rollback(); - } else { + else session.commit(); - } - } + } + if (opts.count && sent >= opts.count) break; + if (opts.rate) { + qpid::sys::AbsTime waitTill(start, sent*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) + qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } } + if (opts.report) reporter.report(); for (uint i = opts.sendEos; i > 0; --i) { msg.getProperties()["sn"] = ++sent; msg.setContent(EOS);//TODO: add in ability to send digest or similar |