diff options
| author | Alan Conway <aconway@apache.org> | 2010-04-09 20:31:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-04-09 20:31:36 +0000 |
| commit | 0d241bfaf809689f8a5cf022ff9418a5f26c9e7f (patch) | |
| tree | 27590ccb6cabbc9302a35dc342a0389404f9a968 /cpp | |
| parent | 86e70636779edaee5c0a26de5995a707ab2206f9 (diff) | |
| download | qpid-python-0d241bfaf809689f8a5cf022ff9418a5f26c9e7f.tar.gz | |
Rationalize message count and message content options in new API send/receive/benchmark.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932580 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rwxr-xr-x | cpp/src/tests/qpid_cpp_benchmark | 8 | ||||
| -rw-r--r-- | cpp/src/tests/qpid_receive.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/tests/qpid_send.cpp | 68 |
3 files changed, 53 insertions, 25 deletions
diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark index 26bebf0506..e82251078f 100755 --- a/cpp/src/tests/qpid_cpp_benchmark +++ b/cpp/src/tests/qpid_cpp_benchmark @@ -37,7 +37,9 @@ op.add_option("-m", "--messages", default=100000, type="int", metavar="N", op.add_option("--queue-name", default="benchmark", help="base name for queues (default %default)") op.add_option("--send-rate", default=0, metavar="R", - help="send rate limited to R messages/second, 0 means no limit") + help="send rate limited to R messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") def start_receive(queue, opts): return Popen(["qpid_receive", @@ -53,9 +55,9 @@ def start_send(queue, opts): return Popen(["qpid_send", "-b", opts.broker, "-a", queue, - "--count", str(opts.messages), + "--messages", str(opts.messages), "--send-eos", str(opts.receivers), - "--content", "benchmark", + "--content-size", str(opts.content_size), "--rate", str(opts.send_rate), "--report-total"], stdout=PIPE, stderr=STDOUT) diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp index 46f3db6718..902e855c2b 100644 --- a/cpp/src/tests/qpid_receive.cpp +++ b/cpp/src/tests/qpid_receive.cpp @@ -89,7 +89,7 @@ struct Options : public qpid::Options ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting") ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever") - ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") + ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)") diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index 914f910224..08210da7ee 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -34,13 +34,12 @@ #include <iostream> #include <memory> +using namespace std; using namespace qpid::messaging; using namespace qpid::types; using qpid::client::amqp0_10::FailoverUpdates; typedef std::vector<std::string> string_vector; -using namespace std; - namespace qpid { namespace tests { @@ -50,7 +49,7 @@ struct Options : public qpid::Options std::string url; std::string connectionOptions; std::string address; - uint count; + uint messages; std::string id; std::string replyto; uint sendEos; @@ -60,7 +59,9 @@ struct Options : public qpid::Options std::string correlationid; string_vector properties; string_vector entries; - std::string content; + std::string contentString; + uint contentSize; + bool contentStdin; uint tx; uint rollbackFrequency; uint capacity; @@ -74,10 +75,13 @@ struct Options : public qpid::Options : qpid::Options("Options"), help(false), url("amqp:tcp:127.0.0.1"), - count(0), + messages(1), sendEos(0), durable(false), ttl(0), + contentString(), + contentSize(0), + contentStdin(false), tx(0), rollbackFrequency(0), capacity(1000), @@ -91,17 +95,19 @@ struct Options : public qpid::Options ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from") ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") - ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables") + ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit") ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address") ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") - ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") - ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin") + ("content-string", qpid::optValue(contentString, "CONTENT"), "use CONTENT as message content") + ("content-size", qpid::optValue(contentSize, "N"), "create an N-byte message content") + ("content-map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content") + ("content-stdin", qpid::optValue(contentStdin), "read message content from stdin, one line per message") ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue") ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)") ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") @@ -191,26 +197,42 @@ using namespace qpid::tests; class ContentGenerator { public: virtual ~ContentGenerator() {} - virtual bool getContent(std::string& content) = 0; + virtual bool setContent(Message& msg) = 0; }; class GetlineContentGenerator : public ContentGenerator { public: - virtual bool getContent(std::string& content) { return getline(std::cin, content); } + virtual bool setContent(Message& msg) { + string content; + bool got = getline(std::cin, content); + if (got) msg.setContent(content); + return got; + } }; class FixedContentGenerator : public ContentGenerator { public: - FixedContentGenerator(std::string s) : content(s) {} - virtual bool getContent(std::string& contentOut) { - contentOut = content; + FixedContentGenerator(const string& s) : content(s) {} + virtual bool setContent(Message& msg) { + msg.setContent(content); return true; } private: std::string content; }; - +class MapContentGenerator : public ContentGenerator { + public: + MapContentGenerator(const Options& opt) : opts(opt) {} + virtual bool setContent(Message& msg) { + Variant::Map map; + opts.setEntries(map); + encode(map, msg); + return true; + } + private: + const Options& opts; +}; int main(int argc, char ** argv) { @@ -232,23 +254,27 @@ int main(int argc, char ** argv) if (!opts.userid.empty()) msg.setUserId(opts.userid); if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); opts.setProperties(msg); - std::string content; uint sent = 0; uint txCount = 0; Reporter<Throughput> reporter(std::cout, opts.reportEvery); std::auto_ptr<ContentGenerator> contentGen; - if (!opts.content.empty()) - contentGen.reset(new FixedContentGenerator(opts.content)); - else + if (opts.contentStdin) { + opts.messages = 0; // Don't limit # messages sent. contentGen.reset(new GetlineContentGenerator); + } + else if (opts.entries.size() > 0) + contentGen.reset(new MapContentGenerator(opts)); + else if (opts.contentSize > 0) + contentGen.reset(new FixedContentGenerator(string(opts.contentSize, 'X'))); + else + contentGen.reset(new FixedContentGenerator(opts.contentString)); qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate; - while (contentGen->getContent(content)) { - msg.setContent(content); + while (contentGen->setContent(msg)) { msg.getProperties()["sn"] = ++sent; msg.getProperties()["ts"] = int64_t( qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); @@ -261,7 +287,7 @@ int main(int argc, char ** argv) else session.commit(); } - if (opts.count && sent >= opts.count) break; + if (opts.messages && sent >= opts.messages) break; if (opts.rate) { qpid::sys::AbsTime waitTill(start, sent*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); |
