diff options
Diffstat (limited to 'cpp/src/tests/qpid-perftest.cpp')
-rw-r--r-- | cpp/src/tests/qpid-perftest.cpp | 746 |
1 files changed, 0 insertions, 746 deletions
diff --git a/cpp/src/tests/qpid-perftest.cpp b/cpp/src/tests/qpid-perftest.cpp deleted file mode 100644 index 8a5cf05775..0000000000 --- a/cpp/src/tests/qpid-perftest.cpp +++ /dev/null @@ -1,746 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "TestOptions.h" - -#include "qpid/client/AsyncSession.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Completion.h" -#include "qpid/client/Message.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" - -#include <boost/lexical_cast.hpp> -#include <boost/bind.hpp> -#include <boost/function.hpp> -#include <boost/ptr_container/ptr_vector.hpp> - -#include <iostream> -#include <sstream> -#include <numeric> -#include <algorithm> -#include <math.h> - - -using namespace std; -using namespace qpid; -using namespace client; -using namespace sys; -using boost::lexical_cast; -using boost::bind; - -namespace qpid { -namespace tests { - -enum Mode { SHARED, FANOUT, TOPIC }; -const char* modeNames[] = { "shared", "fanout", "topic" }; - -// istream/ostream ops so Options can read/display Mode. -istream& operator>>(istream& in, Mode& mode) { - string s; - in >> s; - int i = find(modeNames, modeNames+3, s) - modeNames; - if (i >= 3) throw Exception("Invalid mode: "+s); - mode = Mode(i); - return in; -} - -ostream& operator<<(ostream& out, Mode mode) { - return out << modeNames[mode]; -} - - -struct Opts : public TestOptions { - - // Actions - bool setup, control, publish, subscribe; - - // Queue policy - uint32_t queueMaxCount; - uint64_t queueMaxSize; - std::string baseName; - bool queueDurable; - - // Publisher - size_t pubs; - size_t count ; - size_t size; - bool confirm; - bool durable; - bool uniqueData; - bool syncPub; - - // Subscriber - size_t subs; - size_t ack; - - // General - size_t qt; - bool singleConnect; - size_t iterations; - Mode mode; - bool summary; - uint32_t intervalSub; - uint32_t intervalPub; - size_t tx; - size_t txPub; - size_t txSub; - bool commitAsync; - - static const std::string helpText; - - Opts() : - TestOptions(helpText), - setup(false), control(false), publish(false), subscribe(false), baseName("qpid-perftest"), - pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), - subs(1), ack(0), - qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) - { - addOptions() - ("setup", optValue(setup), "Create shared queues.") - ("control", optValue(control), "Run test, print report.") - ("publish", optValue(publish), "Publish messages.") - ("subscribe", optValue(subscribe), "Subscribe for messages.") - - ("mode", optValue(mode, "shared|fanout|topic"), "Test mode." - "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n" - "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange." - "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n") - - ("npubs", optValue(pubs, "N"), "Create N publishers.") - ("count", optValue(count, "N"), "Each publisher sends N messages.") - ("size", optValue(size, "BYTES"), "Size of messages in bytes.") - ("pub-confirm", optValue(confirm, "yes|no"), "Publisher use confirm-mode.") - ("durable", optValue(durable, "yes|no"), "Publish messages as durable.") - ("unique-data", optValue(uniqueData, "yes|no"), "Make data for each message unique.") - ("sync-publish", optValue(syncPub, "yes|no"), "Wait for confirmation of each message before sending the next one.") - - ("nsubs", optValue(subs, "N"), "Create N subscribers.") - ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" - "N==0: Subscriber uses unconfirmed mode") - - ("qt", optValue(qt, "N"), "Create N queues or topics.") - ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") - - ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") - ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") - - ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") - ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") - ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") - ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") - - ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") - ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") - - ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") - ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") - ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") - ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); - } - - // Computed values - size_t totalPubs; - size_t totalSubs; - size_t transfers; - size_t subQuota; - - void parse(int argc, char** argv) { - TestOptions::parse(argc, argv); - switch (mode) { - case SHARED: - if (count % subs) { - count += subs - (count % subs); - cout << "WARNING: Adjusted --count to " << count - << " the nearest multiple of --nsubs" << endl; - } - totalPubs = pubs*qt; - totalSubs = subs*qt; - subQuota = (pubs*count)/subs; - break; - case FANOUT: - if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt=" - << qt << endl; - qt=1; - totalPubs = pubs; - totalSubs = subs; - subQuota = totalPubs*count; - break; - case TOPIC: - totalPubs = pubs*qt; - totalSubs = subs*qt; - subQuota = pubs*count; - break; - } - transfers=(totalPubs*count) + (totalSubs*subQuota); - if (tx) { - if (txPub) { - cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; - } else { - txPub = tx; - } - if (txSub) { - cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; - } else { - txSub = tx; - } - } - } -}; - -const std::string Opts::helpText= -"There are two ways to use qpid-perftest: single process or multi-process.\n\n" -"If none of the --setup, --publish, --subscribe or --control options\n" -"are given qpid-perftest will run a single-process test.\n" -"For a multi-process test first run:\n" -" qpid-perftest --setup <other options>\n" -"and wait for it to complete. The remaining process should run concurrently::\n" -"Run --npubs times: qpid-perftest --publish <other options>\n" -"Run --nsubs times: qpid-perftest --subscribe <other options>\n" -"Run once: qpid-perftest --control <other options>\n" -"Note the <other options> must be identical for all processes.\n"; - -Opts opts; -Connection globalConnection; - -std::string fqn(const std::string& name) -{ - ostringstream fqn; - fqn << opts.baseName << "_" << name; - return fqn.str(); -} - -struct Client : public Runnable { - Connection* connection; - Connection localConnection; - AsyncSession session; - Thread thread; - - Client() { - if (opts.singleConnect){ - connection = &globalConnection; - if (!globalConnection.isOpen()) opts.open(globalConnection); - }else{ - connection = &localConnection; - opts.open(localConnection); - } - session = connection->newSession(); - } - - ~Client() { - try { - if (connection->isOpen()) { - session.close(); - connection->close(); - } - } catch (const std::exception& e) { - std::cerr << "Error in shutdown: " << e.what() << std::endl; - } - } -}; - -struct Setup : public Client { - - void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { - session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); - session.queuePurge(arg::queue=name); - session.sync(); - } - - void run() { - queueInit(fqn("pub_start")); - queueInit(fqn("pub_done")); - queueInit(fqn("sub_ready")); - queueInit(fqn("sub_done")); - if (opts.iterations > 1) queueInit(fqn("sub_iteration")); - if (opts.mode==SHARED) { - framing::FieldTable settings;//queue policy settings - settings.setInt("qpid.max_count", opts.queueMaxCount); - settings.setInt("qpid.max_size", opts.queueMaxSize); - for (size_t i = 0; i < opts.qt; ++i) { - ostringstream qname; - qname << opts.baseName << i; - queueInit(qname.str(), opts.durable || opts.queueDurable, settings); - } - } - } -}; - -void expect(string actual, string expect) { - if (expect != actual) - throw Exception("Expecting "+expect+" but received "+actual); - -} - -double secs(Duration d) { return double(d)/TIME_SEC; } -double secs(AbsTime start, AbsTime finish) { - return secs(Duration(start,finish)); -} - - -// Collect rates & print stats. -class Stats { - vector<double> values; - double sum; - - public: - Stats() : sum(0) {} - - // Functor to collect rates. - void operator()(const string& data) { - try { - double d=lexical_cast<double>(data); - values.push_back(d); - sum += d; - } catch (const std::exception&) { - throw Exception("Bad report: "+data); - } - } - - double mean() const { - return sum/values.size(); - } - - double stdev() const { - if (values.size() <= 1) return 0; - double avg = mean(); - double ssq = 0; - for (vector<double>::const_iterator i = values.begin(); - i != values.end(); ++i) { - double x=*i; - x -= avg; - ssq += x*x; - } - return sqrt(ssq/(values.size()-1)); - } - - ostream& print(ostream& out) { - ostream_iterator<double> o(out, "\n"); - copy(values.begin(), values.end(), o); - out << "Average: " << mean(); - if (values.size() > 1) - out << " (std.dev. " << stdev() << ")"; - return out << endl; - } -}; - - -// Manage control queues, collect and print reports. -struct Controller : public Client { - - SubscriptionManager subs; - - Controller() : subs(session) {} - - /** Process messages from queue by applying a functor. */ - void process(size_t n, string queue, - boost::function<void (const string&)> msgFn) - { - if (!opts.summary) - cout << "Processing " << n << " messages from " - << queue << " " << flush; - LocalQueue lq; - subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false); - subs.subscribe(lq, queue); - for (size_t i = 0; i < n; ++i) { - if (!opts.summary) cout << "." << flush; - msgFn(lq.pop().getData()); - } - if (!opts.summary) cout << " done." << endl; - } - - void process(size_t n, LocalQueue lq, string queue, - boost::function<void (const string&)> msgFn) - { - session.messageFlow(queue, 0, n); - if (!opts.summary) - cout << "Processing " << n << " messages from " - << queue << " " << flush; - for (size_t i = 0; i < n; ++i) { - if (!opts.summary) cout << "." << flush; - msgFn(lq.pop().getData()); - } - if (!opts.summary) cout << " done." << endl; - } - - void send(size_t n, string queue, string data) { - if (!opts.summary) - cout << "Sending " << data << " " << n << " times to " << queue - << endl; - Message msg(data, queue); - for (size_t i = 0; i < n; ++i) - session.messageTransfer(arg::content=msg, arg::acceptMode=1); - } - - void run() { // Controller - try { - // Wait for subscribers to be ready. - process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready")); - - LocalQueue pubDone; - LocalQueue subDone; - subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); - subs.subscribe(pubDone, fqn("pub_done")); - subs.subscribe(subDone, fqn("sub_done")); - - double txrateTotal(0); - double mbytesTotal(0); - double pubRateTotal(0); - double subRateTotal(0); - - for (size_t j = 0; j < opts.iterations; ++j) { - AbsTime start=now(); - send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers - if (j) { - send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration - } - - Stats pubRates; - Stats subRates; - - process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); - process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); - - AbsTime end=now(); - double time=secs(start, end); - if (time <= 0.0) { - throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); - } - double txrate=opts.transfers/time; - double mbytes=(txrate*opts.size)/(1024*1024); - - if (!opts.summary) { - cout << endl << "Total " << opts.transfers << " transfers of " - << opts.size << " bytes in " - << time << " seconds." << endl; - cout << endl << "Publish transfers/sec: " << endl; - pubRates.print(cout); - cout << endl << "Subscribe transfers/sec: " << endl; - subRates.print(cout); - cout << endl - << "Total transfers/sec: " << txrate << endl - << "Total Mbytes/sec: " << mbytes << endl; - } - else { - cout << pubRates.mean() << "\t" - << subRates.mean() << "\t" - << txrate << "\t" - << mbytes << endl; - } - - txrateTotal += txrate; - mbytesTotal += mbytes; - pubRateTotal += pubRates.mean(); - subRateTotal += subRates.mean(); - } - if (opts.iterations > 1) { - cout << "Averages: "<< endl - << (pubRateTotal / opts.iterations) << "\t" - << (subRateTotal / opts.iterations) << "\t" - << (txrateTotal / opts.iterations) << "\t" - << (mbytesTotal / opts.iterations) << endl; - } - } - catch (const std::exception& e) { - cout << "Controller exception: " << e.what() << endl; - } - } -}; - - -struct PublishThread : public Client { - string destination; - string routingKey; - - PublishThread() {}; - - PublishThread(string key, string dest=string()) { - destination=dest; - routingKey=key; - } - - void run() { // Publisher - try { - string data; - size_t offset(0); - if (opts.uniqueData) { - offset = 5; - data += "data:";//marker (requested for latency testing tool scripts) - data += string(sizeof(size_t), 'X');//space for seq no - data += session.getId().str(); - if (opts.size > data.size()) { - data += string(opts.size - data.size(), 'X'); - } else if(opts.size < data.size()) { - cout << "WARNING: Increased --size to " << data.size() - << " to honour --unique-data" << endl; - } - } else { - size_t msgSize=max(opts.size, sizeof(size_t)); - data = string(msgSize, 'X'); - } - - Message msg(data, routingKey); - if (opts.durable) - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - - - if (opts.txPub){ - session.txSelect(); - } - SubscriptionManager subs(session); - LocalQueue lq; - subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, fqn("pub_start")); - - for (size_t j = 0; j < opts.iterations; ++j) { - expect(lq.pop().getData(), "start"); - AbsTime start=now(); - for (size_t i=0; i<opts.count; i++) { - // Stamp the iteration into the message data, avoid - // any heap allocation. - const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t), - reinterpret_cast<const char*>(&i), sizeof(size_t)); - if (opts.syncPub) { - sync(session).messageTransfer( - arg::destination=destination, - arg::content=msg, - arg::acceptMode=1); - } else { - session.messageTransfer( - arg::destination=destination, - arg::content=msg, - arg::acceptMode=1); - } - if (opts.txPub && ((i+1) % opts.txPub == 0)){ - if (opts.commitAsync){ - session.txCommit(); - } else { - sync(session).txCommit(); - } - } - if (opts.intervalPub) - qpid::sys::usleep(opts.intervalPub*1000); - } - if (opts.confirm) session.sync(); - AbsTime end=now(); - double time=secs(start,end); - if (time <= 0.0) { - throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count."); - } - - // Send result to controller. - Message report(lexical_cast<string>(opts.count/time), fqn("pub_done")); - session.messageTransfer(arg::content=report, arg::acceptMode=1); - if (opts.txPub){ - sync(session).txCommit(); - } - } - session.close(); - } - catch (const std::exception& e) { - cout << "PublishThread exception: " << e.what() << endl; - } - } -}; - -struct SubscribeThread : public Client { - - string queue; - - SubscribeThread() {} - - SubscribeThread(string q) { queue = q; } - - SubscribeThread(string key, string ex) { - queue=session.getId().str(); // Unique name. - session.queueDeclare(arg::queue=queue, - arg::exclusive=true, - arg::autoDelete=true, - arg::durable=opts.durable); - session.exchangeBind(arg::queue=queue, - arg::exchange=ex, - arg::bindingKey=key); - } - - void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) { - if (!cond) { - Message error( - QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual), - "sub_done"); - session.messageTransfer(arg::content=error, arg::acceptMode=1); - throw Exception(error.getData()); - } - } - - void run() { // Subscribe - try { - if (opts.txSub) sync(session).txSelect(); - SubscriptionManager subs(session); - SubscriptionSettings settings; - settings.autoAck = opts.txSub ? opts.txSub : opts.ack; - settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE); - settings.flowControl = FlowControl::messageCredit(opts.subQuota); - LocalQueue lq; - Subscription subscription = subs.subscribe(lq, queue, settings); - // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1); - if (opts.txSub) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - - LocalQueue iterationControl; - if (opts.iterations > 1) { - subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); - } - - for (size_t j = 0; j < opts.iterations; ++j) { - if (j > 0) { - //need to wait here until all subs are done - session.messageFlow(fqn("sub_iteration"), 0, 1); - iterationControl.pop(); - - //need to allocate some more credit for subscription - session.messageFlow(queue, 0, opts.subQuota); - } - Message msg; - AbsTime start=now(); - size_t expect=0; - for (size_t i = 0; i < opts.subQuota; ++i) { - msg=lq.pop(); - if (opts.txSub && ((i+1) % opts.txSub == 0)) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - if (opts.intervalSub) - qpid::sys::usleep(opts.intervalSub*1000); - // TODO aconway 2007-11-23: check message order for. - // multiple publishers. Need an array of counters, - // one per publisher and a publisher ID in the - // message. Careful not to introduce a lot of overhead - // here, e.g. no std::map, std::string etc. - // - // For now verify order only for a single publisher. - size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; - size_t n = *reinterpret_cast<const size_t*>(msg.getData().data() + offset); - if (opts.pubs == 1) { - if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); - else verify(n>=expect, ">=", expect, n); - expect = n+1; - } - } - if (opts.txSub || opts.ack) - subscription.accept(subscription.getUnaccepted()); - if (opts.txSub) { - if (opts.commitAsync) session.txCommit(); - else sync(session).txCommit(); - } - AbsTime end=now(); - - // Report to publisher. - Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), - fqn("sub_done")); - session.messageTransfer(arg::content=result, arg::acceptMode=1); - if (opts.txSub) sync(session).txCommit(); - } - session.close(); - } - catch (const std::exception& e) { - cout << "SubscribeThread exception: " << e.what() << endl; - } - } -}; - -}} // namespace qpid::tests - -using namespace qpid::tests; - -int main(int argc, char** argv) { - int exitCode = 0; - boost::ptr_vector<Client> subs(opts.subs); - boost::ptr_vector<Client> pubs(opts.pubs); - - try { - opts.parse(argc, argv); - - string exchange; - switch (opts.mode) { - case FANOUT: exchange="amq.fanout"; break; - case TOPIC: exchange="amq.topic"; break; - case SHARED: break; - } - - bool singleProcess= - (!opts.setup && !opts.control && !opts.publish && !opts.subscribe); - if (singleProcess) - opts.setup = opts.control = opts.publish = opts.subscribe = true; - - if (opts.setup) Setup().run(); // Set up queues - - // Start pubs/subs for each queue/topic. - for (size_t i = 0; i < opts.qt; ++i) { - ostringstream key; - key << opts.baseName << i; // Queue or topic name. - if (opts.publish) { - size_t n = singleProcess ? opts.pubs : 1; - for (size_t j = 0; j < n; ++j) { - pubs.push_back(new PublishThread(key.str(), exchange)); - pubs.back().thread=Thread(pubs.back()); - } - } - if (opts.subscribe) { - size_t n = singleProcess ? opts.subs : 1; - for (size_t j = 0; j < n; ++j) { - if (opts.mode==SHARED) - subs.push_back(new SubscribeThread(key.str())); - else - subs.push_back(new SubscribeThread(key.str(),exchange)); - subs.back().thread=Thread(subs.back()); - } - } - } - - if (opts.control) Controller().run(); - } - catch (const std::exception& e) { - cout << endl << e.what() << endl; - exitCode = 1; - } - - // Wait for started threads. - if (opts.publish) { - for (boost::ptr_vector<Client>::iterator i=pubs.begin(); - i != pubs.end(); - ++i) - i->thread.join(); - } - - if (opts.subscribe) { - for (boost::ptr_vector<Client>::iterator i=subs.begin(); - i != subs.end(); - ++i) - i->thread.join(); - } - return exitCode; -} |