summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_cpp_benchmark
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-12 17:49:57 +0000
committerAlan Conway <aconway@apache.org>2010-04-12 17:49:57 +0000
commit9c6d2488b20df5ca73300f01a36f7f89fe2dd929 (patch)
tree9747e1e9b0b223b72bcb1e80ef6cf540e9f1df97 /cpp/src/tests/qpid_cpp_benchmark
parent9d2af98f3234ef0f635d01cbb1540e0d2b00264d (diff)
downloadqpid-python-9c6d2488b20df5ca73300f01a36f7f89fe2dd929.tar.gz
qpid_cpp_benchmark waits for receivers to be ready before starting senders.
This avoids exaggerated latency numbers due to messages siting on the queue while receivers are connecting and subscribing. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@933333 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid_cpp_benchmark')
-rwxr-xr-xcpp/src/tests/qpid_cpp_benchmark37
1 files changed, 23 insertions, 14 deletions
diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark
index d830804407..177231f026 100755
--- a/cpp/src/tests/qpid_cpp_benchmark
+++ b/cpp/src/tests/qpid_cpp_benchmark
@@ -43,7 +43,7 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed")
-def start_receive(queue, opts):
+def start_receive(queue, opts, ready_queue):
return Popen(["qpid_receive",
"-b", opts.broker,
"-a", "%s;{create:always}"%(queue),
@@ -51,6 +51,7 @@ def start_receive(queue, opts):
"--print-content=no",
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
+ "--ready-address", ready_queue
],
stdout=PIPE, stderr=STDOUT)
@@ -78,34 +79,42 @@ def delete_queues(queues, broker):
except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
c.close()
-def wait_for_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
- c.open()
- s = c.session()
- while True:
- try:
- for q in queues: s.sender(q)
- break
- except: pass
- c.close()
-
def skip_first_line(text): return "\n".join(text.split("\n")[1:])
def print_output(processes):
print wait_for_output(processes.pop(0)),
for p in processes: print skip_first_line(wait_for_output(p)),
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ delete_queues([queue], broker)
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:always,delete:always}"%(queue))
+ self.timeout=2
+
+ def wait(self, n):
+ try:
+ for i in xrange(n): self.receiver.fetch(self.timeout)
+ except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready"
+ self.connection.close()
+
def main():
opts, args = op.parse_args()
+ ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
delete_queues(queues, opts.broker)
- receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
- wait_for_queues(queues, opts.broker) # Wait for receivers to be ready
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker)
+ receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)]
+ ready_receiver.wait(len(receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
print "Send"
print_output(senders)
print "\nReceive"
print_output(receivers)
+ print
delete_queues(queues, opts.broker)
if __name__ == "__main__": main()