diff options
| author | Ted Ross <tross@apache.org> | 2012-02-22 16:41:55 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2012-02-22 16:41:55 +0000 |
| commit | 3314a5cb4d14e94ed8fa29a1ba6348d10d27fdcf (patch) | |
| tree | 52ddc4ee1c0424de91ded89b1cc27884cfa58134 /qpid/tools | |
| parent | 99104dde3f0718b4f9a3aff3427bab6a818fb41b (diff) | |
| download | qpid-python-3314a5cb4d14e94ed8fa29a1ba6348d10d27fdcf.tar.gz | |
QPID-3851 - Unified common CLI options for qpid-config and qpid-stat.
Also in this commit: qpid-config was converted to use the messaging-based qmf2
library. It no longer has a dependency on the qmf library. The CLI tests were also
ported to the faster library.
CLI test time prior to this commit: 2 minutes 12 seconds
CLI test time after this commit: 12.5 seconds
Other items in qpid-config and qpid-stat:
- The deprecated LVQ options (lqv, lqv-no-browse) were removed from qpid-config.
- A new option, --lvq-key, was added to qpid-config to support the new LVQ configuration.
The docs and tests were updated to match.
- qpid-stat was updated so that 'qpid-stat -q <queue-name>' prints full details from the
specified queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1292388 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/tools')
| -rwxr-xr-x | qpid/tools/src/py/qpid-config | 147 | ||||
| -rwxr-xr-x | qpid/tools/src/py/qpid-stat | 134 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/broker.py | 83 | ||||
| -rw-r--r-- | qpid/tools/src/py/qpidtoollibs/disp.py | 5 |
4 files changed, 230 insertions, 139 deletions
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index 0110c60aa2..9433029590 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -18,12 +18,18 @@ # specific language governing permissions and limitations # under the License. # +import pdb import os from optparse import OptionParser, OptionGroup, IndentedHelpFormatter import sys import locale -from qmf.console import Session + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpid.messaging import Connection +from qpidtoollibs.broker import BrokerAgent usage = """ Usage: qpid-config [OPTIONS] @@ -43,8 +49,8 @@ Examples: $ qpid-config add queue q $ qpid-config add exchange direct d -a localhost:5672 -$ qpid-config exchanges -a 10.1.1.7:10000 -$ qpid-config queues -a guest/guest@broker-host:10000 +$ qpid-config exchanges -b 10.1.1.7:10000 +$ qpid-config queues -b guest/guest@broker-host:10000 Add Exchange <type> values: @@ -61,13 +67,7 @@ Queue Limit Actions reject - Reject enqueued messages flow-to-disk - Page messages to disk ring - Replace oldest unacquired message with new - ring-strict - Replace oldest message, reject if oldest is acquired - -Queue Ordering Policies - - fifo (default) - First in, first out - lvq - Last Value Queue ordering, allows queue browsing - lvq-no-browse - Last Value Queue ordering, browsing clients may lose data""" + ring-strict - Replace oldest message, reject if oldest is acquired""" class Config: @@ -77,7 +77,6 @@ class Config: self._connTimeout = 10 self._ignoreDefault = False self._altern_ex = None - self._passive = False self._durable = False self._clusterDurable = False self._if_empty = True @@ -87,8 +86,8 @@ class Config: self._maxQueueSize = None self._maxQueueCount = None self._limitPolicy = None - self._order = None self._msgSequence = False + self._lvq_key = None self._ive = False self._eventGeneration = None self._file = None @@ -110,8 +109,7 @@ MAX_QUEUE_SIZE = "qpid.max_size" MAX_QUEUE_COUNT = "qpid.max_count" POLICY_TYPE = "qpid.policy_type" CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" +LVQ_KEY = "qpid.last_value_queue_key" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" @@ -127,7 +125,7 @@ SHARED_MSG_GROUP = "qpid.shared_msg_group" #arguments for which there are specific program options defined #i.e. the arguments for which there is special processing on add and #list -SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, +SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE, MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP] class JHelpFormatter(IndentedHelpFormatter): @@ -160,8 +158,8 @@ def OptionsAndArguments(argv): group1 = OptionGroup(parser, "General Options") group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list") - group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") + group1.add_option("-r", "--recursive", action="store_true", help="Show bindings in queue or exchange list") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]") group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) @@ -171,7 +169,6 @@ def OptionsAndArguments(argv): group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues") group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.") - group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.") group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.") parser.add_option_group(group2) @@ -182,7 +179,7 @@ def OptionsAndArguments(argv): group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes") group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages") group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") - group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") + group3.add_option("--lvq-key", action="store", metavar="<key>", help="Last Value Queue key") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", help="Turn on sender flow control when the number of queued bytes exceeds this value.") @@ -224,10 +221,10 @@ def OptionsAndArguments(argv): except: args = encArgs - if opts.bindings: + if opts.recursive: config._recursive = True - if opts.broker_addr: - config._host = opts.broker_addr + if opts.broker: + config._host = opts.broker if opts.timeout is not None: config._connTimeout = opts.timeout if config._connTimeout == 0: @@ -236,8 +233,6 @@ def OptionsAndArguments(argv): config._ignoreDefault = True if opts.alternate_exchange: config._altern_ex = opts.alternate_exchange - if opts.passive: - config._passive = True if opts.durable: config._durable = True if opts.cluster_durable: @@ -254,10 +249,10 @@ def OptionsAndArguments(argv): config._maxQueueCount = opts.max_queue_count if opts.limit_policy: config._limitPolicy = opts.limit_policy - if opts.order: - config._order = opts.order if opts.sequence: config._msgSequence = True + if opts.lvq_key: + config._lvq_key = opts.lvq_key if opts.ive: config._ive = True if opts.generate_queue_events: @@ -331,27 +326,23 @@ def snarf_header_args(args): class BrokerManager: def __init__(self): self.brokerName = None - self.qmf = None + self.conn = None self.broker = None - self.mechanism = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + self.conn = Connection(self.url, sasl_mechanisms=mechanism) + self.conn.open() + self.broker = BrokerAgent(self.conn) def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) + if self.conn: + self.conn.close() def Overview(self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) + exchanges = self.broker.getAllExchanges() + queues = self.broker.getAllQueues() + print "Total Exchanges: %d" % len(exchanges) etype = {} for ex in exchanges: if ex.type not in etype: @@ -362,16 +353,16 @@ class BrokerManager: print "%15s: %d" % (typ, etype[typ]) print - print " Total Queues: %d" % len (queues) + print " Total Queues: %d" % len(queues) durable = 0 for queue in queues: if queue.durable: durable = durable + 1 print " durable: %d" % durable - print " non-durable: %d" % (len (queues) - durable) + print " non-durable: %d" % (len(queues) - durable) def ExchangeList(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() caption1 = "Type " caption2 = "Exchange Name" maxNameLen = len(caption2) @@ -401,19 +392,19 @@ class BrokerManager: if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", if IVE in args and args[IVE] == 1: print "--ive", if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, + print "--alternate-exchange=%s" % ex.altExchange, print def ExchangeListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for ex in exchanges: if config._ignoreDefault and not ex.name: continue if self.match(ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): + if bind.exchangeRef == ex.name: qname = "<unknown>" queue = self.findById(queues, bind.queueRef) if queue != None: @@ -425,7 +416,7 @@ class BrokerManager: def QueueList(self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + queues = self.broker.getAllQueues() caption = "Queue Name" maxNameLen = len(caption) found = False @@ -458,8 +449,7 @@ class BrokerManager: if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE], if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT], if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", + if LVQ_KEY in args: print "--lvq-key=%s" % args[LVQ_KEY], if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, @@ -472,14 +462,14 @@ class BrokerManager: print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS]) def QueueListRecurse(self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + exchanges = self.broker.getAllExchanges() + bindings = self.broker.getAllBindings() + queues = self.broker.getAllQueues() for queue in queues: if self.match(queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.getObjectId(): + if bind.queueRef == queue.name: ename = "<unknown>" ex = self.findById(exchanges, bind.exchangeRef) if ex != None: @@ -508,16 +498,19 @@ class BrokerManager: declArgs[MSG_SEQUENCE] = 1 if config._ive: declArgs[IVE] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare(exchange=ename, type=etype, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + self.broker.addExchange(etype, ename, declArgs) + def DelExchange(self, args): if len(args) < 1: Usage() ename = args[0] - self.broker.getAmqpSession().exchange_delete(exchange=ename) + self.broker.delExchange(ename) + def AddQueue(self, args): if len(args) < 1: @@ -550,15 +543,10 @@ class BrokerManager: elif config._limitPolicy == "ring-strict": declArgs[POLICY_TYPE] = "ring_strict" - if config._clusterDurable: + if config._clusterDurable: declArgs[CLUSTER_DURABLE] = 1 - if config._order: - if config._order == "fifo": - pass - elif config._order == "lvq": - declArgs[LVQ] = 1 - elif config._order == "lvq-no-browse": - declArgs[LVQNB] = 1 + if config._lvq_key: + declArgs[LVQ_KEY] = config._lvq_key if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration @@ -576,17 +564,19 @@ class BrokerManager: if config._sharedMsgGroup: declArgs[SHARED_MSG_GROUP] = 1 - if config._altern_ex != None: - self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) + if config._altern_ex: + declArgs['alternate-exchange'] = config._altern_ex + if config._durable: + declArgs['durable'] = 1 + + self.broker.addQueue(qname, declArgs) def DelQueue(self, args): if len(args) < 1: Usage() qname = args[0] - self.broker.getAmqpSession().queue_delete(queue=qname, if_empty=config._if_empty, if_unused=config._if_unused) + self.broker.delQueue(qname) def Bind(self, args): @@ -599,7 +589,7 @@ class BrokerManager: key = args[2] # query the exchange to determine its type. - res = self.broker.getAmqpSession().exchange_query(ename) + res = self.broker.getExchange(ename) # type of the xchg determines the processing of the rest of # argv. if it's an xml xchg, we want to find a file @@ -608,7 +598,7 @@ class BrokerManager: # map containing key/value pairs. if neither of those, extra # args are ignored. ok = True - _args = None + _args = {} if res.type == "xml": # this checks/imports the -f arg [ok, xquery] = snarf_xquery_args() @@ -622,10 +612,7 @@ class BrokerManager: if not ok: sys.exit(1) - self.broker.getAmqpSession().exchange_bind(queue=qname, - exchange=ename, - binding_key=key, - arguments=_args) + self.broker.bind(ename, qname, key, _args) def Unbind(self, args): if len(args) < 2: @@ -635,11 +622,11 @@ class BrokerManager: key = "" if len(args) > 2: key = args[2] - self.broker.getAmqpSession().exchange_unbind(queue=qname, exchange=ename, binding_key=key) + self.broker.unbind(ename, qname, key) def findById(self, items, id): for item in items: - if item.getObjectId() == id: + if item.name == id: return item return None diff --git a/qpid/tools/src/py/qpid-stat b/qpid/tools/src/py/qpid-stat index bb094554e6..c9c4da2aeb 100755 --- a/qpid/tools/src/py/qpid-stat +++ b/qpid/tools/src/py/qpid-stat @@ -42,7 +42,6 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._details = None self._sasl_mechanism = None config = Config() @@ -52,16 +51,19 @@ def OptionsAndArguments(argv): global config - parser = OptionParser(usage="usage: %prog [options] BROKER", - description="Example: $ qpid-stat -q broker-host:10000") + parser = OptionParser(usage="usage: %prog [options] -[gcequm] [object-name]") group1 = OptionGroup(parser, "General Options") - group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)") - group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") @@ -70,24 +72,21 @@ def OptionsAndArguments(argv): group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") - group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") + parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) if not opts.show: - parser.error("You must specify one of these options: -b, -c, -e, -q. or -u. For details, try $ qpid-stat --help") + parser.error("You must specify one of these options: -g, -c, -e, -q, -m, or -u. For details, try $ qpid-stat --help") config._types = opts.show config._sortcol = opts.sort_by + config._host = opts.broker config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit config._sasl_mechanism = opts.sasl_mechanism - config._detail = opts.detail - - if args: - config._host = args[0] return args @@ -118,24 +117,23 @@ class IpAddr: class BrokerManager: def __init__(self): - self.brokerName = None - self.connections = [] - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) - self.connections[0].open() - self.brokers.append(BrokerAgent(self.connections[0])) + self.connection = Connection(self.url, sasl_mechanisms=mechanism) + self.connection.open() + self.broker = BrokerAgent(self.connection) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - for conn in self.connections: - conn.close() + connection.close() except: pass @@ -184,7 +182,7 @@ class BrokerManager: heads.append(Header('exchanges', Header.COMMAS)) heads.append(Header('queues', Header.COMMAS)) rows = [] - broker = self.brokers[0].getBroker() + broker = self.broker.getBroker() connections = self.getConnectionMap() sessions = self.getSessionMap() exchanges = self.getExchangeMap() @@ -241,8 +239,8 @@ class BrokerManager: heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - connections = self.brokers[0].getAllConnections() - broker = self.brokers[0].getBroker() + connections = self.broker.getAllConnections() + broker = self.broker.getBroker() for conn in connections: row = [] row.append(conn.address) @@ -279,7 +277,7 @@ class BrokerManager: heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() for ex in exchanges: row = [] row.append(ex.name) @@ -317,7 +315,7 @@ class BrokerManager: heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() for q in queues: row = [] row.append(q.name) @@ -341,9 +339,65 @@ class BrokerManager: dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + + def displayQueue(self, subs, name): + queue = self.broker.getQueue(name) + if not queue: + print "Queue '%s' not found" % name + return + disp = Display(prefix=" ") heads = [] + heads.append(Header('Name')) + heads.append(Header('Durable', Header.YN)) + heads.append(Header('AutoDelete', Header.YN)) + heads.append(Header('Exclusive', Header.YN)) + heads.append(Header('FlowStopped', Header.YN)) + heads.append(Header('FlowStoppedCount', Header.COMMAS)) + heads.append(Header('Consumers', Header.COMMAS)) + heads.append(Header('Bindings', Header.COMMAS)) + rows = [] + rows.append([queue.name, queue.durable, queue.autoDelete, queue.exclusive, + queue.flowStopped, queue.flowStoppedCount, + queue.consumerCount, queue.bindingCount]) + disp.formattedTable("Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Property')) + heads.append(Header('Value')) + rows = [] + rows.append(['arguments', queue.arguments]) + rows.append(['alt-exchange', queue.altExchange]) + disp.formattedTable("Optional Properties:", heads, rows) + print + + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', queue.msgDepth, queue.byteDepth]) + rows.append(['total-enqueues', queue.msgTotalEnqueues, queue.byteTotalEnqueues]) + rows.append(['total-dequeues', queue.msgTotalDequeues, queue.byteTotalDequeues]) + rows.append(['persistent-enqueues', queue.msgPersistEnqueues, queue.bytePersistEnqueues]) + rows.append(['persistent-dequeues', queue.msgPersistDequeues, queue.bytePersistDequeues]) + rows.append(['transactional-enqueues', queue.msgTxnEnqueues, queue.byteTxnEnqueues]) + rows.append(['transactional-dequeues', queue.msgTxnDequeues, queue.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', queue.msgFtdDepth, queue.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', queue.msgFtdEnqueues, queue.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', queue.msgFtdDequeues, queue.byteFtdDequeues]) + rows.append(['acquires', queue.acquires, None]) + rows.append(['releases', queue.releases, None]) + rows.append(['discards-ttl-expired', queue.discardsTtl, None]) + rows.append(['discards-limit-overflow', queue.discardsOverflow, None]) + rows.append(['discards-ring-overflow', queue.discardsRing, None]) + rows.append(['discards-lvq-replace', queue.discardsLvq, None]) + rows.append(['discards-subscriber-reject', queue.discardsSubscriber, None]) + rows.append(['discards-purged', queue.discardsPurge, None]) + rows.append(['reroutes', queue.reroutes, None]) + disp.formattedTable("Statistics:", heads, rows) + def displaySubscriptions(self, subs): disp = Display(prefix=" ") @@ -359,7 +413,7 @@ class BrokerManager: heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - subscriptions = self.brokers[0].getAllSubscriptions() + subscriptions = self.broker.getAllSubscriptions() sessions = self.getSessionMap() connections = self.getConnectionMap() for s in subscriptions: @@ -392,55 +446,55 @@ class BrokerManager: disp = Display(prefix=" ") heads = [Header('Statistic'), Header('Value', Header.COMMAS)] rows = [] - memory = self.brokers[0].getMemory() + memory = self.broker.getMemory() for k,v in memory.values.items(): if k != 'name': rows.append([k, v]) disp.formattedTable('Broker Memory Statistics:', heads, rows) def getExchangeMap(self): - exchanges = self.brokers[0].getAllExchanges() + exchanges = self.broker.getAllExchanges() emap = {} for e in exchanges: emap[e.name] = e return emap def getQueueMap(self): - queues = self.brokers[0].getAllQueues() + queues = self.broker.getAllQueues() qmap = {} for q in queues: qmap[q.name] = q return qmap def getSessionMap(self): - sessions = self.brokers[0].getAllSessions() + sessions = self.broker.getAllSessions() smap = {} for s in sessions: smap[s.name] = s return smap def getConnectionMap(self): - connections = self.brokers[0].getAllConnections() + connections = self.broker.getAllConnections() cmap = {} for c in connections: cmap[c.address] = c return cmap - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) + def displayMain(self, names, main, subs): + if main == 'g': self.displayBroker(subs) elif main == 'c': self.displayConn(subs) elif main == 's': self.displaySession(subs) elif main == 'e': self.displayExchange(subs) elif main == 'q': - if config._detail: - self.displayQueue(subs, config._detail) + if len(names) >= 1: + self.displayQueue(subs, names[0]) else: self.displayQueues(subs) elif main == 'u': self.displaySubscriptions(subs) elif main == 'm': self.displayMemory(subs) - def display(self): - self.displayMain(config._types[0], config._types[1:]) + def display(self, names): + self.displayMain(names, config._types[0], config._types[1:]) def main(argv=None): @@ -450,7 +504,7 @@ def main(argv=None): try: bm.SetBroker(config._host, config._sasl_mechanism) - bm.display() + bm.display(args) bm.Disconnect() return 0 except KeyboardInterrupt: diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index 6a380caf8d..b3616f0b3a 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -24,6 +24,9 @@ except ImportError: from qpid.datatypes import uuid4 class BrokerAgent(object): + """ + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + """ def __init__(self, conn): self.conn = conn self.sess = self.conn.session() @@ -35,6 +38,9 @@ class BrokerAgent(object): self.next_correlator = 1 def close(self): + """ + Close the proxy session. This will not affect the connection used in creating the object. + """ self.sess.close() def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): @@ -124,9 +130,15 @@ class BrokerAgent(object): return None def getCluster(self): + """ + Get the broker's Cluster object. + """ return self._getAllBrokerObjects(Cluster) def getBroker(self): + """ + Get the Broker object that contains broker-scope statistics and operations. + """ # # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because # of a bug that used to be in the broker whereby by-name queries did not return the @@ -173,8 +185,8 @@ class BrokerAgent(object): def getAllBindings(self): return self._getAllBrokerObjects(Binding) - def getBinding(self, exchange=None, queue=None): - pass + def getAllLinks(self): + return self._getAllBrokerObjects(Link) def echo(self, sequence, body): """Request a response to test the path to the management broker""" @@ -204,23 +216,52 @@ class BrokerAgent(object): """Get the message timestamping configuration""" pass -# def addExchange(self, exchange_type, name, **kwargs): -# pass - -# def delExchange(self, name): -# pass - -# def addQueue(self, name, **kwargs): -# pass - -# def delQueue(self, name): -# pass - -# def bind(self, exchange, queue, key, **kwargs): -# pass - -# def unbind(self, exchange, queue, key, **kwargs): -# pass + def addExchange(self, exchange_type, name, options={}, **kwargs): + properties = {} + properties['exchange-type'] = exchange_type + for k,v in options.items(): + properties[k] = v + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'exchange', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delExchange(self, name): + args = {'type': 'exchange', 'name': name} + self._method('delete', args) + + def addQueue(self, name, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'queue', + 'name': name, + 'properties': properties, + 'strict': True} + self._method('create', args) + + def delQueue(self, name): + args = {'type': 'queue', 'name': name} + self._method('delete', args) + + def bind(self, exchange, queue, key, options={}, **kwargs): + properties = options + for k,v in kwargs.items(): + properties[k] = v + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'properties': properties, + 'strict': True} + self._method('create', args) + + def unbind(self, exchange, queue, key, **kwargs): + args = {'type': 'binding', + 'name': "%s/%s/%s" % (exchange, queue, key), + 'strict': True} + self._method('delete', args) def create(self, _type, name, properties, strict): """Create an object of the specified type""" @@ -328,3 +369,7 @@ class Queue(BrokerObject): self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, "org.apache.qpid.broker:queue:%s" % self.name) +class Link(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + diff --git a/qpid/tools/src/py/qpidtoollibs/disp.py b/qpid/tools/src/py/qpidtoollibs/disp.py index cb7d3da306..7962a13329 100644 --- a/qpid/tools/src/py/qpidtoollibs/disp.py +++ b/qpid/tools/src/py/qpidtoollibs/disp.py @@ -206,6 +206,11 @@ class Display: result += "%ds" % (sec % 60) return result + def YN(self, val): + if val: + return 'Y' + return 'N' + class Sortable: """ """ def __init__(self, row, sortIndex): |
