diff options
author | Jonathan Robie <jonathan@apache.org> | 2010-12-17 15:29:41 +0000 |
---|---|---|
committer | Jonathan Robie <jonathan@apache.org> | 2010-12-17 15:29:41 +0000 |
commit | 483bbe20da6960ef43773f5889e04c82e16dd00c (patch) | |
tree | bea09b1f55b67253d3c27bb79a7029110dd08cbe /tools | |
parent | b01bb777d319ef8e5920fbe5c5805a27c64e32ef (diff) | |
download | qpid-python-483bbe20da6960ef43773f5889e04c82e16dd00c.tar.gz |
Made qpid-xxx management scripts callable as python functions.
Examples (from cli_tests.py):
def qpid_config_api(self, arg = ""):
script = import_script(checkenv("QPID_CONFIG_EXEC"))
broker = ["-a", "localhost:"+str(self.broker.port)]
return script.main(broker + arg.split())
def qpid_route_api(self, arg = ""):
script = import_script(checkenv("QPID_ROUTE_EXEC"))
return script.main(arg.split())
Useful primarily for qpid-config, qpid-route, and qpid-cluster.
Probably not useful for qpid-stat, qpid-printevents, qpid-queue-stats, which just create screen output.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1050425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/src/py/qpid-cluster-store | 8 | ||||
-rwxr-xr-x | tools/src/py/qpid-config | 593 | ||||
-rwxr-xr-x | tools/src/py/qpid-printevents | 53 | ||||
-rwxr-xr-x | tools/src/py/qpid-queue-stats | 6 | ||||
-rwxr-xr-x | tools/src/py/qpid-route | 437 | ||||
-rwxr-xr-x | tools/src/py/qpid-stat | 185 |
6 files changed, 676 insertions, 606 deletions
diff --git a/tools/src/py/qpid-cluster-store b/tools/src/py/qpid-cluster-store index 8cbfa5505b..3541b6679c 100755 --- a/tools/src/py/qpid-cluster-store +++ b/tools/src/py/qpid-cluster-store @@ -19,6 +19,7 @@ # under the License. # + from qpid.datatypes import uuid4, UUID, parseUUID import optparse, os.path, sys, string @@ -61,8 +62,8 @@ class ClusterStoreStatus: self.shutdown_id = uuid4() self.write() -def main(): - opts, args = op.parse_args() +def main(argv=None): + opts, args = op.parse_args(args=argv) if len(args) != 1: op.error("incorrect number of arguments") try: status = ClusterStoreStatus(args[0]+"/cluster/store.status") except Exception,e: print e; return 1 @@ -70,4 +71,5 @@ def main(): if opts.mark_clean: status.mark_clean(); print status return 0 -if __name__ == "__main__": sys.exit(main()) +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index 28581daae9..a2358a0abd 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -25,25 +25,29 @@ import sys import locale from qmf.console import Session -_recursive = False -_host = "localhost" -_connTimeout = 10 -_altern_ex = None -_passive = False -_durable = False -_clusterDurable = False -_if_empty = True -_if_unused = True -_fileCount = 8 -_fileSize = 24 -_maxQueueSize = None -_maxQueueCount = None -_limitPolicy = None -_order = None -_msgSequence = False -_ive = False -_eventGeneration = None -_file = None +class Config: + def __init__(self): + self._recursive = False + self._host = "localhost" + self._connTimeout = 10 + self._altern_ex = None + self._passive = False + self._durable = False + self._clusterDurable = False + self._if_empty = True + self._if_unused = True + self._fileCount = 8 + self._fileSize = 24 + self._maxQueueSize = None + self._maxQueueCount = None + self._limitPolicy = None + self._order = None + self._msgSequence = False + self._ive = False + self._eventGeneration = None + self._file = None + +config = Config() FILECOUNT = "qpid.file_count" FILESIZE = "qpid.file_size" @@ -57,6 +61,176 @@ MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" +class JHelpFormatter(IndentedHelpFormatter): + """Format usage and description without stripping newlines from usage strings + """ + + def format_usage(self, usage): + return usage + + + def format_description(self, description): + if description: + return description + "\n" + else: + return "" + +def Usage(): + print "qpid-config: invalid arguments. Try $ qpid-config --help" + exit(-1) + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + + usage = """ + Usage: qpid-config [OPTIONS] + qpid-config [OPTIONS] exchanges [filter-string] + qpid-config [OPTIONS] queues [filter-string] + qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions] + qpid-config [OPTIONS] del exchange <name> + qpid-config [OPTIONS] add queue <name> [AddQueueOptions] + qpid-config [OPTIONS] del queue <name> [DelQueueOptions] + qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key] + <for type xml> [-f -|filename] + <for type header> [all|any] k1=v1 [, k2=v2...] + qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]""" + + description = """ + ADDRESS syntax: + + [username/password@] hostname + ip-address [:<port>] + + Examples: + + $ qpid-config add queue q + $ qpid-config add exchange direct d localhost:5672 + $ qpid-config exchanges 10.1.1.7:10000 + $ qpid-config queues guest/guest@broker-host:10000 + + Add Exchange <type> values: + + direct Direct exchange for point-to-point communication + fanout Fanout exchange for broadcast communication + topic Topic exchange that routes messages using binding keys with wildcards + headers Headers exchange that matches header fields against the binding keys + xml XML Exchange - allows content filtering using an XQuery + + + Queue Limit Actions + + none (default) - Use broker's default policy + reject - Reject enqueued messages + flow-to-disk - Page messages to disk + ring - Replace oldest unacquired message with new + ring-strict - Replace oldest message, reject if oldest is acquired + + Queue Ordering Policies + + fifo (default) - First in, first out + lvq - Last Value Queue ordering, allows queue browsing + lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" + + parser = OptionParser(usage=usage, + description=description, + formatter=JHelpFormatter()) + + group1 = OptionGroup(parser, "General Options") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") + group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="ADDRESS", help="Maximum time to wait for broker connection (in seconds)") + parser.add_option_group(group1) + + group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") + group2.add_option("--alternate-exchange", action="store", type="string", metavar="NAME", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") + group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") + group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") + parser.add_option_group(group2) + + group3 = OptionGroup(parser, "Options for Adding Queues") + group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node") + group3.add_option("--file-count", action="store", type="int", default=8, metavar="N", help="Number of files in queue's persistence journal") + group3.add_option("--file-size", action="store", type="int", default=24, metavar="N", help="File size in pages (64Kib/page)") + group3.add_option("--max-queue-size", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal") + group3.add_option("--max-queue-count", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal") + group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="CHOICE", help="Action to take when queue limit is reached") + group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="CHOICE", help="Queue ordering policy") + group3.add_option("--generate-queue-events", action="store", type="int", metavar="N", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") + # no option for declaring an exclusive queue - which can only be used by the session that creates it. + parser.add_option_group(group3) + + group4 = OptionGroup(parser, "Options for Adding Exchanges") + group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header") + group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.") + parser.add_option_group(group4) + + group5 = OptionGroup(parser, "Options for Deleting Queues") + group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty") + group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty") + group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used") + parser.add_option_group(group5) + + group6 = OptionGroup(parser, "Options for Declaring Bindings") + group6.add_option("-f", "--file", action="store", type="string", metavar="FILE.xq", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.") + parser.add_option_group(group6) + + opts, encArgs = parser.parse_args(args=argv) + + try: + encoding = locale.getpreferredencoding() + args = [a.decode(encoding) for a in encArgs] + except: + args = encArgs + + if opts.bindings: + config._recursive = True + if opts.broker_addr: + config._host = opts.broker_addr + if opts.timeout: + config._connTimeout = opts.timeout + if config._connTimeout == 0: + config._connTimeout = None + if opts.alternate_exchange: + config._altern_ex = opts.alternate_exchange + if opts.passive: + config._passive = True + if opts.durable: + config._durable = True + if opts.cluster_durable: + config._clusterDurable = True + if opts.file: + config._file = opts.file + if opts.file_count: + config._fileCount = opts.file_count + if opts.file_size: + config._fileSize = opts.file_size + if opts.max_queue_size: + config._maxQueueSize = opts.max_queue_size + if opts.max_queue_count: + config._maxQueueCount = opts.max_queue_count + if opts.limit_policy: + config._limitPolicy = opts.limit_policy + if opts.order: + config._order = opts.order + if opts.sequence: + config._msgSequence = True + if opts.ive: + config._ive = True + if opts.generate_queue_events: + config._eventGeneration = opts.generate_queue_events + if opts.force: + config._if_empty = False + config._if_unused = False + if opts.force_if_not_empty: + config._if_empty = False + if opts.force_if_not_used: + config._if_unused = False + + return args + + # # helpers for the arg parsing in bind(). return multiple values; "ok" # followed by the resultant args @@ -67,13 +241,13 @@ QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" # passed to the xml binding. # def snarf_xquery_args(): - if not _file: + if not config._file: print "Invalid args to bind xml: need an input file or stdin" return [False] - if _file == "-": + if config._file == "-": res = sys.stdin.read() else: - f = open(_file) # let this signal if it can't find it + f = open(config._file) # let this signal if it can't find it res = f.read() f.close() return [True, res] @@ -106,7 +280,7 @@ class BrokerManager: def SetBroker (self, brokerUrl): self.url = brokerUrl self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == '0': @@ -131,12 +305,12 @@ class BrokerManager: print print " Total Queues: %d" % len (queues) - _durable = 0 + durable = 0 for queue in queues: if queue.durable: - _durable = _durable + 1 - print " durable: %d" % _durable - print " non-durable: %d" % (len (queues) - _durable) + durable = durable + 1 + print " durable: %d" % durable + print " non-durable: %d" % (len (queues) - durable) def ExchangeList (self, filter): exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) @@ -232,77 +406,77 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: - Usage(parser) + Usage() etype = args[0] ename = args[1] declArgs = {} - if _msgSequence: + if config._msgSequence: declArgs[MSG_SEQUENCE] = 1 - if _ive: + if config._ive: declArgs[IVE] = 1 - if _altern_ex != None: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + if config._altern_ex != None: + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs) def DelExchange (self, args): if len (args) < 1: - Usage(parser) + Usage() ename = args[0] self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: - Usage(parser) + Usage() qname = args[0] declArgs = {} - if _durable: - declArgs[FILECOUNT] = _fileCount - declArgs[FILESIZE] = _fileSize - - if _maxQueueSize: - declArgs[MAX_QUEUE_SIZE] = _maxQueueSize - if _maxQueueCount: - declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - if _limitPolicy: - if _limitPolicy == "none": + if config._durable: + declArgs[FILECOUNT] = config._fileCount + declArgs[FILESIZE] = config._fileSize + + if config._maxQueueSize: + declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize + if config._maxQueueCount: + declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount + if config._limitPolicy: + if config._limitPolicy == "none": pass - elif _limitPolicy == "reject": + elif config._limitPolicy == "reject": declArgs[POLICY_TYPE] = "reject" - elif _limitPolicy == "flow-to-disk": + elif config._limitPolicy == "flow-to-disk": declArgs[POLICY_TYPE] = "flow_to_disk" - elif _limitPolicy == "ring": + elif config._limitPolicy == "ring": declArgs[POLICY_TYPE] = "ring" - elif _limitPolicy == "ring-strict": + elif config._limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" - if _clusterDurable: + if config._clusterDurable: declArgs[CLUSTER_DURABLE] = 1 - if _order: - if _order == "fifo": + if config._order: + if config._order == "fifo": pass - elif _order == "lvq": + elif config._order == "lvq": declArgs[LVQ] = 1 - elif _order == "lvq-no-browse": + elif config._order == "lvq-no-browse": declArgs[LVQNB] = 1 - if _eventGeneration: - declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration + if config._eventGeneration: + declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration - if _altern_ex != None: - self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) + if config._altern_ex != None: + self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: - self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) + self.broker.getAmqpSession().queue_declare (queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: - Usage(parser) + Usage() qname = args[0] - self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) + self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=config._if_empty, if_unused=config._if_unused) def Bind (self, args): if len (args) < 2: - Usage(parser) + Usage() ename = args[0] qname = args[1] key = "" @@ -340,7 +514,7 @@ class BrokerManager: def Unbind (self, args): if len (args) < 2: - Usage(parser) + Usage() ename = args[0] qname = args[1] key = "" @@ -366,236 +540,79 @@ def YN (bool): return 'Y' return 'N' -class JHelpFormatter(IndentedHelpFormatter): - """Format usage and description without stripping newlines from usage strings - """ - - def format_usage(self, usage): - return usage +def main(argv=None): + args = OptionsAndArguments(argv) + bm = BrokerManager () - def format_description(self, description): - if description: - return description + "\n" + try: + bm.SetBroker(config._host) + if len(args) == 0: + bm.Overview () else: - return "" - -def Usage(parser): - parser.print_usage() - exit(-1) - -## -## Main Program -## - -usage = """ -Usage: qpid-config [OPTIONS] - qpid-config [OPTIONS] exchanges [filter-string] - qpid-config [OPTIONS] queues [filter-string] - qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions] - qpid-config [OPTIONS] del exchange <name> - qpid-config [OPTIONS] add queue <name> [AddQueueOptions] - qpid-config [OPTIONS] del queue <name> [DelQueueOptions] - qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key] - <for type xml> [-f -|filename] - <for type header> [all|any] k1=v1 [, k2=v2...] - qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]""" - -description = """ -ADDRESS syntax: - - [username/password@] hostname - ip-address [:<port>] - -Examples: - -$ qpid-config add queue q -$ qpid-config add exchange direct d localhost:5672 -$ qpid-config exchanges 10.1.1.7:10000 -$ qpid-config queues guest/guest@broker-host:10000 - -Add Exchange <type> values: - - direct Direct exchange for point-to-point communication - fanout Fanout exchange for broadcast communication - topic Topic exchange that routes messages using binding keys with wildcards - headers Headers exchange that matches header fields against the binding keys - xml XML Exchange - allows content filtering using an XQuery - - -Queue Limit Actions - - none (default) - Use broker's default policy - reject - Reject enqueued messages - flow-to-disk - Page messages to disk - ring - Replace oldest unacquired message with new - ring-strict - Replace oldest message, reject if oldest is acquired - -Queue Ordering Policies - - fifo (default) - First in, first out - lvq - Last Value Queue ordering, allows queue browsing - lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" - -parser = OptionParser(usage=usage, - description=description, - formatter=JHelpFormatter()) - -group1 = OptionGroup(parser, "General Options") -group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") -group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") -group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="ADDRESS", help="Maximum time to wait for broker connection (in seconds)") -parser.add_option_group(group1) - -group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") -group2.add_option("--alternate-exchange", action="store", type="string", metavar="NAME", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") -group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") -group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") -parser.add_option_group(group2) - -group3 = OptionGroup(parser, "Options for Adding Queues") -group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node") -group3.add_option("--file-count", action="store", type="int", default=8, metavar="N", help="Number of files in queue's persistence journal") -group3.add_option("--file-size", action="store", type="int", default=24, metavar="N", help="File size in pages (64Kib/page)") -group3.add_option("--max-queue-size", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal") -group3.add_option("--max-queue-count", action="store", type="int", metavar="N", help="Number of files in queue's persistence journal") -group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="CHOICE", help="Action to take when queue limit is reached") -group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="CHOICE", help="Queue ordering policy") -group3.add_option("--generate-queue-events", action="store", type="int", metavar="N", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") -# no option for declaring an exclusive queue - which can only be used by the session that creates it. -parser.add_option_group(group3) - -group4 = OptionGroup(parser, "Options for Adding Exchanges") -group4.add_option("--sequence", action="store_true", help="Exchange will insert a 'qpid.msg_sequence' field in the message header") -group4.add_option("--ive", action="store_true", help="Exchange will behave as an 'initial-value-exchange', keeping a reference to the last message forwarded and enqueuing that message to newly bound queues.") -parser.add_option_group(group4) - -group5 = OptionGroup(parser, "Options for Deleting Queues") -group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty") -group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty") -group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used") -parser.add_option_group(group5) - -group6 = OptionGroup(parser, "Options for Declaring Bindings") -group6.add_option("-f", "--file", action="store", type="string", metavar="FILE.xq", help="For XML Exchange bindings - specifies the name of a file containing an XQuery.") -parser.add_option_group(group6) - -opts, encArgs = parser.parse_args() - -try: - encoding = locale.getpreferredencoding() - args = [a.decode(encoding) for a in encArgs] -except: - args = encArgs - -if opts.bindings: - _recursive = True -if opts.broker_addr: - _host = opts.broker_addr -if opts.timeout: - _connTimeout = opts.timeout - if _connTimeout == 0: - _connTimeout = None -if opts.alternate_exchange: - _altern_ex = opts.alternate_exchange -if opts.passive: - _passive = True -if opts.durable: - _durable = True -if opts.cluster_durable: - _clusterDurable = True -if opts.file: - _file = opts.file -if opts.file_count: - _fileCount = opts.file_count -if opts.file_size: - _fileSize = opts.file_size -if opts.max_queue_size: - _maxQueueSize = opts.max_queue_size -if opts.max_queue_count: - _maxQueueCount = opts.max_queue_count -if opts.limit_policy: - _limitPolicy = opts.limit_policy -if opts.order: - _order = opts.order -if opts.sequence: - _msgSequence = True -if opts.ive: - _ive = True -if opts.generate_queue_events: - _eventGeneration = opts.generate_queue_events -if opts.force: - _if_empty = False - _if_unused = False -if opts.force_if_not_empty: - _if_empty = False -if opts.force_if_not_used: - _if_unused = False - -bm = BrokerManager () - -try: - bm.SetBroker(_host) - if len(args) == 0: - bm.Overview () - else: - cmd = args[0] - modifier = "" - if len(args) > 1: - modifier = args[1] - if cmd == "exchanges": - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd == "queues": - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (args[2:]) - elif modifier == "queue": - bm.AddQueue (args[2:]) + cmd = args[0] + modifier = "" + if len(args) > 1: + modifier = args[1] + if cmd == "exchanges": + if config._recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd == "queues": + if config._recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (args[2:]) + elif modifier == "queue": + bm.AddQueue (args[2:]) + else: + Usage() + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (args[2:]) + elif modifier == "queue": + bm.DelQueue (args[2:]) + else: + Usage() + elif cmd == "bind": + bm.Bind (args[1:]) + elif cmd == "unbind": + bm.Unbind (args[1:]) else: - Usage(parser) - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (args[2:]) - elif modifier == "queue": - bm.DelQueue (args[2:]) - else: - Usage(parser) - elif cmd == "bind": - bm.Bind (args[1:]) - elif cmd == "unbind": - bm.Unbind (args[1:]) - else: - Usage(parser) -except KeyboardInterrupt: - print -except IOError, e: - print e - bm.Disconnect() - sys.exit(1) -except SystemExit, e: - bm.Disconnect() - sys.exit(1) -except Exception,e: - if e.__class__.__name__ != "Timeout": - # ignore Timeout exception, handle in the loop below - print "Failed: %s: %s" % (e.__class__.__name__, e) + Usage() + except KeyboardInterrupt: + print + except IOError, e: + print e bm.Disconnect() - sys.exit(1) - -while True: - # some commands take longer than the default amqp timeout to complete, - # so attempt to disconnect until successful, ignoring Timeouts - try: + return 1 + except SystemExit, e: bm.Disconnect() - break - except Exception, e: + return 1 + except Exception,e: if e.__class__.__name__ != "Timeout": + # ignore Timeout exception, handle in the loop below print "Failed: %s: %s" % (e.__class__.__name__, e) - sys.exit(1) + bm.Disconnect() + return 1 + + while True: + # some commands take longer than the default amqp timeout to complete, + # so attempt to disconnect until successful, ignoring Timeouts + try: + bm.Disconnect() + break + except Exception, e: + if e.__class__.__name__ != "Timeout": + print "Failed: %s: %s" % (e.__class__.__name__, e) + return 1 + + return 0 + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents index e14d85e12a..5da74ca9ef 100755 --- a/tools/src/py/qpid-printevents +++ b/tools/src/py/qpid-printevents @@ -20,7 +20,8 @@ # import os -import optparse +import optparse +from optparse import IndentedHelpFormatter import sys import socket from time import time, strftime, gmtime, sleep @@ -39,20 +40,43 @@ class EventConsole(Console): print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() sys.stdout.flush() -## -## Main Program -## -def main(): - _usage = "%prog [options] [broker-addr]..." - _description = \ -"""Collect and print events from one or more Qpid message brokers. If no broker-addr is -supplied, %prog will connect to 'localhost:5672'. -broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] -ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost +class JHelpFormatter(IndentedHelpFormatter): + """Format usage and description without stripping newlines from usage strings + """ + + def format_usage(self, usage): + return usage + + def format_description(self, description): + if description: + return description + "\n" + else: + return "" + +_usage = "%prog [options] [broker-addr]..." + +_description = \ +""" +Collect and print events from one or more Qpid message brokers. + +If no broker-addr is supplied, %prog connects to 'localhost:5672'. + +[broker-addr] syntax: + + [username/password@] hostname + ip-address [:<port>] + +Examples: + +$ %prog localhost:5672 +$ %prog 10.1.1.7:10000 +$ %prog guest/guest@broker-host:10000 """ - p = optparse.OptionParser(usage=_usage, description=_description) - options, arguments = p.parse_args() +def main(argv=None): + p = optparse.OptionParser(usage=_usage, description=_description, formatter=JHelpFormatter()) + + options, arguments = p.parse_args(args=argv) if len(arguments) == 0: arguments.append("localhost") @@ -74,5 +98,4 @@ ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost session.delBroker(b) if __name__ == '__main__': - main() - + sys.exit(main()) diff --git a/tools/src/py/qpid-queue-stats b/tools/src/py/qpid-queue-stats index 3b8a0dcb19..6c737a080e 100755 --- a/tools/src/py/qpid-queue-stats +++ b/tools/src/py/qpid-queue-stats @@ -124,12 +124,12 @@ class BrokerManager(Console): ## ## Main Program ## -def main(): +def main(argv=None): p = optparse.OptionParser() p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') - options, arguments = p.parse_args() + options, arguments = p.parse_args(args=argv) host = options.broker_address filter = [] @@ -142,5 +142,5 @@ def main(): bm.Display() if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route index db0fca679e..3674ed7913 100755 --- a/tools/src/py/qpid-route +++ b/tools/src/py/qpid-route @@ -26,14 +26,113 @@ import os import locale from qmf.console import Session, BrokerURL -_verbose = False -_quiet = False -_durable = False -_dellink = False -_srclocal = False -_transport = "tcp" -_ack = 0 -_connTimeout = 10 +class Config: + def __init__(self): + self._verbose = False + self._quiet = False + self._durable = False + self._dellink = False + self._srclocal = False + self._transport = "tcp" + self._ack = 0 + self._connTimeout = 10 + +config = Config() + +class JHelpFormatter(IndentedHelpFormatter): + """Format usage and description without stripping newlines from usage strings + """ + + def format_usage(self, usage): + return usage + + + def format_description(self, description): + if description: + return description + "\n" + else: + return "" + +def usage(parser): + parser.print_help() + exit(-1) + +usage = """ +Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] + qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> + + qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] + qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> + qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> + qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> + qpid-route [OPTIONS] route list [<dest-broker>] + qpid-route [OPTIONS] route flush [<dest-broker>] + qpid-route [OPTIONS] route map [<broker>] + + qpid-route [OPTIONS] link add <dest-broker> <src-broker> + qpid-route [OPTIONS] link del <dest-broker> <src-broker> + qpid-route [OPTIONS] link list [<dest-broker>]""" + +description = """ +ADDRESS syntax: + + [username/password@] hostname + ip-address [:<port>]""" + + +def OptionsAndArguments(argv): + + parser = OptionParser(usage=usage, + description=description, + formatter=JHelpFormatter()) + + parser.add_option("--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") + parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") + parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings") + parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable") + + parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link") + parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)") + + parser.add_option("--ack", action="store", type="int", metavar="N", help="Acknowledge transfers over the bridge in batches of N") + parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") + + opts, encArgs = parser.parse_args(args=argv) + + try: + encoding = locale.getpreferredencoding() + args = [a.decode(encoding) for a in encArgs] + except: + args = encArgs + + if opts.timeout: + config._connTimeout = opts.timeout + if config._connTimeout == 0: + config._connTimeout = None + + if opts.verbose: + config._verbose = True + + if opts.quiet: + config._quiet = True + + if opts.durable: + config._durable = True + + if opts.del_empty_link: + config._dellink = True + + if opts.src_local: + config._srclocal = true + + if opts.transport: + config._transport = opts.transport + + if opts.ack: + config._ack = opts.ack + + return args + class RouteManager: def __init__(self, localBroker): @@ -41,7 +140,7 @@ class RouteManager: self.local = BrokerURL(localBroker) self.remote = None self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, _connTimeout) + self.broker = self.qmf.addBroker(localBroker, config._connTimeout) self.broker._waitForStable() self.agent = self.broker.getBrokerAgent() @@ -73,10 +172,10 @@ class RouteManager: broker = brokers[0] link = self.getLink() if link == None: - res = broker.connect(self.remote.host, self.remote.port, _durable, + res = broker.connect(self.remote.host, self.remote.port, config._durable, mech, self.remote.authName or "", self.remote.authPass or "", - _transport) - if _verbose: + config._transport) + if config._verbose: print "Connect method returned:", res.status, res.text def delLink(self, remoteBroker): @@ -88,7 +187,7 @@ class RouteManager: raise Exception("Link not found") res = link.close() - if _verbose: + if config._verbose: print "Close method returned:", res.status, res.text def listLinks(self): @@ -119,7 +218,7 @@ class RouteManager: if url.name() not in self.brokerList: print " %s..." % url.name(), try: - b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout) + b = self.qmf.addBroker("%s:%d" % (link.host, link.port), config._connTimeout) self.brokerList[url.name()] = b added = True print "Ok" @@ -194,7 +293,7 @@ class RouteManager: self.qmf.delBroker(b[1]) def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, mech="PLAIN", dynamic=False): - if dynamic and _srclocal: + if dynamic and config._srclocal: raise Exception("--src-local is not permitted on dynamic routes") self.addLink(remoteBroker, mech) @@ -206,16 +305,16 @@ class RouteManager: for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: - if not _quiet: + if not config._quiet: raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) sys.exit(0) - if _verbose: + if config._verbose: print "Creating inter-broker binding..." - res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack) + res = link.bridge(config._durable, exchange, exchange, routingKey, tag, excludes, False, config._srclocal, dynamic, config._ack) if res.status != 0: raise Exception(res.text) - if _verbose: + if config._verbose: print "Bridge method returned:", res.status, res.text def addQueueRoute(self, remoteBroker, exchange, queue): @@ -228,23 +327,23 @@ class RouteManager: for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: - if not _quiet: + if not config._quiet: raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) sys.exit(0) - if _verbose: + if config._verbose: print "Creating inter-broker binding..." - res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack) + res = link.bridge(config._durable, queue, exchange, "", "", "", True, config._srclocal, False, config._ack) if res.status != 0: raise Exception(res.text) - if _verbose: + if config._verbose: print "Bridge method returned:", res.status, res.text def delQueueRoute(self, remoteBroker, exchange, queue): self.remote = BrokerURL(remoteBroker) link = self.getLink() if link == None: - if not _quiet: + if not config._quiet: raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) sys.exit(0) @@ -252,29 +351,29 @@ class RouteManager: for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: - if _verbose: + if config._verbose: print "Closing bridge..." res = bridge.close() if res.status != 0: raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) - if len(bridges) == 1 and _dellink: + if len(bridges) == 1 and config._dellink: link = self.getLink() if link == None: sys.exit(0) - if _verbose: + if config._verbose: print "Last bridge on link, closing link..." res = link.close() if res.status != 0: raise Exception("Error closing link: %d - %s" % (res.status, res.text)) sys.exit(0) - if not _quiet: + if not config._quiet: raise Exception("Route not found") def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): self.remote = BrokerURL(remoteBroker) link = self.getLink() if link == None: - if not _quiet: + if not config._quiet: raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) sys.exit(0) @@ -282,22 +381,22 @@ class RouteManager: for bridge in bridges: if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ and bridge.dynamic == dynamic: - if _verbose: + if config._verbose: print "Closing bridge..." res = bridge.close() if res.status != 0: raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) - if len(bridges) == 1 and _dellink: + if len(bridges) == 1 and config._dellink: link = self.getLink() if link == None: sys.exit(0) - if _verbose: + if config._verbose: print "Last bridge on link, closing link..." res = link.close() if res.status != 0: raise Exception("Error closing link: %d - %s" % (res.status, res.text)) return - if not _quiet: + if not config._quiet: raise Exception("Route not found") def listRoutes(self): @@ -322,7 +421,7 @@ class RouteManager: bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: - if _verbose: + if config._verbose: myLink = None for link in links: if bridge.linkRef == link.getObjectId(): @@ -333,18 +432,18 @@ class RouteManager: res = bridge.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.text) - elif _verbose: + elif config._verbose: print "Ok" - if _dellink: + if config._dellink: links = self.qmf.getObjects(_class="link") for link in links: - if _verbose: + if config._verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), res = link.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.text) - elif _verbose: + elif config._verbose: print "Ok" class RoutePair: @@ -374,190 +473,104 @@ def YN(val): return 'Y' return 'N' -## -## Main Program -## - - -class JHelpFormatter(IndentedHelpFormatter): - """Format usage and description without stripping newlines from usage strings - """ - - def format_usage(self, usage): - return usage - - - def format_description(self, description): - if description: - return description + "\n" - else: - return "" - -def usage(parser): - parser.print_help() - exit(-1) - -usage = """ -Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] - qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange> - - qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism] - qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key> - qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue> - qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue> - qpid-route [OPTIONS] route list [<dest-broker>] - qpid-route [OPTIONS] route flush [<dest-broker>] - qpid-route [OPTIONS] route map [<broker>] - - qpid-route [OPTIONS] link add <dest-broker> <src-broker> - qpid-route [OPTIONS] link del <dest-broker> <src-broker> - qpid-route [OPTIONS] link list [<dest-broker>]""" - -description = """ -ADDRESS syntax: - - [username/password@] hostname - ip-address [:<port>]""" - -parser = OptionParser(usage=usage, - description=description, - formatter=JHelpFormatter()) - -parser.add_option("--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") -parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") -parser.add_option("-q", "--quiet", action="store_true", help="Quiet output, don't print duplicate warnings") -parser.add_option("-d", "--durable", action="store_true", help="Added configuration shall be durable") - -parser.add_option("-e", "--del-empty-link", action="store_true", help="Delete link after deleting last route on the link") -parser.add_option("-s", "--src-local", action="store_true", help="Make connection to source broker (push route)") - -parser.add_option("--ack", action="store", type="int", metavar="N", help="Acknowledge transfers over the bridge in batches of N") -parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp") - -opts, encArgs = parser.parse_args() - -try: - encoding = locale.getpreferredencoding() - args = [a.decode(encoding) for a in encArgs] -except: - args = encArgs - -if opts.timeout: - _connTimeout = opts.timeout - if _connTimeout == 0: - _connTimeout = None - -if opts.verbose: - _verbose = True - -if opts.quiet: - _quiet = True - -if opts.durable: - _durable = True - -if opts.del_empty_link: - _dellink = True -if opts.src_local: - _srclocal = true +def main(argv=None): -if opts.transport: - _transport = opts.transport + args = OptionsAndArguments(argv) + nargs = len(args) + if nargs < 2: + usage(parser) -if opts.ack: - _ack = opts.ack - -nargs = len(args) -if nargs < 2: - usage(parser) - -if nargs == 2: - localBroker = socket.gethostname() -else: - if _srclocal: - localBroker = args[3] - remoteBroker = args[2] + if nargs == 2: + localBroker = socket.gethostname() else: - localBroker = args[2] - if nargs > 3: - remoteBroker = args[3] - -group = args[0] -cmd = args[1] - -rm = None -try: - rm = RouteManager(localBroker) - if group == "link": - if cmd == "add": - if nargs != 4: - usage(parser) - rm.addLink(remoteBroker) - elif cmd == "del": - if nargs != 4: - usage(parser) - rm.delLink(remoteBroker) - elif cmd == "list": - rm.listLinks() - - elif group == "dynamic": - if cmd == "add": - if nargs < 5 or nargs > 7: - usage(parser) - - tag = "" - excludes = "" - mech = "PLAIN" - if nargs > 5: tag = args[5] - if nargs > 6: excludes = args[6] - rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True) - elif cmd == "del": - if nargs != 5: - usage(parser) + if config._srclocal: + localBroker = args[3] + remoteBroker = args[2] + else: + localBroker = args[2] + if nargs > 3: + remoteBroker = args[3] + + group = args[0] + cmd = args[1] + + rm = None + try: + rm = RouteManager(localBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + usage(parser) + rm.addLink(remoteBroker) + elif cmd == "del": + if nargs != 4: + usage(parser) + rm.delLink(remoteBroker) + elif cmd == "list": + rm.listLinks() + + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 7: + usage(parser) + + tag = "" + excludes = "" + mech = "PLAIN" + if nargs > 5: tag = args[5] + if nargs > 6: excludes = args[6] + rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True) + elif cmd == "del": + if nargs != 5: + usage(parser) + else: + rm.delRoute(remoteBroker, args[4], "", dynamic=True) + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 9: + usage(parser) + + tag = "" + excludes = "" + mech = "PLAIN" + if nargs > 6: tag = args[6] + if nargs > 7: excludes = args[7] + if nargs > 8: mech = args[8] + rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False) + elif cmd == "del": + if nargs != 6: + usage(parser) + rm.delRoute(remoteBroker, args[4], args[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() else: - rm.delRoute(remoteBroker, args[4], "", dynamic=True) - - elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 9: - usage(parser) + if cmd == "list": + rm.listRoutes() + elif cmd == "flush": + rm.clearAllRoutes() + else: + usage(parser) - tag = "" - excludes = "" - mech = "PLAIN" - if nargs > 6: tag = args[6] - if nargs > 7: excludes = args[7] - if nargs > 8: mech = args[8] - rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False) - elif cmd == "del": + elif group == "queue": if nargs != 6: usage(parser) - rm.delRoute(remoteBroker, args[4], args[5], dynamic=False) - elif cmd == "map": - rm.mapRoutes() - else: - if cmd == "list": - rm.listRoutes() - elif cmd == "flush": - rm.clearAllRoutes() + if cmd == "add": + rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) else: usage(parser) - elif group == "queue": - if nargs != 6: - usage(parser) - if cmd == "add": - rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) - elif cmd == "del": - rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5]) - else: - usage(parser) + except Exception,e: + if rm: + rm.disconnect() # try to release broker resources + print "Failed: %s - %s" % (e.__class__.__name__, e) + return 1 -except Exception,e: - if rm: - rm.disconnect() # try to release broker resources - print "Failed: %s - %s" % (e.__class__.__name__, e) - sys.exit(1) + rm.disconnect() + return 0 -rm.disconnect() +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat index 9229fbec29..150c918f6d 100755 --- a/tools/src/py/qpid-stat +++ b/tools/src/py/qpid-stat @@ -21,6 +21,7 @@ import os from optparse import OptionParser, OptionGroup +from time import sleep ### debug import sys import locale import socket @@ -28,13 +29,68 @@ import re from qmf.console import Session, Console from qpid.disp import Display, Header, Sorter -_host = "localhost" -_connTimeout = 10 -_types = "" -_limit = 50 -_increasing = False -_sortcol = None -_cluster_detail = False +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + self._types = "" + self._limit = 50 + self._increasing = False + self._sortcol = None + self._cluster_detail = False + +config = Config() + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + + parser = OptionParser(usage="usage: %prog [options] BROKER", + description="Example: $ qpid-stat -q broker-host:10000") + + group1 = OptionGroup(parser, "General Options") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") + + parser.add_option_group(group1) + + group2 = OptionGroup(parser, "Display Options") + group2.add_option("-b", "--broker", help="Show Brokers", + action="store_const", const="b", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", + action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", + action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", + action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", + action="store_const", const="u", dest="show") + group2.add_option("-S", "--sort-by", metavar="COLNAME", + help="Sort by column name") + group2.add_option("-I", "--increasing", action="store_true", default=False, + help="Sort by increasing value (default = decreasing)") + group2.add_option("-L", "--limit", default=50, metavar="NUM", + help="Limit output to NUM rows") + group2.add_option("-C", "--cluster", action="store_true", default=False, + help="Display per-broker cluster detail.") + parser.add_option_group(group2) + + opts, args = parser.parse_args(args=argv) + + if not opts.show: + parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + + config._types = opts.show + config._sortcol = opts.sort_by + config._connTimeout = opts.timeout + config._increasing = opts.increasing + config._limit = opts.limit + config._cluster_detail = opts.cluster + + if args: + config._host = args[0] + + return args class IpAddr: def __init__(self, text): @@ -125,7 +181,7 @@ class BrokerManager(Console): def SetBroker(self, brokerUrl): self.url = brokerUrl self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) + self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout) agents = self.qmf.getAgents() for a in agents: if a.getAgentBank() == '0': @@ -157,7 +213,7 @@ class BrokerManager(Console): def _getHostList(self, urlList): hosts = [] - hostAddr = IpAddr(_host) + hostAddr = IpAddr(config._host) for url in urlList: if url.find("amqp:") != 0: raise Exception("Invalid URL 1") @@ -223,8 +279,8 @@ class BrokerManager(Console): len(broker.exchanges), len(broker.queues)) rows.append(row) title = "Brokers" - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows @@ -263,8 +319,8 @@ class BrokerManager(Console): title = "Connections" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows @@ -309,8 +365,8 @@ class BrokerManager(Console): title = "Exchanges" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows @@ -356,8 +412,8 @@ class BrokerManager(Console): title = "Queues" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows @@ -403,8 +459,8 @@ class BrokerManager(Console): title = "Subscriptions" if self.cluster: title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) + if config._sortcol: + sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() else: dispRows = rows @@ -419,7 +475,7 @@ class BrokerManager(Console): elif main == 'u': self.displaySubscriptions(subs) def display(self): - if _cluster_detail or _types[0] == 'b': + if config._cluster_detail or config._types[0] == 'b': # always show cluster detail when dumping broker stats self._getCluster() if self.cluster: @@ -427,77 +483,36 @@ class BrokerManager(Console): hostList = self._getHostList(memberList) self.qmf.delBroker(self.broker) self.broker = None - if _host.find("@") > 0: - authString = _host.split("@")[0] + "@" + if config._host.find("@") > 0: + authString = config._host.split("@")[0] + "@" else: authString = "" for host in hostList: - b = self.qmf.addBroker(authString + host, _connTimeout) + b = self.qmf.addBroker(authString + host, config._connTimeout) self.brokers.append(Broker(self.qmf, b)) else: self.brokers.append(Broker(self.qmf, self.broker)) - self.displayMain(_types[0], _types[1:]) - - -## -## Main Program -## - -parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") - -group1 = OptionGroup(parser, "General Options") -group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="SECS", help="Maximum time to wait for broker connection (in seconds)") -parser.add_option_group(group1) - -group2 = OptionGroup(parser, "Display Options") -group2.add_option("-b", "--broker", help="Show Brokers", - action="store_const", const="b", dest="show") -group2.add_option("-c", "--connections", help="Show Connections", - action="store_const", const="c", dest="show") -group2.add_option("-e", "--exchanges", help="Show Exchanges", - action="store_const", const="e", dest="show") -group2.add_option("-q", "--queues", help="Show Queues", - action="store_const", const="q", dest="show") -group2.add_option("-u", "--subscriptions", help="Show Subscriptions", - action="store_const", const="u", dest="show") -group2.add_option("-S", "--sort-by", metavar="COLNAME", - help="Sort by column name") -group2.add_option("-I", "--increasing", action="store_true", default=False, - help="Sort by increasing value (default = decreasing)") -group2.add_option("-L", "--limit", default=50, metavar="NUM", - help="Limit output to NUM rows") -group2.add_option("-C", "--cluster", action="store_true", default=False, - help="Display per-broker cluster detail.") -parser.add_option_group(group2) - -opts, args = parser.parse_args() - -if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") - -_types = opts.show -_sortcol = opts.sort_by -_connTimeout = opts.timeout -_increasing = opts.increasing -_limit = opts.limit -_cluster_detail = opts.cluster - -if args: - _host = args[0] - -bm = BrokerManager() - -try: - bm.SetBroker(_host) - bm.display() -except KeyboardInterrupt: - print -except Exception,e: - print "Failed: %s - %s" % (e.__class__.__name__, e) + self.displayMain(config._types[0], config._types[1:]) + + +def main(argv=None): + + args = OptionsAndArguments(argv) + bm = BrokerManager() + + try: + bm.SetBroker(config._host) + bm.display() + bm.Disconnect() + return 0 + except KeyboardInterrupt: + print + except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + bm.Disconnect() # try to deallocate brokers - raise # FIXME aconway 2010-03-03: - sys.exit(1) + return 1 -bm.Disconnect() +if __name__ == "__main__": + sys.exit(main()) |