diff options
| author | Alan Conway <aconway@apache.org> | 2014-11-21 18:46:35 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-11-21 18:46:35 +0000 |
| commit | 0b47f82aabfbb8ca02ee484a097b4d197af5e1de (patch) | |
| tree | d28482ccea526ce11db5021127d4b61c5eb9af78 /qpid/cpp/src/tests/qpid-cpp-benchmark | |
| parent | 5110a6f09a5e8a0d5ebaabce9fdb8b52ba0940cd (diff) | |
| download | qpid-python-0b47f82aabfbb8ca02ee484a097b4d197af5e1de.tar.gz | |
NO-JIRA: Fix qpid-cpp-benchmark to support AMQP 1.0.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1640975 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cpp-benchmark')
| -rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 85 |
1 files changed, 63 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 9ac6a10883..70a8a849ad 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -18,7 +18,14 @@ # under the License. # -import optparse, time, qpid.messaging, re, os +import optparse, time, re, os + +try: + import qpid_messaging as qm +except ImportError: + qpid_messaging = None + import qpid.messaging as qm + from threading import Thread from subprocess import Popen, PIPE, STDOUT @@ -46,6 +53,8 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") op.add_option("--ack-frequency", default=100, metavar="N", type="int", help="receiver ack's every N messages, 0 means unconfirmed (default %default)") +op.add_option("--tx", default=0, metavar="N", type="int", + help="Transaction batch size, 0 means no transactions") op.add_option("--no-report-header", dest="report_header", default=True, action="store_false", help="don't print header on report") op.add_option("--summarize", default=False, action="store_true", @@ -73,8 +82,6 @@ op.add_option("--save-received", default=False, action="store_true", help="Save received message content to files <queuename>-receiver-<n>.msg") op.add_option("--verbose", default=False, action="store_true", help="Show commands executed") -op.add_option("--no-delete", default=False, action="store_true", - help="Don't delete the test queues.") op.add_option("--fill-drain", default=False, action="store_true", help="First fill the queues, then drain them") op.add_option("--qpid-send-path", default="", type="str", metavar="PATH", @@ -129,7 +136,8 @@ def start_receive(queue, index, opts, ready_queue, broker, host): "--report-total", "--ack-frequency", str(opts.ack_frequency), "--ready-address", "%s;{create:always}"%ready_queue, - "--report-header=no" + "--report-header=no", + "--tx=%s" % opts.tx ] if opts.save_received: command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] @@ -152,7 +160,8 @@ def start_send(queue, opts, broker, host): "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), "--sequence=%s"%(opts.sequence and "yes" or "no"), - "--durable", str(opts.durable) + "--durable=%d" % opts.durable, + "--tx=%s" % opts.tx ] command += opts.send_arg if opts.connection_options: @@ -170,17 +179,50 @@ def first_line(p): raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] -def recreate_queues(queues, brokers, no_delete, opts): - c = qpid.messaging.Connection(brokers[0]) - c.open() - s = c.session() +def connect(broker, opts): + if opts.connection_options: + copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")]) + else: + copts = {} + return qm.Connection.establish(broker, **copts) + +def drain(queue, session, opts): + """ + Drain a queue to make sure it is empty. Throw away the messages. + """ + if opts.verbose: print "Draining", queue + r = session.receiver(queue, capacity=1000) + n = 0 + try: + while True: + r.fetch(timeout=0) + n += 1 + if n % 500 == 0: r.session.acknowledge() + r.session.acknowledge() + except qm.Empty: + pass + r.close() + if opts.verbose: print "Drained", queue, n + +def clear_queues(queues, brokers, opts): + c = connect(brokers[0], opts) for q in queues: - if not no_delete: - try: s.sender("%s;{delete:always}"%(q)).close() - except qpid.messaging.exceptions.NotFound: pass - address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) - if opts.verbose: print "Creating", address + s = c.session() + need_drain = False + try: + s.sender("%s;{delete:always}"%(q)).close() + if opts.verbose: print "Deleted", q + except qm.NotFound: + s = c.session() + except qm.AddressError: + need_drain = True # AMQP 1.0 does not support delete, drain instead. + s = c.session() + address_opts = ["create:always"] + if opts.durable: address_opts += ["node:{durable:true}"] + address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts)) + if opts.verbose: print "Declaring", address s.sender(address) + if need_drain: drain(q, s, opts) c.close() def print_header(timestamp): @@ -225,19 +267,18 @@ def print_summary(send_stats, recv_stats, total_tp): class ReadyReceiver: """A receiver for ready messages""" - def __init__(self, queue, broker): - self.connection = qpid.messaging.Connection(broker) - self.connection.open() - self.receiver = self.connection.session().receiver( - "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) + def __init__(self, queue, broker, opts): + self.connection = connect(broker, opts) + self.receiver = self.connection.session().receiver(queue) self.receiver.session.sync() self.timeout=10 def wait(self, receivers): try: for i in receivers: self.receiver.fetch(self.timeout) + self.receiver.session.acknowledge() self.connection.close() - except qpid.messaging.Empty: + except qm.Empty: for r in receivers: if (r.poll() is not None): out,err=r.communicate() @@ -275,8 +316,8 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - recreate_queues(queues, opts.broker, opts.no_delete, opts) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + clear_queues(queues+[ready_queue], opts.broker, opts) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts) def start_receivers(): return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) |
