diff options
Diffstat (limited to 'qpid/cpp/src/tests/qpid-latency-test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/qpid-latency-test.cpp | 480 |
1 files changed, 0 insertions, 480 deletions
diff --git a/qpid/cpp/src/tests/qpid-latency-test.cpp b/qpid/cpp/src/tests/qpid-latency-test.cpp deleted file mode 100644 index a03963467b..0000000000 --- a/qpid/cpp/src/tests/qpid-latency-test.cpp +++ /dev/null @@ -1,480 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include <algorithm> -#include <limits> -#include <iostream> -#include <memory> -#include <sstream> -#include <vector> - -#include "TestOptions.h" -#include "qpid/sys/Thread.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/sys/Time.h" - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -namespace qpid { -namespace tests { - -typedef std::vector<std::string> StringSet; - -struct Args : public qpid::TestOptions { - uint size; - uint count; - uint rate; - bool sync; - uint reportFrequency; - uint timeLimit; - uint concurrentConnections; - uint prefetch; - uint ack; - bool cumulative; - bool csv; - bool durable; - string base; - bool singleConnect; - - Args() : size(256), count(1000), rate(0), reportFrequency(1000), - timeLimit(0), concurrentConnections(1), - prefetch(100), ack(0), - durable(false), base("latency-test"), singleConnect(false) - - { - addOptions() - - ("size", optValue(size, "N"), "message size") - ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\ - subcriber, queue, and connections") - ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") - ("count", optValue(count, "N"), "number of messages to send") - ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") - ("sync", optValue(sync), "send messages synchronously") - ("report-frequency", optValue(reportFrequency, "N"), - "number of milliseconds to wait between reports (ignored unless rate specified)") - ("time-limit", optValue(timeLimit, "N"), - "test duration, in seconds") - ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") - ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)") - ("durable", optValue(durable, "yes|no"), "use durable messages") - ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)") - ("cumulative", optValue(cumulative), "cumulative stats in csv format") - ("queue-base-name", optValue(base, "<name>"), "base name for queues"); - } -}; - -const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); - -Args opts; -double c_min, c_avg, c_max; -Connection globalConnection; - -uint64_t current_time() -{ - return Duration::FromEpoch(); -} - -struct Stats -{ - Mutex lock; - uint count; - double minLatency; - double maxLatency; - double totalLatency; - - Stats(); - void update(double l); - void print(); - void reset(); -}; - -class Client : public Runnable -{ -protected: - Connection* connection; - Connection localConnection; - AsyncSession session; - Thread thread; - string queue; - -public: - Client(const string& q); - virtual ~Client(); - - void start(); - void join(); - void run(); - virtual void test() = 0; -}; - -class Receiver : public Client, public MessageListener -{ - SubscriptionManager mgr; - uint count; - Stats& stats; - -public: - Receiver(const string& queue, Stats& stats); - void test(); - void received(Message& msg); - Stats getStats(); - uint getCount() { return count; } - void stop() { mgr.stop(); mgr.cancel(queue); } -}; - - -class Sender : public Client -{ - string generateData(uint size); - void sendByRate(); - void sendByCount(); - Receiver& receiver; - const string data; - -public: - Sender(const string& queue, Receiver& receiver); - void test(); -}; - - -class Test -{ - const string queue; - Stats stats; - Receiver receiver; - Sender sender; - AbsTime begin; - -public: - Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {} - void start(); - void join(); - void report(); -}; - - -Client::Client(const string& q) : queue(q) -{ - if (opts.singleConnect){ - connection = &globalConnection; - if (!globalConnection.isOpen()) opts.open(globalConnection); - }else{ - connection = &localConnection; - opts.open(localConnection); - } - session = connection->newSession(); -} - -void Client::start() -{ - thread = Thread(this); -} - -void Client::join() -{ - thread.join(); -} - -void Client::run() -{ - try{ - test(); - } catch(const std::exception& e) { - std::cout << "Error in receiver: " << e.what() << std::endl; - } -} - -Client::~Client() -{ - try{ - session.close(); - connection->close(); - } catch(const std::exception& e) { - std::cout << "Error in receiver: " << e.what() << std::endl; - } -} - -Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0), stats(s) -{ - session.queueDeclare(arg::queue=queue, arg::durable=opts.durable, arg::autoDelete=true); - uint msgCount = session.queueQuery(arg::queue=queue).get().getMessageCount(); - if (msgCount) { - std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; - session.queuePurge(arg::queue=queue); - session.sync(); - } - SubscriptionSettings settings; - if (opts.prefetch) { - settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); - settings.flowControl = FlowControl::messageWindow(opts.prefetch); - } else { - settings.acceptMode = ACCEPT_MODE_NONE; - settings.flowControl = FlowControl::unlimited(); - } - mgr.subscribe(*this, queue, settings); -} - -void Receiver::test() -{ - mgr.run(); - mgr.cancel(queue); -} - -void Receiver::received(Message& msg) -{ - ++count; - uint64_t receivedAt = current_time(); - uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); - - stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); - - if (!opts.rate && count >= opts.count) { - mgr.stop(); - } -} - -void Stats::update(double latency) -{ - Mutex::ScopedLock l(lock); - count++; - minLatency = std::min(minLatency, latency); - maxLatency = std::max(maxLatency, latency); - totalLatency += latency; -} - -Stats::Stats() : count(0), minLatency(std::numeric_limits<double>::max()), maxLatency(0), totalLatency(0) {} - -void Stats::print() -{ - static bool already_have_stats = false; - uint value; - - if (opts.rate) - value = opts.rate; - else - value = opts.count; - Mutex::ScopedLock l(lock); - double aux_avg = (totalLatency / count); - if (!opts.cumulative) { - if (!opts.csv) { - if (count) { - std::cout << "Latency(ms): min=" << minLatency << ", max=" << - maxLatency << ", avg=" << aux_avg; - } else { - std::cout << "Stalled: no samples for interval"; - } - } else { - if (count) { - std::cout << value << "," << minLatency << "," << maxLatency << - "," << aux_avg; - } else { - std::cout << value << "," << minLatency << "," << maxLatency << - ", Stalled"; - } - } - } else { - if (count) { - if (already_have_stats) { - c_avg = (c_min + aux_avg) / 2; - if (c_min > minLatency) c_min = minLatency; - if (c_max < maxLatency) c_max = maxLatency; - } else { - c_avg = aux_avg; - c_min = minLatency; - c_max = maxLatency; - already_have_stats = true; - } - std::cout << value << "," << c_min << "," << c_max << - "," << c_avg; - } else { - std::cout << "Stalled: no samples for interval"; - } - } -} - -void Stats::reset() -{ - Mutex::ScopedLock l(lock); - count = 0; - totalLatency = maxLatency = 0; - minLatency = std::numeric_limits<double>::max(); -} - -Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} - -void Sender::test() -{ - if (opts.rate) sendByRate(); - else sendByCount(); -} - -void Sender::sendByCount() -{ - Message msg(data, queue); - if (opts.durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - - for (uint i = 0; i < opts.count; i++) { - uint64_t sentAt(current_time()); - msg.getDeliveryProperties().setTimestamp(sentAt); - async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - if (opts.sync) session.sync(); - } - session.sync(); -} - -void Sender::sendByRate() -{ - Message msg(data, queue); - if (opts.durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - uint64_t interval = TIME_SEC/opts.rate; - int64_t timeLimit = opts.timeLimit * TIME_SEC; - uint64_t sent = 0; - AbsTime start = now(); - AbsTime last = start; - while (true) { - AbsTime sentAt=now(); - msg.getDeliveryProperties().setTimestamp(Duration::FromEpoch()); - async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - if (opts.sync) session.sync(); - ++sent; - if (Duration(last, sentAt) > (opts.reportFrequency*TIME_MSEC)) { - Duration t(start, now()); - //check rate actually achieved thus far - if (t/TIME_SEC) { - uint actualRate = sent / (t/TIME_SEC); - //report inability to stay within 1% of desired rate - if (actualRate < opts.rate && opts.rate - actualRate > opts.rate/100) { - std::cerr << "WARNING: Desired send rate: " << opts.rate << ", actual send rate: " << actualRate << std::endl; - } - } - last = sentAt; - } - - AbsTime waitTill(start, sent*interval); - Duration delay(sentAt, waitTill); - if (delay > 0) - sys::usleep(delay / TIME_USEC); - if (timeLimit != 0 && Duration(start, now()) > timeLimit) { - session.sync(); - receiver.stop(); - break; - } - } -} - -string Sender::generateData(uint size) -{ - if (size < chars.length()) { - return chars.substr(0, size); - } - std::string data; - for (uint i = 0; i < (size / chars.length()); i++) { - data += chars; - } - data += chars.substr(0, size % chars.length()); - return data; -} - - -void Test::start() -{ - receiver.start(); - begin = AbsTime(now()); - sender.start(); -} - -void Test::join() -{ - sender.join(); - receiver.join(); - AbsTime end = now(); - Duration time(begin, end); - double msecs(time / TIME_MSEC); - if (!opts.csv) { - std::cout << "Sent " << receiver.getCount() << " msgs through " << queue - << " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) "; - } - stats.print(); - std::cout << std::endl; -} - -void Test::report() -{ - stats.print(); - std::cout << std::endl; - stats.reset(); -} - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) -{ - try { - opts.parse(argc, argv); - if (opts.cumulative) - opts.csv = true; - - Connection localConnection; - AsyncSession session; - - boost::ptr_vector<Test> tests(opts.concurrentConnections); - for (uint i = 0; i < opts.concurrentConnections; i++) { - std::ostringstream out; - out << opts.base << "-" << (i+1); - tests.push_back(new Test(out.str())); - } - for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) { - i->start(); - } - if (opts.rate && !opts.timeLimit) { - while (true) { - qpid::sys::usleep(opts.reportFrequency * 1000); - //print latency report: - for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) { - i->report(); - } - } - } else { - for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) { - i->join(); - } - } - - return 0; - } catch(const std::exception& e) { - std::cout << e.what() << std::endl; - } - return 1; -} |