diff options
| author | Alan Conway <aconway@apache.org> | 2010-04-07 19:51:20 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-04-07 19:51:20 +0000 | 
| commit | db10ca2521cff96eae94d11a8acb51e8173aba3c (patch) | |
| tree | 71142c67ba439e80e8148e6662e7ec4ec58bc695 | |
| parent | a98f0cfe299d147366c8baa26840b5100b8dc0b9 (diff) | |
| download | qpid-python-db10ca2521cff96eae94d11a8acb51e8173aba3c.tar.gz | |
Extend qpid_send, qpid_recv to measure throughput and latency.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931657 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | cpp/src/tests/Makefile.am | 6 | ||||
| -rw-r--r-- | cpp/src/tests/Statistics.cpp | 119 | ||||
| -rw-r--r-- | cpp/src/tests/Statistics.h | 106 | ||||
| -rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 26 | ||||
| -rw-r--r-- | cpp/src/tests/qpid_send.cpp | 79 | ||||
| -rw-r--r-- | doc/book/.gitignore | 2 | 
6 files changed, 319 insertions, 19 deletions
| diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 9c1a761062..b5fddd82a0 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -166,14 +166,16 @@ qpidtest_PROGRAMS += qpid_recv  qpid_recv_SOURCES = \    qpid_recv.cpp \    TestOptions.h \ -  ConnectionOptions.h +  ConnectionOptions.h \ +  Statistics.cpp  qpid_recv_LDADD = $(lib_client)  qpidtest_PROGRAMS += qpid_send  qpid_send_SOURCES = \    qpid_send.cpp \    TestOptions.h \ -  ConnectionOptions.h +  ConnectionOptions.h \ +  Statistics.cpp  qpid_send_LDADD = $(lib_client)  qpidtest_PROGRAMS+=perftest diff --git a/cpp/src/tests/Statistics.cpp b/cpp/src/tests/Statistics.cpp new file mode 100644 index 0000000000..24fac8d100 --- /dev/null +++ b/cpp/src/tests/Statistics.cpp @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Statistics.h" +#include <qpid/messaging/Message.h> +#include <ostream> + +namespace qpid { +namespace tests { + +Throughput::Throughput() : messages(0), started(false) {} + +void Throughput::message(const messaging::Message&) { +    ++messages; +    if (!started) { +        start = sys::now(); +        started = true; +    } +} + +void Throughput::header(std::ostream& o) const { +    o << "msg/sec"; +} + +void Throughput::report(std::ostream& o) const { +    double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC)); +    o << messages/elapsed; +} + +ThroughputAndLatency::ThroughputAndLatency() : +    total(0), +    min(std::numeric_limits<double>::max()), +    max(std::numeric_limits<double>::min()) +{} + +void ThroughputAndLatency::message(const messaging::Message& m) { +    Throughput::message(m); +    types::Variant::Map::const_iterator i = m.getProperties().find("ts"); +    if (i != m.getProperties().end()) { +        int64_t start(i->second.asInt64()); +        int64_t end(sys::Duration(sys::AbsTime::epoch(),sys::now())); +        double latency = double(end - start)/sys::TIME_MSEC; +        if (latency > 0) { +            total += latency; +            if (latency < min) min = latency; +            if (latency > max) max = latency; +        } +    } +} + +void ThroughputAndLatency::header(std::ostream& o) const { +    Throughput::header(o); +    o << "  latency(ms)min max avg"; +} + +void ThroughputAndLatency::report(std::ostream& o) const { +    Throughput::report(o); +    o << "  "; +    if (messages) +        o << min << " "  << max << " " << total/messages; +    else +        o << "Can't compute latency for 0 messages."; +} + +ReporterBase::ReporterBase(std::ostream& o, int batch) +    : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {} + +/** Count message in the statistics */ +void ReporterBase::message(const messaging::Message& m) { +    if (!overall.get()) overall = create(); +    overall->message(m); +    if (wantBatch) { +        if (!batch.get()) batch = create(); +        batch->message(m); +        if (++batchCount == wantBatch) { +            header(); +            batch->report(out); +            out << std::endl; +            batch = create(); +            batchCount = 0; +        } +    } +} + +/** Print overall report. */ +void ReporterBase::report() { +    header(); +    overall->report(out); +    out << std::endl; +} + +void ReporterBase::header() { +    if (!headerPrinted) { +        if (!overall.get()) overall = create(); +        overall->header(out); +        out << std::endl; +        headerPrinted = true; +    } +} + + +}} // namespace qpid::tests diff --git a/cpp/src/tests/Statistics.h b/cpp/src/tests/Statistics.h new file mode 100644 index 0000000000..5c5b21c49c --- /dev/null +++ b/cpp/src/tests/Statistics.h @@ -0,0 +1,106 @@ +#ifndef TESTS_STATISTICS_H +#define TESTS_STATISTICS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <qpid/sys/Time.h> +#include <limits> +#include <iosfwd> +#include <memory> + +namespace qpid { + +namespace messaging { +class Message; +} + +namespace tests { + +class Statistic { +  public: +    virtual void message(const messaging::Message&) = 0; +    virtual void report(std::ostream&) const = 0; +    virtual void header(std::ostream&) const = 0; +}; + +class Throughput : public Statistic { +  public: +    Throughput(); +    virtual void message(const messaging::Message&); +    virtual void report(std::ostream&) const; +    virtual void header(std::ostream&) const; + +  protected: +    int messages; + +  private: +    bool started; +    sys::AbsTime start; +}; + +class ThroughputAndLatency : public Throughput { +  public: +    ThroughputAndLatency(); +    virtual void message(const messaging::Message&); +    virtual void report(std::ostream&) const; +    virtual void header(std::ostream&) const; + +  private: +    double total, min, max;     // Milliseconds +}; + +/** Report batch and overall statistics */ +class ReporterBase { +  public: +    /** Count message in the statistics */ +    void message(const messaging::Message& m); + +    /** Print overall report. */ +    void report(); + +  protected: +    ReporterBase(std::ostream& o, int batchSize); +    virtual std::auto_ptr<Statistic> create() = 0; + +  private: +    void header(); +    void report(const Statistic& s); +    std::auto_ptr<Statistic> overall; +    std::auto_ptr<Statistic> batch; +    bool wantOverall; +    int wantBatch, batchCount; +    bool stopped, headerPrinted; +    std::ostream& out; +}; + +template <class Stats> class Reporter : public ReporterBase { +  public: +    Reporter(std::ostream& o, int batchSize) : ReporterBase(o, batchSize) {} +    virtual std::auto_ptr<Statistic> create() { +        return std::auto_ptr<Statistic>(new Stats); +    } +}; + +}} // namespace qpid::tests + +#endif  /*!TESTS_STATISTICS_H*/ diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 160830c826..ff19464621 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -29,6 +29,7 @@  #include <qpid/log/Options.h>  #include <qpid/client/amqp0_10/FailoverUpdates.h>  #include "TestOptions.h" +#include "Statistics.h"  #include <iostream>  #include <memory> @@ -56,9 +57,12 @@ struct Options : public qpid::Options      uint ackFrequency;      uint tx;      uint rollbackFrequency; +    bool printContent;      bool printHeaders;      bool failoverUpdates;      qpid::log::Options log; +    bool report; +    uint reportEvery;      Options(const std::string& argv0=std::string())          : qpid::Options("Options"), @@ -68,13 +72,16 @@ struct Options : public qpid::Options            forever(false),            messages(0),            ignoreDuplicates(false), -          capacity(0), -          ackFrequency(1), +          capacity(10000), +          ackFrequency(100),            tx(0),            rollbackFrequency(0), +          printContent(true),            printHeaders(false),            failoverUpdates(false), -          log(argv0) +          log(argv0), +          report(false), +          reportEvery(0)      {          addOptions()              ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -84,12 +91,15 @@ struct Options : public qpid::Options              ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")              ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")              ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") -            ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)") +            ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")              ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")              ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")              ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)") -            ("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content") +            ("print-content", qpid::optValue(printContent, "yes|no"), "print out message content") +            ("print-headers", qpid::optValue(printHeaders, "yes|no"), "print out message headers")              ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") +            ("report", qpid::optValue(report), "Report throughput statistics") +            ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")              ("help", qpid::optValue(help), "print this usage statement");          add(log);      } @@ -162,7 +172,9 @@ int main(int argc, char ** argv)              SequenceTracker sequenceTracker;              Duration timeout = opts.getTimeout();              bool done = false; +            Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);              while (!done && receiver.fetch(msg, timeout)) { +                reporter.message(msg);                  if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {                      if (msg.getContent() == EOS) {                          done = true; @@ -179,7 +191,8 @@ int main(int argc, char ** argv)                              std::cout << "Properties: " << msg.getProperties() << std::endl;                              std::cout << std::endl;                          } -                        std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages +                        if (opts.printContent) +                            std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages                          if (opts.messages && count >= opts.messages) done = true;                      }                  } @@ -194,6 +207,7 @@ int main(int argc, char ** argv)                  }                  //opts.rejectFrequency??              } +            if (opts.report) reporter.report();              if (opts.tx) {                  if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {                      session.rollback(); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index f828e6077c..1e9711d206 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -25,7 +25,10 @@  #include <qpid/messaging/Sender.h>  #include <qpid/messaging/Session.h>  #include <qpid/client/amqp0_10/FailoverUpdates.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Monitor.h>  #include "TestOptions.h" +#include "Statistics.h"  #include <fstream>  #include <iostream> @@ -34,7 +37,6 @@  using namespace qpid::messaging;  using namespace qpid::types;  using qpid::client::amqp0_10::FailoverUpdates; -  typedef std::vector<std::string> string_vector;  using namespace std; @@ -64,20 +66,26 @@ struct Options : public qpid::Options      uint capacity;      bool failoverUpdates;      qpid::log::Options log; +    bool report; +    uint reportEvery; +    uint rate;      Options(const std::string& argv0=std::string())          : qpid::Options("Options"),            help(false),            url("amqp:tcp:127.0.0.1"), -          count(1), +          count(0),            sendEos(0),            durable(false),            ttl(0),            tx(0),            rollbackFrequency(0), -          capacity(0), +          capacity(1000),            failoverUpdates(false), -          log(argv0) +          log(argv0), +          report(false), +          reportEvery(0), +          rate(0)      {          addOptions()              ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -87,17 +95,20 @@ struct Options : public qpid::Options              ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")              ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")              ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") -            ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.") +            ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")  	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")              ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")              ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")              ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")              ("user-id", qpid::optValue(userid, "USERID"), "userid for message") -            ("content", qpid::optValue(content, "CONTENT"), "specify textual content") +            ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin")              ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")              ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")              ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")              ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") +            ("report", qpid::optValue(report), "Report throughput statistics") +            ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") +            ("rate", qpid::optValue(rate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")              ("help", qpid::optValue(help), "print this usage statement");          add(log);      } @@ -177,6 +188,29 @@ const string EOS("eos");  using namespace qpid::tests; +class ContentGenerator { +  public: +    virtual bool getContent(std::string& content) = 0; +}; + +class GetlineContentGenerator : public ContentGenerator { +  public: +    virtual bool getContent(std::string& content) { return getline(std::cin, content); } +}; + +class FixedContentGenerator   : public ContentGenerator { +  public: +    FixedContentGenerator(std::string s) : content(s) {} +    virtual bool getContent(std::string& contentOut) { +        contentOut = content; +        return true; +    } +  private: +    std::string content; +}; + + +  int main(int argc, char ** argv)  {      Options opts; @@ -200,18 +234,41 @@ int main(int argc, char ** argv)              std::string content;              uint sent = 0;              uint txCount = 0; -            while (getline(std::cin, content)) { +            Reporter<Throughput> reporter(std::cout, opts.reportEvery); + +            std::auto_ptr<ContentGenerator> contentGen; +            if (!opts.content.empty()) +                contentGen.reset(new FixedContentGenerator(opts.content)); +            else +                contentGen.reset(new GetlineContentGenerator); + +            qpid::sys::AbsTime start = qpid::sys::now(); +            int64_t interval = 0; +            if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate; + +            while (contentGen->getContent(content)) {                  msg.setContent(content);                  msg.getProperties()["sn"] = ++sent; +                msg.getProperties()["ts"] = int64_t( +                    qpid::sys::Duration(qpid::sys::AbsTime::epoch(), qpid::sys::now()));                  sender.send(msg); +                reporter.message(msg);                  if (opts.tx && (sent % opts.tx == 0)) { -                    if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) { +                    if (opts.rollbackFrequency && +                        (++txCount % opts.rollbackFrequency == 0))                          session.rollback(); -                    } else { +                    else                          session.commit(); -                    } -                }                 +                } +                if (opts.count && sent >= opts.count) break; +                if (opts.rate) { +                    qpid::sys::AbsTime waitTill(start, sent*interval); +                    int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); +                    if (delay > 0) +                        qpid::sys::usleep(delay/qpid::sys::TIME_USEC); +                }              } +            if (opts.report) reporter.report();              for (uint i = opts.sendEos; i > 0; --i) {                  msg.getProperties()["sn"] = ++sent;                  msg.setContent(EOS);//TODO: add in ability to send digest or similar diff --git a/doc/book/.gitignore b/doc/book/.gitignore new file mode 100644 index 0000000000..1603eacced --- /dev/null +++ b/doc/book/.gitignore @@ -0,0 +1,2 @@ +/build +/out | 
