summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-cpp-benchmark
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 01:19:00 +0000
commitebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch)
treedcfb94e75656c6c239fc3dcb754cd2015126424d /cpp/src/tests/qpid-cpp-benchmark
parent5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff)
downloadqpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark71
1 files changed, 25 insertions, 46 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index 300d34774f..1f77226b4d 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -77,20 +77,6 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
-class Clients:
- def __init__(self): self.clients=[]
-
- def add(self, client):
- self.clients.append(client)
- return client
-
- def kill(self):
- for c in self.clients:
- try: c.kill()
- except: pass
-
-clients = Clients()
-
def start_receive(queue, index, opts, ready_queue, broker, host):
address_opts=["create:receiver"] + opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
@@ -115,7 +101,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return Popen(command, stdout=PIPE)
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -136,7 +122,7 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return Popen(command, stdout=PIPE)
def first_line(p):
out,err=p.communicate()
@@ -147,11 +133,7 @@ def delete_queues(queues, broker):
c = qpid.messaging.Connection(broker)
c.open()
for q in queues:
- try:
- s = c.session()
- snd = s.sender("%s;{delete:always}"%(q))
- snd.close()
- s.sync()
+ try: s = c.session().sender("%s;{delete:always}"%(q))
except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
c.close()
@@ -163,6 +145,7 @@ def print_header(timestamp):
def parse(parser, lines): # Parse sender/receiver output
for l in lines:
fn_val = zip(parser, l)
+
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
@@ -173,12 +156,11 @@ def parse_receivers(receivers):
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
- line=""
- if send: line += "%d"%send[0]
+ if send: print send[0],
if recv:
- line += "\t\t%d"%recv[0]
- if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
- print line
+ print "\t\t%d"%recv[0],
+ if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
+ print
def print_summary(send_stats, recv_stats):
def avg(s): sum(s) / len(s)
@@ -202,11 +184,11 @@ class ReadyReceiver:
self.receiver = self.connection.session().receiver(
"%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
self.receiver.session.sync()
- self.timeout=10
+ self.timeout=2
def wait(self, receivers):
try:
- for i in receivers: self.receiver.fetch(self.timeout)
+ for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
self.connection.close()
except qpid.messaging.Empty:
for r in receivers:
@@ -215,8 +197,7 @@ class ReadyReceiver:
raise Exception("Receiver error: %s"%(out))
raise Exception("Timed out waiting for receivers to be ready")
-def flatten(l):
- return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
+def flatten(l): return sum(map(lambda s: s.split(","), l),[])
class RoundRobin:
def __init__(self,items):
@@ -240,22 +221,20 @@ def main():
receive_out = ""
ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- try:
- for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
- senders = [start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders)]
- if opts.report_header and i == 0: print_header(opts.timestamp)
- send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
- finally: clients.kill() # No strays
+ for i in xrange(opts.repeat):
+ delete_queues(queues, opts.broker[0])
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers)]
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+ senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders)]
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+ send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ if opts.summarize: print_summary(send_stats, recv_stats)
+ else: print_data(send_stats, recv_stats)
+ delete_queues(queues, opts.broker[0])
if __name__ == "__main__": main()