From 077facba2cddd2c49d14e496dfa942c23a5e66c9 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 15 Dec 2010 18:10:12 +0000 Subject: Fix flow control for qpid-cpp-benchmark with multiple senders. Ensure senders & receivers agree on number of messages sent/received. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049656 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/qpid-cpp-benchmark | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'cpp/src/tests/qpid-cpp-benchmark') diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 559adee5a7..e865a49813 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -75,12 +75,16 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] -def start_receive(queue, opts, ready_queue, broker, host): +def start_receive(queue, index, opts, ready_queue, broker, host): address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None command = ["qpid-receive", "-b", broker, "-a", address, - "-m", str((opts.senders*opts.messages)/opts.receivers), + "-m", str(messages), "--forever", "--print-content=no", "--receive-rate", str(opts.receive_rate), @@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host): "-b", broker, "-a", address, "--messages", str(opts.messages), - "--send-eos", str(opts.receivers), "--content-size", str(opts.content_size), "--send-rate", str(opts.send_rate), "--report-total", @@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host): def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip())) return out.split("\n")[0] def delete_queues(queues, broker): @@ -144,7 +147,7 @@ def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) def parse_receivers(receivers): - return parse([int,float,float,float],[first_line(p) for p in receivers]) + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): @@ -216,9 +219,9 @@ def main(): for i in xrange(opts.repeat): delete_queues(queues, opts.broker[0]) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next()) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(receivers) # Wait for receivers to be ready. + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) -- cgit v1.2.1