From a3493f1e3dccba026b7e4fc9874dcb2195918e81 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 14 Nov 2007 23:05:49 +0000 Subject: perftest.cpp - Remove heap allocation per message in. - Verify sequence numbers in message data. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@595115 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/perftest.cpp | 59 +++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 31 deletions(-) (limited to 'qpid/cpp/src/tests/perftest.cpp') diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index abf090af8d..019b1e1fce 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -41,12 +41,12 @@ struct Opts : public TestOptions { bool listen; bool publish; bool purge; - int count; - int size; + size_t count; + size_t size; bool durable; - int consumers; + size_t consumers; std::string mode; - int autoAck; + size_t autoAck; bool summary; Opts() : @@ -109,12 +109,12 @@ int main(int argc, char** argv) { std::vector listen(opts.consumers); PublishThread publish; if (opts.listen) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) listen[i].thread=Thread(listen[i]); if (opts.publish) publish.thread=Thread(publish); if (opts.listen) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) listen[i].thread.join(); if (opts.publish) publish.thread.join(); @@ -155,26 +155,25 @@ void PublishThread::run() { SubscriptionManager subs(session); LocalQueue control; subs.subscribe(control, "control"); - for (int i = 0; i < opts.consumers; ++i) { + for (size_t i = 0; i < opts.consumers; ++i) { if (!opts.summary) cout << "." << flush; expect(control.pop().getData(), "ready"); } if (!opts.summary) cout << endl; - // Create test message - size_t msgSize=max(opts.size, 32); - char* msgBuf = new char[msgSize]; - memset(msgBuf,'X', msgSize); - - Message msg(string(), "perftest"); + size_t msgSize=max(opts.size, sizeof(size_t)); + Message msg(string(msgSize, 'X'), "perftest"); if (opts.durable) msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - // Time sending message. + AbsTime start=now(); - if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush; - for (int i=0; i(msg.getData().data()); + *reinterpret_cast(data) = i; session.messageTransfer(arg::destination=exchange(), arg::content=msg); if (!opts.summary && (i%10000)==0){ @@ -182,13 +181,12 @@ void PublishThread::run() { session.execution().sendSyncRequest(); } } - delete [] msgBuf; //Completion compl; if (!opts.summary) cout << " done." << endl; msg.setData("done"); // Send done messages. if (mode==SHARED) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) session.messageTransfer(arg::destination=exchange(), arg::content=msg); else session.messageTransfer(arg::destination=exchange(), arg::content=msg); @@ -203,20 +201,10 @@ void PublishThread::run() { << "publish secs:" << secs(start,end) << endl << "publish rate:" << publish_rate << endl; - - - // Report -// end=now(); //compl.wait(); (wait for publish confirm of write if durable) -// publish_rate=(opts.count)/secs(start,end); -// if (!opts.summary) -// cout << endl -// << "synced secs:" << secs(start,end) << endl -// << "synced rate:" << publish_rate << endl; - double consume_rate = 0; // Average rate for consumers. // Wait for consumer(s) to finish. if (!opts.summary) cout << "Waiting for consumers done " << endl; - for (int i = 0; i < opts.consumers; ++i) { + for (size_t i = 0; i < opts.consumers; ++i) { string report=control.pop().getData(); if (!opts.summary) cout << endl << report; @@ -283,7 +271,16 @@ void ListenThread::run() { int consumed=0; AbsTime start=now(); Message msg; + size_t i = 0; while ((msg=consume.pop()).getData() != "done") { + char* data=const_cast(msg.getData().data()); + size_t j=*reinterpret_cast(data); + if (i > j) + throw Exception( + QPID_MSG("Messages out of order " << i + << " before " << j)); + else + i = j; ++consumed; } msg.acknowledge(); // Ack all outstanding messages -- ?? -- cgit v1.2.1