summary refs log tree commit diff
path: root/cpp/src/tests/qpid_cpp_benchmark
blob: e82251078fe6a8421c206b4ba7bac2ee95ca3329 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import optparse, time, qpid.messaging
from threading import Thread
from subprocess import Popen, PIPE, STDOUT

# Command-line interface.  Every option has a default, so running the script
# with no arguments benchmarks a single queue on a broker at 127.0.0.1.
op = optparse.OptionParser(
    usage="usage: %prog [options]",
    description="simple performance benchmarks")
# The options are added after construction (not via option_list=) so that the
# automatic -h/--help entry stays first in the option list.
op.add_options([
    optparse.make_option(
        "-b", "--broker", default="127.0.0.1",
        help="url of broker to connect to"),
    # Shape of the run: number of queues and of clients per queue.
    optparse.make_option(
        "-q", "--queues", default=1, type="int", metavar="N",
        help="create N queues (default %default)"),
    optparse.make_option(
        "-s", "--senders", default=1, type="int", metavar="N",
        help="start N senders per queue (default %default)"),
    optparse.make_option(
        "-r", "--receivers", default=1, type="int", metavar="N",
        help="start N receivers per queue (default %default)"),
    optparse.make_option(
        "-m", "--messages", default=100000, type="int", metavar="N",
        help="send N messages per sender (default %default)"),
    optparse.make_option(
        "--queue-name", default="benchmark",
        help="base name for queues (default %default)"),
    # No type= here: a rate given on the command line is kept as a string,
    # while the default stays the integer 0.
    optparse.make_option(
        "--send-rate", default=0, metavar="R",
        help="send rate limited to R messages/second, 0 means no limit (default %default)"),
    optparse.make_option(
        "--content-size", default=1024, type="int", metavar="BYTES",
        help="message size in bytes (default %default)"),
])

def start_receive(queue, opts):
    """Launch one qpid_receive child process reading from `queue`.

    The address given to the client carries the create:always policy, and the
    client is run with --forever, --print-content=no and --report-total against
    the broker in opts.broker.

    Returns the Popen object; the child's stderr is merged into its stdout
    pipe.
    """
    address = "%s;{create:always}" % queue
    command = ["qpid_receive", "-b", opts.broker, "-a", address]
    command += ["--forever", "--print-content=no", "--report-total"]
    return Popen(command, stdout=PIPE, stderr=STDOUT)

def start_send(queue, opts):
    """Launch one qpid_send child process writing to `queue`.

    The message count, content size and send rate come from `opts`; the
    --send-eos value is the configured number of receivers.

    Returns the Popen object; the child's stderr is merged into its stdout
    pipe.
    """
    # NOTE(review): every sender is passed --send-eos opts.receivers, so with
    # several senders per queue each of them emits that many markers -- confirm
    # this is what the receiving side expects.
    valued_flags = (
        ("--messages", opts.messages),
        ("--send-eos", opts.receivers),
        ("--content-size", opts.content_size),
        ("--rate", opts.send_rate),
    )
    command = ["qpid_send", "-b", opts.broker, "-a", queue]
    for flag, value in valued_flags:
        command += [flag, str(value)]
    command.append("--report-total")
    return Popen(command, stdout=PIPE, stderr=STDOUT)

def wait_for_output(p):
    """Wait for process `p` to finish and return what it wrote to stdout.

    Raises Exception, with the captured output in the message, when the
    process ends with a non-zero return code.
    """
    captured = p.communicate()[0]   # second element (stderr) is not used
    if p.returncode == 0:
        return captured
    raise Exception("ERROR:\n%s" % captured)

def delete_queues(queues, broker):
    """Request deletion of every queue in `queues`, ignoring missing ones.

    Opens one connection to `broker`, creates for each queue name a sender
    whose address carries the delete:always policy, and closes the connection.

    queues -- iterable of queue names
    broker -- broker URL handed to qpid.messaging.Connection

    A SendError while creating a sender is ignored; any other exception
    propagates after the connection has been closed.
    """
    c = qpid.messaging.Connection(broker)
    c.open()
    try:
        for q in queues:
            try: c.session().sender("%s;{delete:always}"%(q))
            except qpid.messaging.exceptions.SendError: pass # Ignore "no such queue"
    finally:
        # Close on every path so an unexpected error cannot leak the
        # connection.
        c.close()

def wait_for_queues(queues, broker):
    """Block until a sender can be created for every queue in `queues`.

    Opens one connection to `broker` and, on a single session, keeps trying to
    create a sender for each queue name; the whole list is retried from the
    start whenever an attempt fails.  There is no timeout: the call returns
    only once one full pass over `queues` succeeds.

    queues -- iterable of queue names
    broker -- broker URL handed to qpid.messaging.Connection
    """
    c = qpid.messaging.Connection(broker)
    c.open()
    try:
        s = c.session()
        while True:
            try:
                for q in queues: s.sender(q)
                break
            except Exception:
                # Catch Exception rather than everything, so that
                # KeyboardInterrupt and SystemExit can still end the wait.
                # Pause briefly instead of retrying in a tight loop.
                time.sleep(0.01)
    finally:
        # Close on every path so an interrupt cannot leak the connection.
        c.close()

def skip_first_line(text):
    """Return `text` without its first line.

    Everything up to and including the first newline is dropped; text that
    contains no newline yields the empty string.
    """
    pieces = text.split("\n", 1)
    if len(pieces) < 2:
        return ""
    return pieces[1]

def print_output(processes):
    print wait_for_output(processes.pop(0)),
    for p in processes: print skip_first_line(wait_for_output(p)),

def main():
    opts, args = op.parse_args()
    queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
    delete_queues(queues, opts.broker)
    receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
    wait_for_queues(queues, opts.broker)     # Wait for receivers to be ready
    senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
    print "Send"
    print_output(senders)
    print "\nReceive"
    print_output(receivers)
    delete_queues(queues, opts.broker)

# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()