From 9cf58ef6be38185c9a9d5325fb2dd522aa774529 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 14 Jun 2010 13:44:00 +0000 Subject: Rename tests qpid_* to qpid-* for consistency. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954471 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/qpid-cpp-benchmark | 145 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100755 cpp/src/tests/qpid-cpp-benchmark (limited to 'cpp/src/tests/qpid-cpp-benchmark') diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark new file mode 100755 index 0000000000..b8a74a2b90 --- /dev/null +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, qpid.messaging +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default="127.0.0.1", + help="url of broker to connect to") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="R", + help="send rate limited to R messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") +op.add_option("--ack-frequency", default=0, metavar="N", type="int", + help="receiver ack's every N messages, 0 means unconfirmed") +op.add_option("--no-report-header", dest="report_header", default=True, + action="store_false", help="don't print header on report") +op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") +op.add_option("--send-option", default=[], action="append", type="str", + help="Additional option for sending addresses") +op.add_option("--receive-option", default=[], action="append", type="str", + help="Additional option for receiving addresses") +op.add_option("--no-timestamp", dest="timestamp", default=True, + action="store_false", help="don't add a timestamp, no latency results") + +def start_receive(queue, opts, ready_queue): + address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + return Popen(["qpid-receive", + "-b", opts.broker, + "-a", address, + "--forever", + "--print-content=no", + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + "--ready-address", ready_queue, + "--report-header=no", + ], + stdout=PIPE, stderr=STDOUT) + +def start_send(queue, opts): + address="%s;{%s}"%(queue,",".join(opts.send_option)) + return Popen(["qpid-send", + "-b", opts.broker, + "-a", address, + "--messages", str(opts.messages), + "--send-eos", str(opts.receivers), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=no", + ], + stdout=PIPE, stderr=STDOUT) + +def wait_for_output(p): + out,err=p.communicate() + if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + return out + +def delete_queues(queues, broker): + c = qpid.messaging.Connection(broker) + c.open() + for q in queues: + try: s = c.session().sender("%s;{delete:always}"%(q)) + except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" + c.close() + +def print_output(senders, receivers, want_header): + send_stats = sum([wait_for_output(p).split("\n")[:-1] for p in senders],[]) + recv_stats = sum([wait_for_output(p).split("\n")[:-1] for p in receivers],[]) + def empty_if_none(s): + if s: return s + else: return "" + stats = map(lambda s,r: empty_if_none(s)+"\t\t"+empty_if_none(r), + send_stats, recv_stats) + if want_header: print "send-tp\t\trecv-tp\tl-min\tl-max\tl-avg" + for s in stats: print s; + +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker): + delete_queues([queue], broker) + self.connection = qpid.messaging.Connection(broker) + self.connection.open() + self.receiver = self.connection.session().receiver( + "%s;{create:always,delete:always}"%(queue)) + self.timeout=2 + + def wait(self, receivers): + try: + for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) + self.connection.close() + except qpid.messaging.Empty: + for r in receivers: + if (r.poll()): raise "Receiver error: %s"%(wait_for_output(r)) + raise "Timed out waiting for receivers to be ready" + +def main(): + opts, args = op.parse_args() + send_out = "" + receive_out = "" + ready_queue="%s-ready"%(opts.queue_name) + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + for i in xrange(opts.repeat): + delete_queues(queues, opts.broker) + ready_receiver = ReadyReceiver(ready_queue, opts.broker) + receivers = [start_receive(q, opts, ready_queue) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(receivers) # Wait for receivers to be ready. + senders = [start_send(q, opts) for q in queues for j in xrange(opts.senders)] + print_output(senders, receivers, opts.report_header and i == 0) + delete_queues(queues, opts.broker) + +if __name__ == "__main__": main() + -- cgit v1.2.1