diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-09 17:48:15 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-10-09 17:48:15 +0000 |
commit | 7852367e6a3e6510b4dbcaec21f023ead7827fa1 (patch) | |
tree | f80efb27dbfc4b251811af9a65edb71fd740845c /cpp/src | |
parent | 4ff1d4ca6707275d1306d7f94c79b5d066a548ce (diff) | |
download | qpid-python-7852367e6a3e6510b4dbcaec21f023ead7827fa1.tar.gz |
Added options to perftest
- tx_sub & tx_pub so that they can be set independantly
- async tx pub option
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@703214 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/perftest.cpp | 50 |
1 files changed, 36 insertions, 14 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 9096854a6d..923405779c 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -97,7 +97,9 @@ struct Opts : public TestOptions { bool summary; uint32_t intervalSub; uint32_t intervalPub; - size_t tx; + size_t tx_pub; + bool tx_pub_async; + size_t tx_sub; static const std::string helpText; @@ -107,7 +109,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0), tx(0) + intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -143,7 +145,9 @@ struct Opts : public TestOptions { ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") - ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size"); + ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing") + ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async") + ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming"); } // Computed values @@ -453,7 +457,13 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - if (opts.tx) sync(session).txSelect(); + if (opts.tx_pub){ + if (opts.tx_pub_async){ + session.txSelect(); + } else { + sync(session).txSelect(); + } + } SubscriptionManager subs(session); LocalQueue lq; subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); @@ -478,7 +488,13 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } - if (opts.tx && ((i+1) % opts.tx == 0)) sync(session).txCommit(); + if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){ + if (opts.tx_pub_async){ + session.txCommit(); + } else { + sync(session).txCommit(); + } + } if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } if (opts.confirm) session.sync(); @@ -488,7 +504,13 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); - if (opts.tx) sync(session).txCommit(); + if (opts.tx_pub){ + if (opts.tx_pub_async){ + session.txCommit(); + }else{ + sync(session).txCommit(); + } + } } session.close(); } @@ -530,16 +552,16 @@ struct SubscribeThread : public Client { void run() { // Subscribe try { - if (opts.tx) sync(session).txSelect(); + if (opts.tx_sub) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack)); - subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1); + LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack)); + subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - if (opts.tx) sync(session).txCommit(); + if (opts.tx_sub) sync(session).txCommit(); for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -551,7 +573,7 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - if (opts.tx && ((i+1) % opts.tx == 0)) sync(session).txCommit(); + if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit(); if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -568,9 +590,9 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.tx || opts.ack) + if (opts.tx_sub || opts.ack) lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. - if (opts.tx) + if (opts.tx_sub) sync(session).txCommit(); AbsTime end=now(); @@ -578,7 +600,7 @@ struct SubscribeThread : public Client { Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); - if (opts.tx) sync(session).txCommit(); + if (opts.tx_sub) sync(session).txCommit(); } session.close(); } |