diff options
| author | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
| commit | f160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch) | |
| tree | 809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/tests/qpid-cpp-benchmark | |
| parent | ebb276cca41582b73223b55eff9f2d4386f4f746 (diff) | |
| download | qpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz | |
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cpp-benchmark')
| -rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 366 |
1 files changed, 0 insertions, 366 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark deleted file mode 100755 index 3a5419de5e..0000000000 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ /dev/null @@ -1,366 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, time, re, os - -try: - import qpid_messaging as qm -except ImportError: - qpid_messaging = None - import qpid.messaging as qm - -from threading import Thread -from subprocess import Popen, PIPE, STDOUT - -op = optparse.OptionParser(usage="usage: %prog [options]", - description="simple performance benchmarks") -op.add_option("-b", "--broker", default=[], action="append", type="str", - help="url of broker(s) to connect to, round robin on multiple brokers") -op.add_option("-c", "--client-host", default=[], action="append", type="str", - help="host(s) to run clients on via ssh, round robin on mulple hosts") -op.add_option("-q", "--queues", default=1, type="int", metavar="N", - help="create N queues (default %default)") -op.add_option("-s", "--senders", default=1, type="int", metavar="N", - help="start N senders per queue (default %default)") -op.add_option("-r", "--receivers", default=1, type="int", metavar="N", - help="start N receivers per queue (default %default)") -op.add_option("-m", "--messages", default=100000, type="int", metavar="N", - help="send N messages per sender (default %default)") -op.add_option("--queue-name", default="benchmark", metavar="NAME", - help="base name for queues (default %default)") -op.add_option("--send-rate", default=0, metavar="N", - help="send rate limited to N messages/second, 0 means no limit (default %default)") -op.add_option("--receive-rate", default=0, metavar="N", - help="receive rate limited to N messages/second, 0 means no limit (default %default)") -op.add_option("--content-size", default=1024, type="int", metavar="BYTES", - help="message size in bytes (default %default)") -op.add_option("--ack-frequency", default=100, metavar="N", type="int", - help="receiver ack's every N messages, 0 means unconfirmed (default %default)") -op.add_option("--tx", default=0, metavar="N", type="int", - help="Transaction batch size, 0 means no transactions") -op.add_option("--no-report-header", dest="report_header", default=True, - action="store_false", help="don't print header on report") -op.add_option("--summarize", default=False, action="store_true", - help="print summary statistics for multiple senders/receivers: total throughput, average latency") -op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") -op.add_option("--send-option", default=[], action="append", type="str", - help="Additional option for sending addresses") -op.add_option("--receive-option", default=[], action="append", type="str", - help="Additional option for receiving addresses") -op.add_option("--create-option", default=[], action="append", type="str", - help="Additional option for creating addresses") -op.add_option("--send-arg", default=[], action="append", type="str", - help="Additional argument for qpid-send") -op.add_option("--receive-arg", default=[], action="append", type="str", - help="Additional argument for qpid-receive") -op.add_option("--no-timestamp", dest="timestamp", default=True, - action="store_false", help="don't add a timestamp, no latency results") -op.add_option("--sequence", dest="sequence", default=False, - action="store_true", help="add a sequence number to each message") -op.add_option("--connection-options", type="str", - help="Connection options for senders & receivers") -op.add_option("--durable", default=False, action="store_true", - help="Use durable queues and messages") -op.add_option("-t", "--timeout", default=1.0, type="float", metavar="SECONDS", - help="Timeout for fetch operations (default %default)") -op.add_option("--save-received", default=False, action="store_true", - help="Save received message content to files <queuename>-receiver-<n>.msg") -op.add_option("--verbose", default=False, action="store_true", - help="Show commands executed") -op.add_option("--fill-drain", default=False, action="store_true", - help="First fill the queues, then drain them") -op.add_option("--qpid-send-path", default="", type="str", metavar="PATH", - help="path to qpid-send binary") -op.add_option("--qpid-receive-path", default="", type="str", metavar="PATH", - help="path to qpid-receive binary") - -single_quote_re = re.compile("'") -def posix_quote(string): - """ Quote a string for use as an argument in a posix shell""" - return "'" + single_quote_re.sub("\\'", string) + "'"; - -def ssh_command(host, command): - """ Convert command into an ssh command on host with quoting""" - return ["ssh", host] + [posix_quote(arg) for arg in command] - -class Clients: - def __init__(self): self.clients=[] - - def add(self, client): - self.clients.append(client) - return client - - def kill(self): - for c in self.clients: - try: c.kill() - except: pass - -class PopenCommand(Popen): - """Like Popen but you can query for the command""" - def __init__(self, command, *args, **kwargs): - self.command = command - Popen.__init__(self, command, *args, **kwargs) - -clients = Clients() - -def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=opts.receive_option - if opts.durable: address_opts += ["node:{durable:true}"] - address="%s;{%s}"%(queue,",".join(address_opts)) - msg_total=opts.senders*opts.messages - messages = msg_total/opts.receivers; - if (index < msg_total%opts.receivers): messages += 1 - if (messages == 0): return None - command = [os.path.join(opts.qpid_receive_path, "qpid-receive"), - "-b", broker, - "-a", address, - "-m", str(messages), - "--forever", - "--print-content=no", - "--receive-rate", str(opts.receive_rate), - "--report-total", - "--ack-frequency", str(opts.ack_frequency), - "--ready-address", "%s;{create:always}"%ready_queue, - "--report-header=no", - "--tx=%s" % opts.tx - ] - if opts.save_received: - command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] - command += opts.receive_arg - if opts.connection_options: - command += ["--connection-options",opts.connection_options] - if host: command = ssh_command(host, command) - if opts.verbose: print "Receiver: ", command - return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) - -def start_send(queue, opts, broker, host): - address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) - command = [os.path.join(opts.qpid_send_path, "qpid-send"), - "-b", broker, - "-a", address, - "--messages", str(opts.messages), - "--content-size", str(opts.content_size), - "--send-rate", str(opts.send_rate), - "--report-total", - "--report-header=no", - "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=%s"%(opts.sequence and "yes" or "no"), - "--durable=%d" % opts.durable, - "--tx=%s" % opts.tx - ] - command += opts.send_arg - if opts.connection_options: - command += ["--connection-options",opts.connection_options] - if host: command = ssh_command(host, command) - if opts.verbose: print "Sender: ", command - return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) - -def error_msg(out, err): - return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) - -def first_line(p): - out,err=p.communicate() - if p.returncode != 0: - raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) - return out.split("\n")[0] - -def connect(broker, opts): - if opts.connection_options: - copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")]) - else: - copts = {} - return qm.Connection.establish(broker, **copts) - -def drain(queue, session, opts): - """ - Drain a queue to make sure it is empty. Throw away the messages. - """ - if opts.verbose: print "Draining", queue - r = session.receiver(queue, capacity=1000) - n = 0 - try: - while True: - # FIXME aconway 2014-11-21: activemq broker does not respect the drain flag - # so fetch on an empty queue will hang forever, use get with timeout instead. - # r.fetch(timeout=0) - m = qm.Message() - r.get(timeout=opts.timeout) - n += 1 - if n % 500 == 0: r.session.acknowledge() - r.session.acknowledge() - except qm.Empty: - pass - r.close() - if opts.verbose: print "Drained", queue, n - -def clear_queues(queues, brokers, opts): - c = connect(brokers[0], opts) - for q in queues: - s = c.session() - need_drain = False - try: - s.sender("%s;{delete:always}"%(q)).close() - if opts.verbose: print "Deleted", q - except qm.NotFound: - s = c.session() - except qm.AddressError: - need_drain = True # AMQP 1.0 does not support delete, drain instead. - s = c.session() - address_opts = ["create:always"] - if opts.durable: address_opts += ["node:{durable:true}"] - address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts)) - if opts.verbose: print "Declaring", address - s.sender(address) - if need_drain: drain(q, s, opts) - c.close() - -def print_header(timestamp): - if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" - else: latency_header="" - print "send-tp\trecv-tp%s"%latency_header - -def parse(parser, lines): # Parse sender/receiver output - return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] - -def parse_senders(senders): - return parse([int],[first_line(p) for p in senders]) - -def parse_receivers(receivers): - return parse([int,float,float,float],[first_line(p) for p in receivers if p]) - -def print_data(send_stats, recv_stats, total_tp): - for send,recv in map(None, send_stats, recv_stats): - line="" - if send: line += "%d"%send[0] - if recv: - line += "\t%d"%recv[0] - if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) - if total_tp is not None: - line += "\t%d"%total_tp - total_tp = None - print line - -def print_summary(send_stats, recv_stats, total_tp): - def avg(s): sum(s) / len(s) - send_tp = sum([l[0] for l in send_stats]) - recv_tp = sum([l[0] for l in recv_stats]) - summary = "%d\t%d"%(send_tp, recv_tp) - if recv_stats and len(recv_stats[0]) == 4: - l_min = sum(l[1] for l in recv_stats)/len(recv_stats) - l_max = sum(l[2] for l in recv_stats)/len(recv_stats) - l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) - summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) - summary += "\t%d"%total_tp - print summary - - -class ReadyReceiver: - """A receiver for ready messages""" - def __init__(self, queue, broker, opts): - self.connection = connect(broker, opts) - self.receiver = self.connection.session().receiver(queue) - self.receiver.session.sync() - self.timeout=opts.timeout - - def wait(self, receivers): - try: - for i in receivers: self.receiver.fetch(self.timeout) - self.receiver.session.acknowledge() - self.connection.close() - except qm.Empty: - for r in receivers: - if (r.poll() is not None): - out,err=r.communicate() - raise Exception("Receiver error: %s\n%s" % - (" ".join(r.command), error_msg(out,err))) - raise Exception("Timed out waiting for receivers to be ready") - -def flatten(l): - return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) - -class RoundRobin: - def __init__(self,items): - self.items = items - self.index = 0 - - def next(self): - if not self.items: return None - ret = self.items[self.index] - self.index = (self.index+1)%len(self.items) - return ret - -def main(): - opts, args = op.parse_args() - opts.client_host = flatten(opts.client_host) - if not opts.broker: - if opts.client_host: - raise Exception("--broker must be specified if --client_host is.") - opts.broker = ["127.0.0.1"] # Deafult to local broker - opts.broker = flatten(opts.broker) - brokers = RoundRobin(opts.broker) - client_hosts = RoundRobin(opts.client_host) - send_out = "" - receive_out = "" - ready_queue="%s-ready"%(opts.queue_name) - queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - try: - for i in xrange(opts.repeat): - clear_queues(queues+[ready_queue], opts.broker, opts) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts) - - def start_receivers(): - return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers) ] - - - def start_senders(): - return [ start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders) ] - - if opts.report_header and i == 0: print_header(opts.timestamp) - - if opts.fill_drain: - # First fill the queues, then drain them - start = time.time() - senders = start_senders() - for p in senders: - if p: p.wait() - receivers = start_receivers() - for p in receivers: - if p: p.wait() - else: - # Run senders and receivers in parallel - receivers = start_receivers() - ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready - start = time.time() - senders = start_senders() - for p in senders + receivers: - if p: p.wait() - - total_sent = opts.queues * opts.senders * opts.messages - total_tp = total_sent / (time.time()-start) - send_stats=parse_senders(senders) - recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats, total_tp) - else: print_data(send_stats, recv_stats, total_tp) - finally: clients.kill() # No strays - -if __name__ == "__main__": main() - |
