#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import optparse, time, qpid.messaging from threading import Thread from subprocess import Popen, PIPE, STDOUT op = optparse.OptionParser(usage="usage: %prog [options]", description="simple performance benchmarks") op.add_option("-b", "--broker", default="127.0.0.1", help="url of broker to connect to") op.add_option("-q", "--queues", default=1, type="int", metavar="N", help="create N queues (default %default)") op.add_option("-s", "--senders", default=1, type="int", metavar="N", help="start N senders per queue (default %default)") op.add_option("-r", "--receivers", default=1, type="int", metavar="N", help="start N receivers per queue (default %default)") op.add_option("-m", "--messages", default=100000, type="int", metavar="N", help="send N messages per sender (default %default)") op.add_option("--queue-name", default="benchmark", help="base name for queues (default %default)") op.add_option("--send-rate", default=0, metavar="R", help="send rate limited to R messages/second, 0 means no limit (default %default)") op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") op.add_option("--ack-frequency", default=0, metavar="N", type="int", help="receiver ack's every N messages, 0 means unconfirmed") def start_receive(queue, opts, ready_queue): return Popen(["qpid_receive", "-b", opts.broker, "-a", "%s;{create:always}"%(queue), "--forever", "--print-content=no", "--report-total", "--ack-frequency", str(opts.ack_frequency), "--ready-address", ready_queue ], stdout=PIPE, stderr=STDOUT) def start_send(queue, opts): return Popen(["qpid_send", "-b", opts.broker, "-a", queue, "--messages", str(opts.messages), "--send-eos", str(opts.receivers), "--content-size", str(opts.content_size), "--rate", str(opts.send_rate), "--report-total"], stdout=PIPE, stderr=STDOUT) def wait_for_output(p): out,err=p.communicate() if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) return out def delete_queues(queues, broker): c = qpid.messaging.Connection(broker) c.open() for q in queues: try: s = c.session().sender("%s;{delete:always}"%(q)) except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue" c.close() def skip_first_line(text): return "\n".join(text.split("\n")[1:]) def print_output(processes): print wait_for_output(processes.pop(0)), for p in processes: print skip_first_line(wait_for_output(p)), class ReadyReceiver: """A receiver for ready messages""" def __init__(self, queue, broker): delete_queues([queue], broker) self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( "%s;{create:always,delete:always}"%(queue)) self.timeout=2 def wait(self, n): try: for i in xrange(n): self.receiver.fetch(self.timeout) except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready" self.connection.close() def main(): opts, args = op.parse_args() ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] delete_queues(queues, opts.broker) ready_receiver = ReadyReceiver(ready_queue, opts.broker) receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)] ready_receiver.wait(len(receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)] print "Send" print_output(senders) print "\nReceive" print_output(receivers) print delete_queues(queues, opts.broker) if __name__ == "__main__": main()