summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/qpid-cpp-benchmark
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-11-21 18:46:35 +0000
committerAlan Conway <aconway@apache.org>2014-11-21 18:46:35 +0000
commit0b47f82aabfbb8ca02ee484a097b4d197af5e1de (patch)
treed28482ccea526ce11db5021127d4b61c5eb9af78 /qpid/cpp/src/tests/qpid-cpp-benchmark
parent5110a6f09a5e8a0d5ebaabce9fdb8b52ba0940cd (diff)
downloadqpid-python-0b47f82aabfbb8ca02ee484a097b4d197af5e1de.tar.gz
NO-JIRA: Fix qpid-cpp-benchmark to support AMQP 1.0.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1640975 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark85
1 files changed, 63 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 9ac6a10883..70a8a849ad 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -18,7 +18,14 @@
# under the License.
#
-import optparse, time, qpid.messaging, re, os
+import optparse, time, re, os
+
+try:
+ import qpid_messaging as qm
+except ImportError:
+ qpid_messaging = None
+ import qpid.messaging as qm
+
from threading import Thread
from subprocess import Popen, PIPE, STDOUT
@@ -46,6 +53,8 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=100, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--tx", default=0, metavar="N", type="int",
+ help="Transaction batch size, 0 means no transactions")
op.add_option("--no-report-header", dest="report_header", default=True,
action="store_false", help="don't print header on report")
op.add_option("--summarize", default=False, action="store_true",
@@ -73,8 +82,6 @@ op.add_option("--save-received", default=False, action="store_true",
help="Save received message content to files <queuename>-receiver-<n>.msg")
op.add_option("--verbose", default=False, action="store_true",
help="Show commands executed")
-op.add_option("--no-delete", default=False, action="store_true",
- help="Don't delete the test queues.")
op.add_option("--fill-drain", default=False, action="store_true",
help="First fill the queues, then drain them")
op.add_option("--qpid-send-path", default="", type="str", metavar="PATH",
@@ -129,7 +136,8 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
"--ready-address", "%s;{create:always}"%ready_queue,
- "--report-header=no"
+ "--report-header=no",
+ "--tx=%s" % opts.tx
]
if opts.save_received:
command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
@@ -152,7 +160,8 @@ def start_send(queue, opts, broker, host):
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
"--sequence=%s"%(opts.sequence and "yes" or "no"),
- "--durable", str(opts.durable)
+ "--durable=%d" % opts.durable,
+ "--tx=%s" % opts.tx
]
command += opts.send_arg
if opts.connection_options:
@@ -170,17 +179,50 @@ def first_line(p):
raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def recreate_queues(queues, brokers, no_delete, opts):
- c = qpid.messaging.Connection(brokers[0])
- c.open()
- s = c.session()
+def connect(broker, opts):
+ if opts.connection_options:
+ copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")])
+ else:
+ copts = {}
+ return qm.Connection.establish(broker, **copts)
+
+def drain(queue, session, opts):
+ """
+ Drain a queue to make sure it is empty. Throw away the messages.
+ """
+ if opts.verbose: print "Draining", queue
+ r = session.receiver(queue, capacity=1000)
+ n = 0
+ try:
+ while True:
+ r.fetch(timeout=0)
+ n += 1
+ if n % 500 == 0: r.session.acknowledge()
+ r.session.acknowledge()
+ except qm.Empty:
+ pass
+ r.close()
+ if opts.verbose: print "Drained", queue, n
+
+def clear_queues(queues, brokers, opts):
+ c = connect(brokers[0], opts)
for q in queues:
- if not no_delete:
- try: s.sender("%s;{delete:always}"%(q)).close()
- except qpid.messaging.exceptions.NotFound: pass
- address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
- if opts.verbose: print "Creating", address
+ s = c.session()
+ need_drain = False
+ try:
+ s.sender("%s;{delete:always}"%(q)).close()
+ if opts.verbose: print "Deleted", q
+ except qm.NotFound:
+ s = c.session()
+ except qm.AddressError:
+ need_drain = True # AMQP 1.0 does not support delete, drain instead.
+ s = c.session()
+ address_opts = ["create:always"]
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts))
+ if opts.verbose: print "Declaring", address
s.sender(address)
+ if need_drain: drain(q, s, opts)
c.close()
def print_header(timestamp):
@@ -225,19 +267,18 @@ def print_summary(send_stats, recv_stats, total_tp):
class ReadyReceiver:
"""A receiver for ready messages"""
- def __init__(self, queue, broker):
- self.connection = qpid.messaging.Connection(broker)
- self.connection.open()
- self.receiver = self.connection.session().receiver(
- "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+ def __init__(self, queue, broker, opts):
+ self.connection = connect(broker, opts)
+ self.receiver = self.connection.session().receiver(queue)
self.receiver.session.sync()
self.timeout=10
def wait(self, receivers):
try:
for i in receivers: self.receiver.fetch(self.timeout)
+ self.receiver.session.acknowledge()
self.connection.close()
- except qpid.messaging.Empty:
+ except qm.Empty:
for r in receivers:
if (r.poll() is not None):
out,err=r.communicate()
@@ -275,8 +316,8 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- recreate_queues(queues, opts.broker, opts.no_delete, opts)
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ clear_queues(queues+[ready_queue], opts.broker, opts)
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts)
def start_receivers():
return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())