diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/python/commands | |
parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
download | qpid-python-M4.tar.gz |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/python/commands')
-rwxr-xr-x | RC9/qpid/python/commands/qpid-config | 392 | ||||
-rwxr-xr-x | RC9/qpid/python/commands/qpid-printevents | 74 | ||||
-rwxr-xr-x | RC9/qpid/python/commands/qpid-queue-stats | 144 | ||||
-rwxr-xr-x | RC9/qpid/python/commands/qpid-route | 514 | ||||
-rwxr-xr-x | RC9/qpid/python/commands/qpid-tool | 195 |
5 files changed, 1319 insertions, 0 deletions
diff --git a/RC9/qpid/python/commands/qpid-config b/RC9/qpid/python/commands/qpid-config new file mode 100755 index 0000000000..ff3c7db46e --- /dev/null +++ b/RC9/qpid/python/commands/qpid-config @@ -0,0 +1,392 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import locale +from qmf.console import Session + +_recursive = False +_host = "localhost" +_durable = False +_clusterDurable = False +_fileCount = 8 +_fileSize = 24 +_maxQueueSize = None +_maxQueueCount = None +_policyType = None +_lvq = False +_msgSequence = False +_ive = False + +FILECOUNT = "qpid.file_count" +FILESIZE = "qpid.file_size" +MAX_QUEUE_SIZE = "qpid.max_size" +MAX_QUEUE_COUNT = "qpid.max_count" +POLICY_TYPE = "qpid.policy_type" +CLUSTER_DURABLE = "qpid.persist_last_node" +LVQ = "qpid.last_value_queue" +MSG_SEQUENCE = "qpid.msg_sequence" +IVE = "qpid.ive" + +def Usage (): + print "Usage: qpid-config [OPTIONS]" + print " qpid-config [OPTIONS] exchanges [filter-string]" + print " qpid-config [OPTIONS] queues [filter-string]" + print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]" + print " qpid-config [OPTIONS] del exchange <name>" + print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]" + print " qpid-config [OPTIONS] del queue <name>" + print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" + print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" + print + print "Options:" + print " -b [ --bindings ] Show bindings in queue or exchange list" + print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" + print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + print "Add Queue Options:" + print " --durable Queue is durable" + print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" + print " --file-count N (8) Number of files in queue's persistence journal" + print " --file-size N (24) File size in pages (64Kib/page)" + print " --max-queue-size N Maximum in-memory queue size as bytes" + print " --max-queue-count N Maximum in-memory queue size as a number of messages" + print " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)" + print " --last-value-queue Enable LVQ behavior on the queue" + print + print "Add Exchange Options:" + print " --durable Exchange is durable" + print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" + print " with a value that increments for each message forwarded." + print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" + print " to the last message forwarded and enqueuing that message to newly bound" + print " queues." + print + sys.exit (1) + +class BrokerManager: + def __init__ (self): + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = Session() + self.broker = self.qmf.addBroker(brokerUrl) + agents = self.qmf.getAgents() + for a in agents: + if a.getAgentBank() == 0: + self.brokerAgent = a + + def Disconnect(self): + if self.broker: + self.qmf.delBroker(self.broker) + + def Overview (self): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + print "Total Exchanges: %d" % len (exchanges) + etype = {} + for ex in exchanges: + if ex.type not in etype: + etype[ex.type] = 1 + else: + etype[ex.type] = etype[ex.type] + 1 + for typ in etype: + print "%15s: %d" % (typ, etype[typ]) + + print + print " Total Queues: %d" % len (queues) + _durable = 0 + for queue in queues: + if queue.durable: + _durable = _durable + 1 + print " durable: %d" % _durable + print " non-durable: %d" % (len (queues) - _durable) + + def ExchangeList (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + caption1 = "Type " + caption2 = "Exchange Name" + maxNameLen = len(caption2) + for ex in exchanges: + if self.match(ex.name, filter): + if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) + print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) + line = "" + for i in range(((maxNameLen + len(caption1)) / 5) + 5): + line += "=====" + print line + + for ex in exchanges: + if self.match (ex.name, filter): + print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), + args = ex.arguments + if ex.durable: print "--durable", + if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", + if IVE in args and args[IVE] == 1: print "--ive", + print + + def ExchangeListRecurse (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + for ex in exchanges: + if self.match (ex.name, filter): + print "Exchange '%s' (%s)" % (ex.name, ex.type) + for bind in bindings: + if bind.exchangeRef == ex.getObjectId(): + qname = "<unknown>" + queue = self.findById (queues, bind.queueRef) + if queue != None: + qname = queue.name + print " bind [%s] => %s" % (bind.bindingKey, qname) + + + def QueueList (self, filter): + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + + caption = "Queue Name" + maxNameLen = len(caption) + for q in queues: + if self.match (q.name, filter): + if len(q.name) > maxNameLen: maxNameLen = len(q.name) + print "%-*s Attributes" % (maxNameLen, caption) + line = "" + for i in range((maxNameLen / 5) + 5): + line += "=====" + print line + + for q in queues: + if self.match (q.name, filter): + print "%-*s " % (maxNameLen, q.name), + args = q.arguments + if q.durable: print "--durable", + if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", + if q.autoDelete: print "auto-del", + if q.exclusive: print "excl", + if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], + if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], + if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], + if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], + if POLICY_TYPE in args: print "--policy-type=%s" % args[POLICY_TYPE], + if LVQ in args and args[LVQ] == 1: print "--last-value-queue", + print + + def QueueListRecurse (self, filter): + exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) + bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) + queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) + for queue in queues: + if self.match (queue.name, filter): + print "Queue '%s'" % queue.name + for bind in bindings: + if bind.queueRef == queue.getObjectId(): + ename = "<unknown>" + ex = self.findById (exchanges, bind.exchangeRef) + if ex != None: + ename = ex.name + if ename == "": + ename = "''" + print " bind [%s] => %s" % (bind.bindingKey, ename) + + def AddExchange (self, args): + if len (args) < 2: + Usage () + etype = args[0] + ename = args[1] + declArgs = {} + if _msgSequence: + declArgs[MSG_SEQUENCE] = 1 + if _ive: + declArgs[IVE] = 1 + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable, arguments=declArgs) + + def DelExchange (self, args): + if len (args) < 1: + Usage () + ename = args[0] + self.broker.getAmqpSession().exchange_delete (exchange=ename) + + def AddQueue (self, args): + if len (args) < 1: + Usage () + qname = args[0] + declArgs = {} + if _durable: + declArgs[FILECOUNT] = _fileCount + declArgs[FILESIZE] = _fileSize + + if _maxQueueSize: + declArgs[MAX_QUEUE_SIZE] = _maxQueueSize + if _maxQueueCount: + declArgs[MAX_QUEUE_COUNT] = _maxQueueCount + if _policyType: + declArgs[POLICY_TYPE] = _policyType + if _clusterDurable: + declArgs[CLUSTER_DURABLE] = 1 + if _lvq: + declArgs[LVQ] = 1 + + self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs) + + def DelQueue (self, args): + if len (args) < 1: + Usage () + qname = args[0] + self.broker.getAmqpSession().queue_delete (queue=qname) + + def Bind (self, args): + if len (args) < 2: + Usage () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) + + def Unbind (self, args): + if len (args) < 2: + Usage () + ename = args[0] + qname = args[1] + key = "" + if len (args) > 2: + key = args[2] + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) + + def findById (self, items, id): + for item in items: + if item.getObjectId() == id: + return item + return None + + def match (self, name, filter): + if filter == "": + return True + if name.find (filter) == -1: + return False + return True + +def YN (bool): + if bool: + return 'Y' + return 'N' + + +## +## Main Program +## + +try: + longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", + "file-size=", "max-queue-size=", "max-queue-count=", "policy-type=", + "last-value-queue", "sequence", "ive") + (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts) +except: + Usage () + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-b" or opt[0] == "--bindings": + _recursive = True + if opt[0] == "-a" or opt[0] == "--broker-addr": + _host = opt[1] + if opt[0] == "--durable": + _durable = True + if opt[0] == "--cluster-durable": + _clusterDurable = True + if opt[0] == "--file-count": + _fileCount = int (opt[1]) + if opt[0] == "--file-size": + _fileSize = int (opt[1]) + if opt[0] == "--max-queue-size": + _maxQueueSize = int (opt[1]) + if opt[0] == "--max-queue-count": + _maxQueueCount = int (opt[1]) + if opt[0] == "--policy-type": + _policyType = opt[1] + if opt[0] == "--last-value-queue": + _lvq = True + if opt[0] == "--sequence": + _msgSequence = True + if opt[0] == "--ive": + _ive = True + +nargs = len (cargs) +bm = BrokerManager () + +try: + bm.SetBroker(_host) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd == "exchanges": + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd == "queues": + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) + else: + Usage () +except KeyboardInterrupt: + print +except Exception,e: + print "Failed:", e.args + sys.exit(1) + +bm.Disconnect() diff --git a/RC9/qpid/python/commands/qpid-printevents b/RC9/qpid/python/commands/qpid-printevents new file mode 100755 index 0000000000..0c1b618a1f --- /dev/null +++ b/RC9/qpid/python/commands/qpid-printevents @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import optparse +import sys +import socket +from time import time, strftime, gmtime, sleep +from qmf.console import Console, Session + +class EventConsole(Console): + def event(self, broker, event): + print event + + def brokerConnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() + + def brokerDisconnected(self, broker): + print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() + + +## +## Main Program +## +def main(): + _usage = "%prog [options] [broker-addr]..." + _description = \ +"""Collect and print events from one or more Qpid message brokers. If no broker-addr is +supplied, %prog will connect to 'localhost:5672'. +broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] +ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost +""" + p = optparse.OptionParser(usage=_usage, description=_description) + + options, arguments = p.parse_args() + if len(arguments) == 0: + arguments.append("localhost") + + console = EventConsole() + session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True) + brokers = [] + for host in arguments: + brokers.append(session.addBroker(host)) + + try: + while (True): + sleep(10) + except KeyboardInterrupt: + for broker in brokers: + session.delBroker(broker) + print + sys.exit(0) + +if __name__ == '__main__': + main() + diff --git a/RC9/qpid/python/commands/qpid-queue-stats b/RC9/qpid/python/commands/qpid-queue-stats new file mode 100755 index 0000000000..356a1d2d8d --- /dev/null +++ b/RC9/qpid/python/commands/qpid-queue-stats @@ -0,0 +1,144 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import optparse +import sys +import re +import socket +import qpid +from threading import Condition +from qmf.console import Session, Console +from qpid.peer import Closed +from qpid.connection import Connection, ConnectionFailed +from time import sleep + +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, + userBindings=True, manageConnections=True) + self.broker = self.session.addBroker(self.url) + self.firstError = True + + def setFilter(self,filter): + self.filter = filter + + def brokerConnected(self, broker): + if not self.firstError: + print "*** Broker connected" + self.firstError = False + + def brokerDisconnected(self, broker): + print "*** Broker connection lost - %s, retrying..." % broker.getError() + self.firstError = False + self.objects.clear() + + def objectProps(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey().getClassName() + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False + + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + if deltaTime < 1000000000.0: + return + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + + + def Display (self): + self.session.bindClass("org.apache.qpid.broker", "queue") + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + try: + while True: + sleep (1) + if self.firstError and self.broker.getError(): + self.firstError = False + print "*** Error: %s, retrying..." % self.broker.getError() + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) + +## +## Main Program +## +def main(): + p = optparse.OptionParser() + p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') + p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') + + options, arguments = p.parse_args() + + host = options.broker_address + filter = [] + if options.filter != None: + for s in options.filter.split(","): + filter.append(re.compile(s)) + + bm = BrokerManager(host) + bm.setFilter(filter) + bm.Display() + +if __name__ == '__main__': + main() + diff --git a/RC9/qpid/python/commands/qpid-route b/RC9/qpid/python/commands/qpid-route new file mode 100755 index 0000000000..e0e655683a --- /dev/null +++ b/RC9/qpid/python/commands/qpid-route @@ -0,0 +1,514 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import getopt +import sys +import socket +import os +import locale +from qmf.console import Session, BrokerURL + +def Usage(): + print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" + print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" + print + print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]" + print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" + print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>" + print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>" + print " qpid-route [OPTIONS] route list [<dest-broker>]" + print " qpid-route [OPTIONS] route flush [<dest-broker>]" + print " qpid-route [OPTIONS] route map [<broker>]" + print + print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" + print " qpid-route [OPTIONS] link list [<dest-broker>]" + print + print "Options:" + print " -v [ --verbose ] Verbose output" + print " -q [ --quiet ] Quiet output, don't print duplicate warnings" + print " -d [ --durable ] Added configuration shall be durable" + print " -e [ --del-empty-link ] Delete link after deleting last route on the link" + print " -s [ --src-local ] Make connection to source broker (push route)" + print " -t <transport> [ --transport <transport>]" + print " Specify transport to use for links, defaults to tcp" + print + print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" + print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" + print + sys.exit(1) + +_verbose = False +_quiet = False +_durable = False +_dellink = False +_srclocal = False +_transport = "tcp" + +class RouteManager: + def __init__(self, localBroker): + self.local = BrokerURL(localBroker) + self.remote = None + self.qmf = Session() + self.broker = self.qmf.addBroker(localBroker) + + def disconnect(self): + self.qmf.delBroker(self.broker) + + def getLink(self): + links = self.qmf.getObjects(_class="link") + for link in links: + if self.remote.match(link.host, link.port): + return link + return None + + def addLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + if self.local.match(self.remote.host, self.remote.port): + raise Exception("Linking broker to itself is not permitted") + + brokers = self.qmf.getObjects(_class="broker") + broker = brokers[0] + link = self.getLink() + if link == None: + if self.remote.authName == "anonymous": + mech = "ANONYMOUS" + else: + mech = "PLAIN" + res = broker.connect(self.remote.host, self.remote.port, _durable, + mech, self.remote.authName, self.remote.authPass, + _transport) + if _verbose: + print "Connect method returned:", res.status, res.text + + def delLink(self, remoteBroker): + self.remote = BrokerURL(remoteBroker) + brokers = self.qmf.getObjects(_class="broker") + broker = brokers[0] + link = self.getLink() + if link == None: + raise Exception("Link not found") + + res = link.close() + if _verbose: + print "Close method returned:", res.status, res.text + + def listLinks(self): + links = self.qmf.getObjects(_class="link") + if len(links) == 0: + print "No Links Found" + else: + print + print "Host Port Transport Durable State Last Error" + print "=============================================================================" + for link in links: + print "%-16s%-8d%-13s%c %-18s%s" % \ + (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) + + def mapRoutes(self): + qmf = self.qmf + print + print "Finding Linked Brokers:" + + brokerList = {} + brokerList[self.local.name()] = self.broker + print " %s... Ok" % self.local + + added = True + while added: + added = False + links = qmf.getObjects(_class="link") + for link in links: + url = BrokerURL("%s:%d" % (link.host, link.port)) + if url.name() not in brokerList: + print " %s..." % url.name(), + try: + b = qmf.addBroker("%s:%d" % (link.host, link.port)) + brokerList[url.name()] = b + added = True + print "Ok" + except Exception, e: + print e + + print + print "Dynamic Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=True) + fedExchanges = [] + for bridge in bridges: + if bridge.src not in fedExchanges: + fedExchanges.append(bridge.src) + if len(fedExchanges) == 0: + print " none found" + print + + for ex in fedExchanges: + print " Exchange %s:" % ex + pairs = [] + for bridge in bridges: + if bridge.src == ex: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + found = False + for pair in pairs: + if pair.matches(fromUrl, toUrl): + found = True + if not found: + pairs.append(RoutePair(fromUrl, toUrl)) + for pair in pairs: + print " %s" % pair + print + + print "Static Routes:" + bridges = qmf.getObjects(_class="bridge", dynamic=False) + if len(bridges) == 0: + print " none found" + print + + for bridge in bridges: + link = bridge._linkRef_ + fromUrl = "%s:%s" % (link.host, link.port) + toUrl = bridge.getBroker().getUrl() + leftType = "ex" + rightType = "ex" + if bridge.srcIsLocal: + arrow = "=>" + left = bridge.src + right = bridge.dest + if bridge.srcIsQueue: + leftType = "queue" + else: + arrow = "<=" + left = bridge.dest + right = bridge.src + if bridge.srcIsQueue: + rightType = "queue" + + if bridge.srcIsQueue: + print " %s(%s=%s) %s %s(%s=%s)" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right) + else: + print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ + (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) + print + + for broker in brokerList: + if broker != self.local.name(): + qmf.delBroker(brokerList[broker]) + + + def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): + if dynamic and _srclocal: + raise Exception("--src-local is not permitted on dynamic routes") + + self.addLink(remoteBroker) + link = self.getLink() + if link == None: + raise Exception("Link failed to create") + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: + if not _quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) + sys.exit(0) + + if _verbose: + print "Creating inter-broker binding..." + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic) + if res.status != 0: + raise Exception(res.text) + if _verbose: + print "Bridge method returned:", res.status, res.text + + def addQueueRoute(self, remoteBroker, exchange, queue): + self.addLink(remoteBroker) + link = self.getLink() + if link == None: + raise Exception("Link failed to create") + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if not _quiet: + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) + sys.exit(0) + + if _verbose: + print "Creating inter-broker binding..." + res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False) + if res.status != 0: + raise Exception(res.text) + if _verbose: + print "Bridge method returned:", res.status, res.text + + def delQueueRoute(self, remoteBroker, exchange, queue): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not _quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: + if _verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if _verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not _quiet: + raise Exception("Route not found") + + def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): + self.remote = BrokerURL(remoteBroker) + link = self.getLink() + if link == None: + if not _quiet: + raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) + sys.exit(0) + + bridges = self.qmf.getObjects(_class="bridge") + for bridge in bridges: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ + and bridge.dynamic == dynamic: + if _verbose: + print "Closing bridge..." + res = bridge.close() + if res.status != 0: + raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) + if len(bridges) == 1 and _dellink: + link = self.getLink() + if link == None: + sys.exit(0) + if _verbose: + print "Last bridge on link, closing link..." + res = link.close() + if res.status != 0: + raise Exception("Error closing link: %d - %s" % (res.status, res.text)) + sys.exit(0) + if not _quiet: + raise Exception("Route not found") + + def listRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") + + for bridge in bridges: + myLink = None + for link in links: + if bridge.linkRef == link.getObjectId(): + myLink = link + break + if myLink != None: + if bridge.dynamic: + keyText = "<dynamic>" + else: + keyText = bridge.key + print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) + + def clearAllRoutes(self): + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") + + for bridge in bridges: + if _verbose: + myLink = None + for link in links: + if bridge.linkRef == link.getObjectId(): + myLink = link + break + if myLink != None: + print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), + res = bridge.close() + if res.status != 0: + print "Error: %d - %s" % (res.status, res.text) + elif _verbose: + print "Ok" + + if _dellink: + links = self.qmf.getObjects(_class="link") + for link in links: + if _verbose: + print "Deleting Link: %s:%d... " % (link.host, link.port), + res = link.close() + if res.status != 0: + print "Error: %d - %s" % (res.status, res.text) + elif _verbose: + print "Ok" + +class RoutePair: + def __init__(self, fromUrl, toUrl): + self.fromUrl = fromUrl + self.toUrl = toUrl + self.bidir = False + + def __repr__(self): + if self.bidir: + delimit = "<=>" + else: + delimit = " =>" + return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) + + def matches(self, fromUrl, toUrl): + if fromUrl == self.fromUrl and toUrl == self.toUrl: + return True + if toUrl == self.fromUrl and fromUrl == self.toUrl: + self.bidir = True + return True + return False + + +def YN(val): + if val == 1: + return 'Y' + return 'N' + +## +## Main Program +## + +try: + longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=") + (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts) +except: + Usage() + +try: + encoding = locale.getpreferredencoding() + cargs = [a.decode(encoding) for a in encArgs] +except: + cargs = encArgs + +for opt in optlist: + if opt[0] == "-v" or opt[0] == "--verbose": + _verbose = True + if opt[0] == "-q" or opt[0] == "--quiet": + _quiet = True + if opt[0] == "-d" or opt[0] == "--durable": + _durable = True + if opt[0] == "-e" or opt[0] == "--del-empty-link": + _dellink = True + if opt[0] == "-s" or opt[0] == "--src-local": + _srclocal = True + if opt[0] == "-t" or opt[0] == "--transport": + _transport = opt[1] + +nargs = len(cargs) +if nargs < 2: + Usage() +if nargs == 2: + localBroker = "localhost" +else: + if _srclocal: + localBroker = cargs[3] + remoteBroker = cargs[2] + else: + localBroker = cargs[2] + if nargs > 3: + remoteBroker = cargs[3] + +group = cargs[0] +cmd = cargs[1] + +try: + rm = RouteManager(localBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.addLink(remoteBroker) + elif cmd == "del": + if nargs != 4: + Usage() + rm.delLink(remoteBroker) + elif cmd == "list": + rm.listLinks() + + elif group == "dynamic": + if cmd == "add": + if nargs < 5 or nargs > 7: + Usage() + + tag = "" + excludes = "" + if nargs > 5: tag = cargs[5] + if nargs > 6: excludes = cargs[6] + rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True) + elif cmd == "del": + if nargs != 5: + Usage() + else: + rm.delRoute(remoteBroker, cargs[4], "", dynamic=True) + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage() + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False) + elif cmd == "del": + if nargs != 6: + Usage() + rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False) + elif cmd == "map": + rm.mapRoutes() + else: + if cmd == "list": + rm.listRoutes() + elif cmd == "flush": + rm.clearAllRoutes() + else: + Usage() + + elif group == "queue": + if nargs != 6: + Usage() + if cmd == "add": + rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + elif cmd == "del": + rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) + else: + Usage() + +except Exception,e: + print "Failed:", e.args + sys.exit(1) + +rm.disconnect() diff --git a/RC9/qpid/python/commands/qpid-tool b/RC9/qpid/python/commands/qpid-tool new file mode 100755 index 0000000000..14308f69fb --- /dev/null +++ b/RC9/qpid/python/commands/qpid-tool @@ -0,0 +1,195 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import getopt +import sys +import socket +from cmd import Cmd +from qpid.connection import ConnectionFailed +from qpid.managementdata import ManagementData +from shlex import split +from qpid.disp import Display +from qpid.peer import Closed + +class Mcli (Cmd): + """ Management Command Interpreter """ + + def __init__ (self, dataObject, dispObject): + Cmd.__init__ (self) + self.dataObject = dataObject + self.dispObject = dispObject + self.dataObject.setCli (self) + self.prompt = "qpid: " + + def emptyline (self): + pass + + def setPromptMessage (self, p): + if p == None: + self.prompt = "qpid: " + else: + self.prompt = "qpid[%s]: " % p + + def do_help (self, data): + print "Management Tool for QPID" + print + print "Commands:" + print " list - Print summary of existing objects by class" + print " list <className> - Print list of objects of the specified class" + print " list <className> active - Print list of non-deleted objects of the specified class" + print " show <className> - Print contents of all objects of specified class" + print " show <className> active - Print contents of all non-deleted objects of specified class" + print " show <list-of-IDs> - Print contents of one or more objects (infer className)" + print " show <className> <list-of-IDs> - Print contents of one or more objects" + print " list is space-separated, ranges may be specified (i.e. 1004-1010)" + print " call <ID> <methodName> [<args>] - Invoke a method on an object" + print " schema - Print summary of object classes seen on the target" + print " schema <className> - Print details of an object class" + print " set time-format short - Select short timestamp format (default)" + print " set time-format long - Select long timestamp format" + print " id [<ID>] - Display translations of display object ids" + print " quit or ^D - Exit the program" + print + + def complete_set (self, text, line, begidx, endidx): + """ Command completion for the 'set' command """ + tokens = split (line) + if len (tokens) < 2: + return ["time-format "] + elif tokens[1] == "time-format": + if len (tokens) == 2: + return ["long", "short"] + elif len (tokens) == 3: + if "long".find (text) == 0: + return ["long"] + elif "short".find (text) == 0: + return ["short"] + elif "time-format".find (text) == 0: + return ["time-format "] + return [] + + def do_set (self, data): + tokens = split (data) + try: + if tokens[0] == "time-format": + self.dispObject.do_setTimeFormat (tokens[1]) + except: + pass + + def do_id (self, data): + self.dataObject.do_id(data) + + def complete_schema (self, text, line, begidx, endidx): + tokens = split (line) + if len (tokens) > 2: + return [] + return self.dataObject.classCompletions (text) + + def do_schema (self, data): + self.dataObject.do_schema (data) + + def complete_list (self, text, line, begidx, endidx): + tokens = split (line) + if len (tokens) > 2: + return [] + return self.dataObject.classCompletions (text) + + def do_list (self, data): + self.dataObject.do_list (data) + + def do_show (self, data): + self.dataObject.do_show (data) + + def do_call (self, data): + try: + self.dataObject.do_call (data) + except ValueError, e: + print "ValueError:", e + + def do_EOF (self, data): + print "quit" + try: + self.dataObject.do_exit () + except: + pass + return True + + def do_quit (self, data): + try: + self.dataObject.do_exit () + except: + pass + return True + + def postcmd (self, stop, line): + return stop + + def postloop (self): + print "Exiting..." + self.dataObject.close () + +def Usage (): + print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]" + print + sys.exit (1) + +#========================================================= +# Main Program +#========================================================= + +# Get host name and port if specified on the command line +cargs = sys.argv[1:] +_host = "localhost" + +if len (cargs) > 0: + _host = cargs[0] + +if _host[0] == '-': + Usage() + +disp = Display () + +# Attempt to make a connection to the target broker +try: + data = ManagementData (disp, _host) +except socket.error, e: + print "Socket Error (%s):" % _host, e[1] + sys.exit (1) +except IOError, e: + print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename) + sys.exit (1) +except ConnectionFailed, e: + print "Connect Failed %d - %s" % (e[0], e[1]) + sys.exit(1) +except Exception, e: + if str(e).find ("Exchange not found") != -1: + print "Management not enabled on broker: Use '-m yes' option on broker startup." + sys.exit(1) + +# Instantiate the CLI interpreter and launch it. +cli = Mcli (data, disp) +print ("Management Tool for QPID") +try: + cli.cmdloop () +except Closed, e: + print "Connection to Broker Lost:", e + sys.exit (1) |