diff options
| author | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-03-11 18:23:46 +0000 |
| commit | ab38a2dd7c26749f9bbeb3b3d0999c41cc146a80 (patch) | |
| tree | d6e45d87b6982305a02b9a2383e48892976694b4 /qpid/cpp/src/tests/qpid_stream.cpp | |
| parent | 87dbb1d56d73cc1238ec3811274da0a8b88b3a5f (diff) | |
| download | qpid-python-ab38a2dd7c26749f9bbeb3b3d0999c41cc146a80.tar.gz | |
QPID-2382: Created separate utility class for handling updates from failover exchange; cleaned up reconnection options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@921971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid_stream.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/qpid_stream.cpp | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/qpid/cpp/src/tests/qpid_stream.cpp b/qpid/cpp/src/tests/qpid_stream.cpp index ca21fa248b..ef0aea52e4 100644 --- a/qpid/cpp/src/tests/qpid_stream.cpp +++ b/qpid/cpp/src/tests/qpid_stream.cpp @@ -40,16 +40,33 @@ struct Args : public qpid::Options { std::string url; std::string address; + uint size; uint rate; bool durable; - - Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + uint receiverCapacity; + uint senderCapacity; + uint ackFrequency; + + Args() : + url("amqp:tcp:127.0.0.1:5672"), + address("test-queue"), + size(512), + rate(1000), + durable(false), + receiverCapacity(0), + senderCapacity(0), + ackFrequency(1) { addOptions() ("url", qpid::optValue(url, "URL"), "Url to connect to.") ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("size", qpid::optValue(size, "bytes"), "Message size in bytes (content only, not headers).") ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") - ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") + ("sender-capacity", qpid::optValue(senderCapacity, "N"), "Credit window (0 implies infinite window)") + ("receiver-capacity", qpid::optValue(receiverCapacity, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), + "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -93,7 +110,8 @@ struct Publish : Client void doWork(Session& session) { Sender sender = session.createSender(opts.address); - Message msg; + if (opts.senderCapacity) sender.setCapacity(opts.senderCapacity); + Message msg(std::string(opts.size, 'X')); uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; qpid::sys::AbsTime start = qpid::sys::now(); @@ -123,9 +141,12 @@ struct Consume : Client double maxLatency = 0; double totalLatency = 0; Receiver receiver = session.createReceiver(opts.address); + if (opts.receiverCapacity) receiver.setCapacity(opts.receiverCapacity); while (receiver.fetch(msg)) { - session.acknowledge();//TODO: add batching option ++received; + if (opts.ackFrequency && (received % opts.ackFrequency == 0)) { + session.acknowledge(); + } //calculate latency uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); |
