summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/qpid-cpp-benchmark
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
committerRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
commitf160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch)
tree809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/tests/qpid-cpp-benchmark
parentebb276cca41582b73223b55eff9f2d4386f4f746 (diff)
downloadqpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark366
1 files changed, 0 insertions, 366 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
deleted file mode 100755
index 3a5419de5e..0000000000
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ /dev/null
@@ -1,366 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import optparse, time, re, os
-
-try:
- import qpid_messaging as qm
-except ImportError:
- qpid_messaging = None
- import qpid.messaging as qm
-
-from threading import Thread
-from subprocess import Popen, PIPE, STDOUT
-
-op = optparse.OptionParser(usage="usage: %prog [options]",
- description="simple performance benchmarks")
-op.add_option("-b", "--broker", default=[], action="append", type="str",
- help="url of broker(s) to connect to, round robin on multiple brokers")
-op.add_option("-c", "--client-host", default=[], action="append", type="str",
- help="host(s) to run clients on via ssh, round robin on mulple hosts")
-op.add_option("-q", "--queues", default=1, type="int", metavar="N",
- help="create N queues (default %default)")
-op.add_option("-s", "--senders", default=1, type="int", metavar="N",
- help="start N senders per queue (default %default)")
-op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
- help="start N receivers per queue (default %default)")
-op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
- help="send N messages per sender (default %default)")
-op.add_option("--queue-name", default="benchmark", metavar="NAME",
- help="base name for queues (default %default)")
-op.add_option("--send-rate", default=0, metavar="N",
- help="send rate limited to N messages/second, 0 means no limit (default %default)")
-op.add_option("--receive-rate", default=0, metavar="N",
- help="receive rate limited to N messages/second, 0 means no limit (default %default)")
-op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
- help="message size in bytes (default %default)")
-op.add_option("--ack-frequency", default=100, metavar="N", type="int",
- help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
-op.add_option("--tx", default=0, metavar="N", type="int",
- help="Transaction batch size, 0 means no transactions")
-op.add_option("--no-report-header", dest="report_header", default=True,
- action="store_false", help="don't print header on report")
-op.add_option("--summarize", default=False, action="store_true",
- help="print summary statistics for multiple senders/receivers: total throughput, average latency")
-op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
-op.add_option("--send-option", default=[], action="append", type="str",
- help="Additional option for sending addresses")
-op.add_option("--receive-option", default=[], action="append", type="str",
- help="Additional option for receiving addresses")
-op.add_option("--create-option", default=[], action="append", type="str",
- help="Additional option for creating addresses")
-op.add_option("--send-arg", default=[], action="append", type="str",
- help="Additional argument for qpid-send")
-op.add_option("--receive-arg", default=[], action="append", type="str",
- help="Additional argument for qpid-receive")
-op.add_option("--no-timestamp", dest="timestamp", default=True,
- action="store_false", help="don't add a timestamp, no latency results")
-op.add_option("--sequence", dest="sequence", default=False,
- action="store_true", help="add a sequence number to each message")
-op.add_option("--connection-options", type="str",
- help="Connection options for senders & receivers")
-op.add_option("--durable", default=False, action="store_true",
- help="Use durable queues and messages")
-op.add_option("-t", "--timeout", default=1.0, type="float", metavar="SECONDS",
- help="Timeout for fetch operations (default %default)")
-op.add_option("--save-received", default=False, action="store_true",
- help="Save received message content to files <queuename>-receiver-<n>.msg")
-op.add_option("--verbose", default=False, action="store_true",
- help="Show commands executed")
-op.add_option("--fill-drain", default=False, action="store_true",
- help="First fill the queues, then drain them")
-op.add_option("--qpid-send-path", default="", type="str", metavar="PATH",
- help="path to qpid-send binary")
-op.add_option("--qpid-receive-path", default="", type="str", metavar="PATH",
- help="path to qpid-receive binary")
-
-single_quote_re = re.compile("'")
-def posix_quote(string):
- """ Quote a string for use as an argument in a posix shell"""
- return "'" + single_quote_re.sub("\\'", string) + "'";
-
-def ssh_command(host, command):
- """ Convert command into an ssh command on host with quoting"""
- return ["ssh", host] + [posix_quote(arg) for arg in command]
-
-class Clients:
- def __init__(self): self.clients=[]
-
- def add(self, client):
- self.clients.append(client)
- return client
-
- def kill(self):
- for c in self.clients:
- try: c.kill()
- except: pass
-
-class PopenCommand(Popen):
- """Like Popen but you can query for the command"""
- def __init__(self, command, *args, **kwargs):
- self.command = command
- Popen.__init__(self, command, *args, **kwargs)
-
-clients = Clients()
-
-def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=opts.receive_option
- if opts.durable: address_opts += ["node:{durable:true}"]
- address="%s;{%s}"%(queue,",".join(address_opts))
- msg_total=opts.senders*opts.messages
- messages = msg_total/opts.receivers;
- if (index < msg_total%opts.receivers): messages += 1
- if (messages == 0): return None
- command = [os.path.join(opts.qpid_receive_path, "qpid-receive"),
- "-b", broker,
- "-a", address,
- "-m", str(messages),
- "--forever",
- "--print-content=no",
- "--receive-rate", str(opts.receive_rate),
- "--report-total",
- "--ack-frequency", str(opts.ack_frequency),
- "--ready-address", "%s;{create:always}"%ready_queue,
- "--report-header=no",
- "--tx=%s" % opts.tx
- ]
- if opts.save_received:
- command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
- command += opts.receive_arg
- if opts.connection_options:
- command += ["--connection-options",opts.connection_options]
- if host: command = ssh_command(host, command)
- if opts.verbose: print "Receiver: ", command
- return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
-
-def start_send(queue, opts, broker, host):
- address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
- command = [os.path.join(opts.qpid_send_path, "qpid-send"),
- "-b", broker,
- "-a", address,
- "--messages", str(opts.messages),
- "--content-size", str(opts.content_size),
- "--send-rate", str(opts.send_rate),
- "--report-total",
- "--report-header=no",
- "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- "--sequence=%s"%(opts.sequence and "yes" or "no"),
- "--durable=%d" % opts.durable,
- "--tx=%s" % opts.tx
- ]
- command += opts.send_arg
- if opts.connection_options:
- command += ["--connection-options",opts.connection_options]
- if host: command = ssh_command(host, command)
- if opts.verbose: print "Sender: ", command
- return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
-
-def error_msg(out, err):
- return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
-
-def first_line(p):
- out,err=p.communicate()
- if p.returncode != 0:
- raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
- return out.split("\n")[0]
-
-def connect(broker, opts):
- if opts.connection_options:
- copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")])
- else:
- copts = {}
- return qm.Connection.establish(broker, **copts)
-
-def drain(queue, session, opts):
- """
- Drain a queue to make sure it is empty. Throw away the messages.
- """
- if opts.verbose: print "Draining", queue
- r = session.receiver(queue, capacity=1000)
- n = 0
- try:
- while True:
- # FIXME aconway 2014-11-21: activemq broker does not respect the drain flag
- # so fetch on an empty queue will hang forever, use get with timeout instead.
- # r.fetch(timeout=0)
- m = qm.Message()
- r.get(timeout=opts.timeout)
- n += 1
- if n % 500 == 0: r.session.acknowledge()
- r.session.acknowledge()
- except qm.Empty:
- pass
- r.close()
- if opts.verbose: print "Drained", queue, n
-
-def clear_queues(queues, brokers, opts):
- c = connect(brokers[0], opts)
- for q in queues:
- s = c.session()
- need_drain = False
- try:
- s.sender("%s;{delete:always}"%(q)).close()
- if opts.verbose: print "Deleted", q
- except qm.NotFound:
- s = c.session()
- except qm.AddressError:
- need_drain = True # AMQP 1.0 does not support delete, drain instead.
- s = c.session()
- address_opts = ["create:always"]
- if opts.durable: address_opts += ["node:{durable:true}"]
- address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts))
- if opts.verbose: print "Declaring", address
- s.sender(address)
- if need_drain: drain(q, s, opts)
- c.close()
-
-def print_header(timestamp):
- if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
- else: latency_header=""
- print "send-tp\trecv-tp%s"%latency_header
-
-def parse(parser, lines): # Parse sender/receiver output
- return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
-
-def parse_senders(senders):
- return parse([int],[first_line(p) for p in senders])
-
-def parse_receivers(receivers):
- return parse([int,float,float,float],[first_line(p) for p in receivers if p])
-
-def print_data(send_stats, recv_stats, total_tp):
- for send,recv in map(None, send_stats, recv_stats):
- line=""
- if send: line += "%d"%send[0]
- if recv:
- line += "\t%d"%recv[0]
- if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
- if total_tp is not None:
- line += "\t%d"%total_tp
- total_tp = None
- print line
-
-def print_summary(send_stats, recv_stats, total_tp):
- def avg(s): sum(s) / len(s)
- send_tp = sum([l[0] for l in send_stats])
- recv_tp = sum([l[0] for l in recv_stats])
- summary = "%d\t%d"%(send_tp, recv_tp)
- if recv_stats and len(recv_stats[0]) == 4:
- l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
- l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
- l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
- summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
- summary += "\t%d"%total_tp
- print summary
-
-
-class ReadyReceiver:
- """A receiver for ready messages"""
- def __init__(self, queue, broker, opts):
- self.connection = connect(broker, opts)
- self.receiver = self.connection.session().receiver(queue)
- self.receiver.session.sync()
- self.timeout=opts.timeout
-
- def wait(self, receivers):
- try:
- for i in receivers: self.receiver.fetch(self.timeout)
- self.receiver.session.acknowledge()
- self.connection.close()
- except qm.Empty:
- for r in receivers:
- if (r.poll() is not None):
- out,err=r.communicate()
- raise Exception("Receiver error: %s\n%s" %
- (" ".join(r.command), error_msg(out,err)))
- raise Exception("Timed out waiting for receivers to be ready")
-
-def flatten(l):
- return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
-
-class RoundRobin:
- def __init__(self,items):
- self.items = items
- self.index = 0
-
- def next(self):
- if not self.items: return None
- ret = self.items[self.index]
- self.index = (self.index+1)%len(self.items)
- return ret
-
-def main():
- opts, args = op.parse_args()
- opts.client_host = flatten(opts.client_host)
- if not opts.broker:
- if opts.client_host:
- raise Exception("--broker must be specified if --client_host is.")
- opts.broker = ["127.0.0.1"] # Deafult to local broker
- opts.broker = flatten(opts.broker)
- brokers = RoundRobin(opts.broker)
- client_hosts = RoundRobin(opts.client_host)
- send_out = ""
- receive_out = ""
- ready_queue="%s-ready"%(opts.queue_name)
- queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- try:
- for i in xrange(opts.repeat):
- clear_queues(queues+[ready_queue], opts.broker, opts)
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts)
-
- def start_receivers():
- return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers) ]
-
-
- def start_senders():
- return [ start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders) ]
-
- if opts.report_header and i == 0: print_header(opts.timestamp)
-
- if opts.fill_drain:
- # First fill the queues, then drain them
- start = time.time()
- senders = start_senders()
- for p in senders:
- if p: p.wait()
- receivers = start_receivers()
- for p in receivers:
- if p: p.wait()
- else:
- # Run senders and receivers in parallel
- receivers = start_receivers()
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
- start = time.time()
- senders = start_senders()
- for p in senders + receivers:
- if p: p.wait()
-
- total_sent = opts.queues * opts.senders * opts.messages
- total_tp = total_sent / (time.time()-start)
- send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
- else: print_data(send_stats, recv_stats, total_tp)
- finally: clients.kill() # No strays
-
-if __name__ == "__main__": main()
-