summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
committerAlan Conway <aconway@apache.org>2010-04-07 19:51:20 +0000
commitdb10ca2521cff96eae94d11a8acb51e8173aba3c (patch)
tree71142c67ba439e80e8148e6662e7ec4ec58bc695
parenta98f0cfe299d147366c8baa26840b5100b8dc0b9 (diff)
downloadqpid-python-db10ca2521cff96eae94d11a8acb51e8173aba3c.tar.gz
Extend qpid_send, qpid_recv to measure throughput and latency.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931657 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/Makefile.am6
-rw-r--r--cpp/src/tests/Statistics.cpp119
-rw-r--r--cpp/src/tests/Statistics.h106
-rw-r--r--cpp/src/tests/qpid_recv.cpp26
-rw-r--r--cpp/src/tests/qpid_send.cpp79
-rw-r--r--doc/book/.gitignore2
6 files changed, 319 insertions, 19 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 9c1a761062..b5fddd82a0 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -166,14 +166,16 @@ qpidtest_PROGRAMS += qpid_recv
qpid_recv_SOURCES = \
qpid_recv.cpp \
TestOptions.h \
- ConnectionOptions.h
+ ConnectionOptions.h \
+ Statistics.cpp
qpid_recv_LDADD = $(lib_client)
qpidtest_PROGRAMS += qpid_send
qpid_send_SOURCES = \
qpid_send.cpp \
TestOptions.h \
- ConnectionOptions.h
+ ConnectionOptions.h \
+ Statistics.cpp
qpid_send_LDADD = $(lib_client)
qpidtest_PROGRAMS+=perftest
diff --git a/cpp/src/tests/Statistics.cpp b/cpp/src/tests/Statistics.cpp
new file mode 100644
index 0000000000..24fac8d100
--- /dev/null
+++ b/cpp/src/tests/Statistics.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Statistics.h"
+#include <qpid/messaging/Message.h>
+#include <ostream>
+
+namespace qpid {
+namespace tests {
+
+Throughput::Throughput() : messages(0), started(false) {}
+
+void Throughput::message(const messaging::Message&) {
+ ++messages;
+ if (!started) {
+ start = sys::now();
+ started = true;
+ }
+}
+
+void Throughput::header(std::ostream& o) const {
+ o << "msg/sec";
+}
+
+void Throughput::report(std::ostream& o) const {
+ double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC));
+ o << messages/elapsed;
+}
+
+ThroughputAndLatency::ThroughputAndLatency() :
+ total(0),
+ min(std::numeric_limits<double>::max()),
+ max(std::numeric_limits<double>::min())
+{}
+
+void ThroughputAndLatency::message(const messaging::Message& m) {
+ Throughput::message(m);
+ types::Variant::Map::const_iterator i = m.getProperties().find("ts");
+ if (i != m.getProperties().end()) {
+ int64_t start(i->second.asInt64());
+ int64_t end(sys::Duration(sys::AbsTime::epoch(),sys::now()));
+ double latency = double(end - start)/sys::TIME_MSEC;
+ if (latency > 0) {
+ total += latency;
+ if (latency < min) min = latency;
+ if (latency > max) max = latency;
+ }
+ }
+}
+
+void ThroughputAndLatency::header(std::ostream& o) const {
+ Throughput::header(o);
+ o << " latency(ms)min max avg";
+}
+
+void ThroughputAndLatency::report(std::ostream& o) const {
+ Throughput::report(o);
+ o << " ";
+ if (messages)
+ o << min << " " << max << " " << total/messages;
+ else
+ o << "Can't compute latency for 0 messages.";
+}
+
+ReporterBase::ReporterBase(std::ostream& o, int batch)
+ : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {}
+
+/** Count message in the statistics */
+void ReporterBase::message(const messaging::Message& m) {
+ if (!overall.get()) overall = create();
+ overall->message(m);
+ if (wantBatch) {
+ if (!batch.get()) batch = create();
+ batch->message(m);
+ if (++batchCount == wantBatch) {
+ header();
+ batch->report(out);
+ out << std::endl;
+ batch = create();
+ batchCount = 0;
+ }
+ }
+}
+
+/** Print overall report. */
+void ReporterBase::report() {
+ header();
+ overall->report(out);
+ out << std::endl;
+}
+
+void ReporterBase::header() {
+ if (!headerPrinted) {
+ if (!overall.get()) overall = create();
+ overall->header(out);
+ out << std::endl;
+ headerPrinted = true;
+ }
+}
+
+
+}} // namespace qpid::tests
diff --git a/cpp/src/tests/Statistics.h b/cpp/src/tests/Statistics.h
new file mode 100644
index 0000000000..5c5b21c49c
--- /dev/null
+++ b/cpp/src/tests/Statistics.h
@@ -0,0 +1,106 @@
+#ifndef TESTS_STATISTICS_H
+#define TESTS_STATISTICS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <qpid/sys/Time.h>
+#include <limits>
+#include <iosfwd>
+#include <memory>
+
+namespace qpid {
+
+namespace messaging {
+class Message;
+}
+
+namespace tests {
+
+class Statistic {
+ public:
+ virtual void message(const messaging::Message&) = 0;
+ virtual void report(std::ostream&) const = 0;
+ virtual void header(std::ostream&) const = 0;
+};
+
+class Throughput : public Statistic {
+ public:
+ Throughput();
+ virtual void message(const messaging::Message&);
+ virtual void report(std::ostream&) const;
+ virtual void header(std::ostream&) const;
+
+ protected:
+ int messages;
+
+ private:
+ bool started;
+ sys::AbsTime start;
+};
+
+class ThroughputAndLatency : public Throughput {
+ public:
+ ThroughputAndLatency();
+ virtual void message(const messaging::Message&);
+ virtual void report(std::ostream&) const;
+ virtual void header(std::ostream&) const;
+
+ private:
+ double total, min, max; // Milliseconds
+};
+
+/** Report batch and overall statistics */
+class ReporterBase {
+ public:
+ /** Count message in the statistics */
+ void message(const messaging::Message& m);
+
+ /** Print overall report. */
+ void report();
+
+ protected:
+ ReporterBase(std::ostream& o, int batchSize);
+ virtual std::auto_ptr<Statistic> create() = 0;
+
+ private:
+ void header();
+ void report(const Statistic& s);
+ std::auto_ptr<Statistic> overall;
+ std::auto_ptr<Statistic> batch;
+ bool wantOverall;
+ int wantBatch, batchCount;
+ bool stopped, headerPrinted;
+ std::ostream& out;
+};
+
+template <class Stats> class Reporter : public ReporterBase {
+ public:
+ Reporter(std::ostream& o, int batchSize) : ReporterBase(o, batchSize) {}
+ virtual std::auto_ptr<Statistic> create() {
+ return std::auto_ptr<Statistic>(new Stats);
+ }
+};
+
+}} // namespace qpid::tests
+
+#endif /*!TESTS_STATISTICS_H*/
diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp
index 160830c826..ff19464621 100644
--- a/cpp/src/tests/qpid_recv.cpp
+++ b/cpp/src/tests/qpid_recv.cpp
@@ -29,6 +29,7 @@
#include <qpid/log/Options.h>
#include <qpid/client/amqp0_10/FailoverUpdates.h>
#include "TestOptions.h"
+#include "Statistics.h"
#include <iostream>
#include <memory>
@@ -56,9 +57,12 @@ struct Options : public qpid::Options
uint ackFrequency;
uint tx;
uint rollbackFrequency;
+ bool printContent;
bool printHeaders;
bool failoverUpdates;
qpid::log::Options log;
+ bool report;
+ uint reportEvery;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -68,13 +72,16 @@ struct Options : public qpid::Options
forever(false),
messages(0),
ignoreDuplicates(false),
- capacity(0),
- ackFrequency(1),
+ capacity(10000),
+ ackFrequency(100),
tx(0),
rollbackFrequency(0),
+ printContent(true),
printHeaders(false),
failoverUpdates(false),
- log(argv0)
+ log(argv0),
+ report(false),
+ reportEvery(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -84,12 +91,15 @@ struct Options : public qpid::Options
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
- ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
+ ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content")
+ ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("report", qpid::optValue(report), "Report throughput statistics")
+ ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -162,7 +172,9 @@ int main(int argc, char ** argv)
SequenceTracker sequenceTracker;
Duration timeout = opts.getTimeout();
bool done = false;
+ Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
while (!done && receiver.fetch(msg, timeout)) {
+ reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
done = true;
@@ -179,7 +191,8 @@ int main(int argc, char ** argv)
std::cout << "Properties: " << msg.getProperties() << std::endl;
std::cout << std::endl;
}
- std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
+ if (opts.printContent)
+ std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
if (opts.messages && count >= opts.messages) done = true;
}
}
@@ -194,6 +207,7 @@ int main(int argc, char ** argv)
}
//opts.rejectFrequency??
}
+ if (opts.report) reporter.report();
if (opts.tx) {
if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
session.rollback();
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index f828e6077c..1e9711d206 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -25,7 +25,10 @@
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/client/amqp0_10/FailoverUpdates.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Monitor.h>
#include "TestOptions.h"
+#include "Statistics.h"
#include <fstream>
#include <iostream>
@@ -34,7 +37,6 @@
using namespace qpid::messaging;
using namespace qpid::types;
using qpid::client::amqp0_10::FailoverUpdates;
-
typedef std::vector<std::string> string_vector;
using namespace std;
@@ -64,20 +66,26 @@ struct Options : public qpid::Options
uint capacity;
bool failoverUpdates;
qpid::log::Options log;
+ bool report;
+ uint reportEvery;
+ uint rate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- count(1),
+ count(0),
sendEos(0),
durable(false),
ttl(0),
tx(0),
rollbackFrequency(0),
- capacity(0),
+ capacity(1000),
failoverUpdates(false),
- log(argv0)
+ log(argv0),
+ report(false),
+ reportEvery(0),
+ rate(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -87,17 +95,20 @@ struct Options : public qpid::Options
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
- ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.")
+ ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
- ("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin")
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
+ ("report", qpid::optValue(report), "Report throughput statistics")
+ ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages")
+ ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -177,6 +188,29 @@ const string EOS("eos");
using namespace qpid::tests;
+class ContentGenerator {
+ public:
+ virtual bool getContent(std::string& content) = 0;
+};
+
+class GetlineContentGenerator : public ContentGenerator {
+ public:
+ virtual bool getContent(std::string& content) { return getline(std::cin, content); }
+};
+
+class FixedContentGenerator : public ContentGenerator {
+ public:
+ FixedContentGenerator(std::string s) : content(s) {}
+ virtual bool getContent(std::string& contentOut) {
+ contentOut = content;
+ return true;
+ }
+ private:
+ std::string content;
+};
+
+
+
int main(int argc, char ** argv)
{
Options opts;
@@ -200,18 +234,41 @@ int main(int argc, char ** argv)
std::string content;
uint sent = 0;
uint txCount = 0;
- while (getline(std::cin, content)) {
+ Reporter<Throughput> reporter(std::cout, opts.reportEvery);
+
+ std::auto_ptr<ContentGenerator> contentGen;
+ if (!opts.content.empty())
+ contentGen.reset(new FixedContentGenerator(opts.content));
+ else
+ contentGen.reset(new GetlineContentGenerator);
+
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate;
+
+ while (contentGen->getContent(content)) {
msg.setContent(content);
msg.getProperties()["sn"] = ++sent;
+ msg.getProperties()["ts"] = int64_t(
+ qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now()));
sender.send(msg);
+ reporter.message(msg);
if (opts.tx && (sent % opts.tx == 0)) {
- if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ if (opts.rollbackFrequency &&
+ (++txCount % opts.rollbackFrequency == 0))
session.rollback();
- } else {
+ else
session.commit();
- }
- }
+ }
+ if (opts.count && sent >= opts.count) break;
+ if (opts.rate) {
+ qpid::sys::AbsTime waitTill(start, sent*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0)
+ qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
}
+ if (opts.report) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {
msg.getProperties()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar
diff --git a/doc/book/.gitignore b/doc/book/.gitignore
new file mode 100644
index 0000000000..1603eacced
--- /dev/null
+++ b/doc/book/.gitignore
@@ -0,0 +1,2 @@
+/build
+/out