summaryrefslogtreecommitdiff
path: root/RC9/qpid/cpp/src/tests/latencytest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'RC9/qpid/cpp/src/tests/latencytest.cpp')
-rw-r--r--RC9/qpid/cpp/src/tests/latencytest.cpp432
1 files changed, 432 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/src/tests/latencytest.cpp b/RC9/qpid/cpp/src/tests/latencytest.cpp
new file mode 100644
index 0000000000..6895964133
--- /dev/null
+++ b/RC9/qpid/cpp/src/tests/latencytest.cpp
@@ -0,0 +1,432 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <algorithm>
+#include <limits>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "TestOptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+typedef std::vector<std::string> StringSet;
+
+struct Args : public qpid::TestOptions {
+ uint size;
+ uint count;
+ uint rate;
+ bool sync;
+ uint reportFrequency;
+ uint timeLimit;
+ uint queues;
+ uint prefetch;
+ uint ack;
+ bool cumulative;
+ bool csv;
+ bool durable;
+ string base;
+
+ Args() : size(256), count(1000), rate(0), reportFrequency(1000),
+ timeLimit(0), queues(1),
+ prefetch(100), ack(0),
+ durable(false), base("latency-test")
+ {
+ addOptions()
+
+ ("size", optValue(size, "N"), "message size")
+ ("queues", optValue(queues, "N"), "number of queues")
+ ("count", optValue(count, "N"), "number of messages to send")
+ ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+ ("sync", optValue(sync), "send messages synchronously")
+ ("report-frequency", optValue(reportFrequency, "N"),
+ "number of milliseconds to wait between reports (ignored unless rate specified)")
+ ("time-limit", optValue(timeLimit, "N"),
+ "test duration, in seconds")
+ ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
+ ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)")
+ ("durable", optValue(durable, "yes|no"), "use durable messages")
+ ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)")
+ ("cumulative", optValue(cumulative), "cumulative stats in csv format")
+ ("queue-base-name", optValue(base, "<name>"), "base name for queues");
+ }
+};
+
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+Args opts;
+double c_min, c_avg, c_max;
+
+uint64_t current_time()
+{
+ Duration t(now());
+ return t;
+}
+
+struct Stats
+{
+ Mutex lock;
+ uint count;
+ double minLatency;
+ double maxLatency;
+ double totalLatency;
+
+ Stats();
+ void update(double l);
+ void print();
+ void reset();
+};
+
+class Client : public Runnable
+{
+protected:
+ Connection connection;
+ AsyncSession session;
+ Thread thread;
+ string queue;
+
+public:
+ Client(const string& q);
+ virtual ~Client() {}
+
+ void start();
+ void join();
+ void run();
+ virtual void test() = 0;
+};
+
+class Receiver : public Client, public MessageListener
+{
+ SubscriptionManager mgr;
+ uint count;
+ Stats& stats;
+
+public:
+ Receiver(const string& queue, Stats& stats);
+ void test();
+ void received(Message& msg);
+ Stats getStats();
+ uint getCount() { return count; }
+ void stop() { mgr.stop(); mgr.cancel(queue); }
+};
+
+
+class Sender : public Client
+{
+ string generateData(uint size);
+ void sendByRate();
+ void sendByCount();
+ Receiver& receiver;
+ const string data;
+public:
+ Sender(const string& queue, Receiver& receiver);
+ void test();
+};
+
+
+class Test
+{
+ const string queue;
+ Stats stats;
+ Receiver receiver;
+ Sender sender;
+ AbsTime begin;
+
+public:
+ Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {}
+ void start();
+ void join();
+ void report();
+};
+
+
+Client::Client(const string& q) : queue(q)
+{
+ opts.open(connection);
+ session = connection.newSession();
+}
+
+void Client::start()
+{
+ thread = Thread(this);
+}
+
+void Client::join()
+{
+ thread.join();
+}
+
+void Client::run()
+{
+ try{
+ test();
+ session.close();
+ connection.close();
+ } catch(const std::exception& e) {
+ std::cout << "Error in receiver: " << e.what() << std::endl;
+ }
+}
+
+Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0), stats(s)
+{
+ session.queueDeclare(arg::queue=queue, arg::durable=opts.durable, arg::autoDelete=true);
+ uint msgCount = session.queueQuery(arg::queue=queue).get().getMessageCount();
+ if (msgCount) {
+ std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
+ session.queuePurge(arg::queue=queue);
+ }
+ SubscriptionSettings settings;
+ if (opts.prefetch) {
+ settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2));
+ settings.flowControl = FlowControl::messageWindow(opts.prefetch);
+ } else {
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
+ }
+ mgr.subscribe(*this, queue, settings);
+}
+
+void Receiver::test()
+{
+ mgr.run();
+ mgr.cancel(queue);
+}
+
+void Receiver::received(Message& msg)
+{
+ ++count;
+ uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
+ //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
+ uint64_t receivedAt = current_time();
+
+ //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl;
+ stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
+
+ if (!opts.rate && count >= opts.count) {
+ mgr.stop();
+ }
+}
+
+void Stats::update(double latency)
+{
+ Mutex::ScopedLock l(lock);
+ count++;
+ minLatency = std::min(minLatency, latency);
+ maxLatency = std::max(maxLatency, latency);
+ totalLatency += latency;
+}
+
+Stats::Stats() : count(0), minLatency(std::numeric_limits<double>::max()), maxLatency(0), totalLatency(0) {}
+
+void Stats::print()
+{
+ static bool already_have_stats = false;
+ uint value;
+
+ if (opts.rate)
+ value = opts.rate;
+ else
+ value = opts.count;
+ Mutex::ScopedLock l(lock);
+ double aux_avg = (totalLatency / count);
+ if (!opts.cumulative) {
+ if (!opts.csv) {
+ std::cout << "Latency(ms): min=" << minLatency << ", max=" <<
+ maxLatency << ", avg=" << aux_avg;
+ } else {
+ std::cout << value << "," << minLatency << "," << maxLatency <<
+ "," << aux_avg;
+ }
+ } else {
+ if (already_have_stats) {
+ c_avg = (c_min + aux_avg) / 2;
+ if (c_min > minLatency) c_min = minLatency;
+ if (c_max < maxLatency) c_max = maxLatency;
+ } else {
+ c_avg = aux_avg;
+ c_min = minLatency;
+ c_max = maxLatency;
+ already_have_stats = true;
+ }
+ std::cout << value << "," << c_min << "," << c_max <<
+ "," << c_avg;
+ }
+
+}
+
+void Stats::reset()
+{
+ Mutex::ScopedLock l(lock);
+ count = 0;
+ totalLatency = maxLatency = 0;
+ minLatency = std::numeric_limits<double>::max();
+}
+
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
+
+void Sender::test()
+{
+ if (opts.rate) sendByRate();
+ else sendByCount();
+}
+
+void Sender::sendByCount()
+{
+ Message msg(data, queue);
+ if (opts.durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+
+ for (uint i = 0; i < opts.count; i++) {
+ uint64_t sentAt(current_time());
+ msg.getDeliveryProperties().setTimestamp(sentAt);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
+ }
+ session.sync();
+}
+
+void Sender::sendByRate()
+{
+ Message msg(data, queue);
+ if (opts.durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+
+ //calculate interval (in micro secs) between messages to achieve desired rate
+ uint64_t interval = (1000*1000)/opts.rate;
+ uint64_t timeLimit(opts.timeLimit * TIME_SEC);
+ uint64_t start(current_time());
+
+ while (true) {
+ uint64_t start_msg(current_time());
+ msg.getDeliveryProperties().setTimestamp(start_msg);
+ async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
+
+ uint64_t now = current_time();
+
+ if (timeLimit != 0 && (now - start) > timeLimit) {
+ session.sync();
+ receiver.stop();
+ break;
+ }
+
+ uint64_t timeTaken = (now - start_msg) / TIME_USEC;
+ if (timeTaken < interval) {
+ qpid::sys::usleep(interval - timeTaken);
+ } else if (timeTaken > interval &&
+ !opts.csv && !opts.cumulative) { // Don't be so verbose in this case, we're piping the results to another program
+ std::cout << "Could not achieve desired rate! (Took " << timeTaken
+ << " microsecs to send message, aiming for " << interval << " microsecs)" << std::endl;
+ }
+ }
+}
+
+string Sender::generateData(uint size)
+{
+ if (size < chars.length()) {
+ return chars.substr(0, size);
+ }
+ std::string data;
+ for (uint i = 0; i < (size / chars.length()); i++) {
+ data += chars;
+ }
+ data += chars.substr(0, size % chars.length());
+ return data;
+}
+
+
+void Test::start()
+{
+ receiver.start();
+ begin = AbsTime(now());
+ sender.start();
+}
+
+void Test::join()
+{
+ sender.join();
+ receiver.join();
+ AbsTime end = now();
+ Duration time(begin, end);
+ double msecs(time / TIME_MSEC);
+ if (!opts.csv) {
+ std::cout << "Sent " << receiver.getCount() << " msgs through " << queue
+ << " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) ";
+ }
+ stats.print();
+ std::cout << std::endl;
+}
+
+void Test::report()
+{
+ stats.print();
+ std::cout << std::endl;
+ stats.reset();
+}
+
+int main(int argc, char** argv)
+{
+ try {
+ opts.parse(argc, argv);
+ if (opts.cumulative)
+ opts.csv = true;
+ boost::ptr_vector<Test> tests(opts.queues);
+ for (uint i = 0; i < opts.queues; i++) {
+ std::ostringstream out;
+ out << opts.base << "-" << (i+1);
+ tests.push_back(new Test(out.str()));
+ }
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->start();
+ }
+ if (opts.rate && !opts.timeLimit) {
+ while (true) {
+ qpid::sys::usleep(opts.reportFrequency * 1000);
+ //print latency report:
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->report();
+ }
+ }
+ } else {
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->join();
+ }
+ }
+
+ return 0;
+ } catch(const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ }
+ return 1;
+}