From 710b8a1f1285b9aa5bccee5b1906500667dd7bc5 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 7 Nov 2007 19:57:46 +0000 Subject: client::SubscriptionManager: - Added autoStop support. - Added LocalQueue subscriptions. - Expose AckPolicy settings to user. client::Message: - incoming Messages carry their session for acknowledge perftest: (see perftest --help for details...) - allow multiple consumers. - 3 queue modes: shared, fanout, topic. - set size of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592869 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/ClientSessionTest.cpp | 2 +- cpp/src/tests/perftest.cpp | 380 +++++++++++++++--------------------- 2 files changed, 159 insertions(+), 223 deletions(-) (limited to 'cpp/src/tests') diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index ed3d733c20..369477131c 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -48,7 +48,7 @@ struct DummyListener : public MessageListener void listen() { - dispatcher.listen(name, this, true, 1); + dispatcher.listen(name, this); dispatcher.run(); } diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index bc816f6597..80157da7f4 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,44 +21,48 @@ #include "TestOptions.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Exchange.h" -#include "qpid/client/Queue.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" #include "qpid/client/Message.h" -#include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include -#include -#include -#include -#include +#include - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; using namespace std; +using namespace qpid; +using namespace client; +using namespace sys; struct Opts : public TestOptions { bool listen; bool publish; int count; - bool durable; + int size; + bool durable; + int consumers; + std::string mode; - Opts() : listen(false), publish(false), count(500000) { + Opts() : + listen(false), publish(false), count(500000), size(64), consumers(1), + mode("shared") + { addOptions() ("listen", optValue(listen), "Consume messages.") ("publish", optValue(publish), "Produce messages.") - ("count", optValue(count, "N"), "Messages to send/receive.") - ("durable", optValue(durable, "N"), "Publish messages as durable."); + ("count", optValue(count, "N"), "Messages to send.") + ("size", optValue(size, "BYTES"), "Size of messages.") + ("durable", optValue(durable, "N"), "Publish messages as durable.") + ("consumers", optValue(consumers, "N"), "Number of consumers.") + ("mode", optValue(mode, "shared|fanout|topic"), "consume mode"); } }; Opts opts; +enum Mode { SHARED, FANOUT, TOPIC }; +Mode mode; struct ListenThread : public Runnable { Thread thread; void run(); }; struct PublishThread : public Runnable { Thread thread; void run(); }; @@ -66,16 +70,22 @@ struct PublishThread : public Runnable { Thread thread; void run(); }; int main(int argc, char** argv) { try { opts.parse(argc, argv); + if (opts.mode=="shared") mode=SHARED; + else if (opts.mode=="fanout") mode = FANOUT; + else if (opts.mode=="topic") mode = TOPIC; + else throw Exception("Invalid mode"); if (!opts.listen && !opts.publish) opts.listen = opts.publish = true; - ListenThread listen; + std::vector listen(opts.consumers); PublishThread publish; - if (opts.listen) - listen.thread=Thread(listen); + if (opts.listen) + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread=Thread(listen[i]); if (opts.publish) publish.thread=Thread(publish); if (opts.listen) - listen.thread.join(); + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread.join(); if (opts.publish) publish.thread.join(); } @@ -84,223 +94,149 @@ int main(int argc, char** argv) { } } -// ================================================================ -// Publish client -// +double secs(Duration d) { return double(d)/TIME_SEC; } +double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); } -struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { - timespec r; - r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; - r.tv_sec = lhs.tv_sec - rhs.tv_sec; - if (r.tv_nsec < 0) { - r.tv_nsec += 1000000000; - r.tv_sec -= 1; - } - return r; -} -ostream& operator<<(ostream& o, const struct timespec& ts) { - o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; - return o; -} +void expect(string actual, string expect) { + if (expect != actual) + throw Exception("Expecting "+expect+" but received "+actual); -double toDouble(const struct timespec& ts) { - return double(ts.tv_nsec)/1000000000 + ts.tv_sec; } -class PublishListener : public MessageListener { - - void set_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - startTime = ts; - } - - void print_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - cout << "Total Time:" << ts-startTime << endl; - double rate = messageCount*2/toDouble(ts-startTime); - cout << "returned Messages:" << messageCount << endl; - cout << "round trip Rate:" << rate << endl; - } - - struct timespec startTime; - int messageCount; - bool done; - Monitor lock; - - public: - - PublishListener(int mcount): messageCount(mcount), done(false) { - set_time(); - } - - void received(Message& msg) { - print_time(); - QPID_LOG(info, "Publisher: received: " << msg.getData()); - Mutex::ScopedLock l(lock); - QPID_LOG(info, "Publisher: done."); - done = true; - lock.notify(); +const char* exchange() { + switch (mode) { + case SHARED: return ""; // Deafult exchange. + case FANOUT: return "amq.fanout"; + case TOPIC: return "amq.topic"; } - - void wait() { - Mutex::ScopedLock l(lock); - while (!done) - lock.wait(); - } -}; - + assert(0); + return 0; +} void PublishThread::run() { - Connection connection; - Channel channel; - Message msg; - opts.open(connection); - connection.openChannel(channel); - channel.start(); - - cout << "Started publisher." << endl; - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + try { + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + + session.queueDeclare(arg::queue="control"); // Control queue + session.queuePurge(arg::queue="control"); + if (mode==SHARED) { + session.queueDeclare(arg::queue="perftest"); // Shared data queue + session.queuePurge(arg::queue="perftest"); + } - string queueName ="queue01"; - string queueNameC =queueName+"-1"; - - // create publish queue - Queue publish(queueName); - channel.declareQueue(publish); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - // pass queue name - msg.setData(queueName); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); - - QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); - - int count = opts.count; - PublishListener listener(count); - channel.consume(completion, queueNameC, &listener); - QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); - - struct timespec startTime; - if (::clock_gettime(CLOCK_REALTIME, &startTime)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - - bool durable = opts.durable; - if (durable) + // Wait for consumers. + SubscriptionManager subs(session); + LocalQueue control; + subs.subscribe(control, "control"); + for (int i = 0; i < opts.consumers; ++i) + expect(control.pop().getData(), "ready"); + + // Create test message + size_t msgSize=max(opts.size, 32); + Message msg(string(msgSize, 'X'), "perftest"); + char* msgBuf = const_cast(msg.getData().data()); + if (opts.durable) msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + // Time sending message. + AbsTime start=now(); + cout << "Publishing " << opts.count << " messages " << flush; + for (int i=0; i