From 4ea2e527fedb69bd0ae93e79acefebf106b34318 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 15 Aug 2007 14:56:51 +0000 Subject: * perftest/topic_publisher.cpp, topic_listener.cpp: Combined into a single preftest.cpp executable and moved to src/tests. New perftest: - Supports all client-side options (--host, --port etc.) - Can be run as producer (--listen), consumer (--publish) or both. - --count specifies number of messages (default 500000 as before) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566211 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/perftest.cpp | 298 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 cpp/src/tests/perftest.cpp (limited to 'cpp/src/tests/perftest.cpp') diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp new file mode 100644 index 0000000000..7c0941bf68 --- /dev/null +++ b/cpp/src/tests/perftest.cpp @@ -0,0 +1,298 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TestOptions.h" + +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "qpid/QpidError.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + +#include +#include +#include +#include +#include + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + +struct Opts : public TestOptions { + + bool listen; + bool publish; + int count; + + Opts() : listen(false), publish(false), count(500000) { + addOptions() + ("listen", optValue(listen), "Consume messages.") + ("publish", optValue(publish), "Produce messages.") + ("count", optValue(count, "N"), "Messages to send/receive."); + } +}; + +Opts opts; + +struct ListenThread : public Runnable { Thread thread; void run(); }; +struct PublishThread : public Runnable { Thread thread; void run(); }; + +int main(int argc, char** argv) { + try { + opts.parse(argc, argv); + ListenThread listen; + PublishThread publish; + if (opts.listen) + listen.thread=Thread(listen); + if (opts.publish) + publish.thread=Thread(publish); + if (opts.listen) + listen.thread.join(); + if (opts.publish) + publish.thread.join(); + } + catch (const std::exception& e) { + cout << "Unexpected exception: " << e.what() << endl; + } +} + +// ================================================================ +// Publish client +// + +struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { + timespec r; + r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; + r.tv_sec = lhs.tv_sec - rhs.tv_sec; + if (r.tv_nsec < 0) { + r.tv_nsec += 1000000000; + r.tv_sec -= 1; + } + return r; +} + +ostream& operator<<(ostream& o, const struct timespec& ts) { + o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; + return o; +} + +double toDouble(const struct timespec& ts) { + return double(ts.tv_nsec)/1000000000 + ts.tv_sec; +} + +class PublishListener : public MessageListener { + + void set_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + startTime = ts; + } + + void print_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + cout << "Total Time:" << ts-startTime << endl; + double rate = messageCount*2/toDouble(ts-startTime); + cout << "returned Messages:" << messageCount << endl; + cout << "round trip Rate:" << rate << endl; + } + + struct timespec startTime; + int messageCount; + bool done; + Monitor lock; + + public: + + PublishListener(int mcount): messageCount(mcount), done(false) { + set_time(); + } + + void received(Message& msg) { + print_time(); + QPID_LOG(info, "Publisher: received: " << msg.getData()); + Mutex::ScopedLock l(lock); + QPID_LOG(info, "Publisher: done."); + done = true; + lock.notify(); + } + + void wait() { + Mutex::ScopedLock l(lock); + while (!done) + lock.wait(); + } +}; + + +void PublishThread::run() { + Connection connection; + Channel channel; + Message msg; + opts.open(connection); + connection.openChannel(channel); + channel.start(); + + cout << "Started publisher." << endl; + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + + string queueName ="queue01"; + string queueNameC =queueName+"-1"; + + // create publish queue + Queue publish(queueName); + channel.declareQueue(publish); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + // pass queue name + msg.setData(queueName); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); + + QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); + + int count = opts.count; + PublishListener listener(count); + channel.consume(completion, queueNameC, &listener); + QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); + + struct timespec startTime; + if (::clock_gettime(CLOCK_REALTIME, &startTime)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + + for (int i=0; i