diff options
Diffstat (limited to 'python/commands')
-rwxr-xr-x | python/commands/qpid-cluster | 327 | ||||
-rwxr-xr-x | python/commands/qpid-config | 572 | ||||
-rwxr-xr-x | python/commands/qpid-printevents | 74 | ||||
-rwxr-xr-x | python/commands/qpid-queue-stats | 146 | ||||
-rwxr-xr-x | python/commands/qpid-route | 524 | ||||
-rwxr-xr-x | python/commands/qpid-stat | 459 | ||||
-rwxr-xr-x | python/commands/qpid-tool | 197 |
7 files changed, 0 insertions, 2299 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster deleted file mode 100755 index 6d64765184..0000000000 --- a/python/commands/qpid-cluster +++ /dev/null @@ -1,327 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import locale -import socket -import re -from qmf.console import Session - -class Config: - def __init__(self): - self._host = "localhost" - self._connTimeout = 10 - self._stopId = None - self._stopAll = False - self._force = False - self._numeric = False - self._showConn = False - self._delConn = None - -def usage (): - print "Usage: qpid-cluster [OPTIONS] [broker-addr]" - print - print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " -C [--all-connections] View client connections to all cluster members" - print " -c [--connections] ID View client connections to specified member" - print " -d [--del-connection] HOST:PORT" - print " Disconnect a client connection" - print " -s [--stop] ID Stop one member of the cluster by its ID" - print " -k [--all-stop] Shut down the whole cluster" - print " -f [--force] Suppress the 'are-you-sure?' prompt" - print " -n [--numeric] Don't resolve names" - print - -class IpAddr: - def __init__(self, text): - if text.find("@") != -1: - tokens = text.split("@") - text = tokens[1] - if text.find(":") != -1: - tokens = text.split(":") - text = tokens[0] - self.port = int(tokens[1]) - else: - self.port = 5672 - self.dottedQuad = socket.gethostbyname(text) - nums = self.dottedQuad.split(".") - self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) - - def bestAddr(self, addrPortList): - bestDiff = 0xFFFFFFFFL - bestAddr = None - for addrPort in addrPortList: - diff = IpAddr(addrPort[0]).addr ^ self.addr - if diff < bestDiff: - bestDiff = diff - bestAddr = addrPort - return bestAddr - -class BrokerManager: - def __init__(self, config): - self.config = config - self.brokerName = None - self.qmf = None - self.broker = None - - def SetBroker(self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def _getClusters(self): - packages = self.qmf.getPackages() - if "org.apache.qpid.cluster" not in packages: - raise Exception("Clustering is not installed on the broker.") - - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - raise Exception("Clustering is installed but not enabled on the broker.") - - return clusters - - def _getHostList(self, urlList): - hosts = [] - hostAddr = IpAddr(self.config._host) - for url in urlList: - if url.find("amqp:") != 0: - raise Exception("Invalid URL 1") - url = url[5:] - addrs = str(url).split(",") - addrList = [] - for addr in addrs: - tokens = addr.split(":") - if len(tokens) != 3: - raise Exception("Invalid URL 2") - addrList.append((tokens[1], tokens[2])) - - # Find the address in the list that is most likely to be in the same subnet as the address - # with which we made the original QMF connection. This increases the probability that we will - # be able to reach the cluster member. - - best = hostAddr.bestAddr(addrList) - bestUrl = best[0] + ":" + best[1] - hosts.append(bestUrl) - return hosts - - def overview(self): - clusters = self._getClusters() - cluster = clusters[0] - memberList = cluster.members.split(";") - idList = cluster.memberIDs.split(";") - - print " Cluster Name: %s" % cluster.clusterName - print "Cluster Status: %s" % cluster.status - print " Cluster Size: %d" % cluster.clusterSize - print " Members: ID=%s URL=%s" % (idList[0], memberList[0]) - for idx in range(1,len(idList)): - print " : ID=%s URL=%s" % (idList[idx], memberList[idx]) - - def stopMember(self, id): - clusters = self._getClusters() - cluster = clusters[0] - idList = cluster.memberIDs.split(";") - if id not in idList: - raise Exception("No member with matching ID found") - - if not self.config._force: - prompt = "Warning: " - if len(idList) == 1: - prompt += "This command will shut down the last running cluster member." - else: - prompt += "This command will shut down a cluster member." - prompt += " Are you sure? [N]: " - - confirm = raw_input(prompt) - if len(confirm) == 0 or confirm[0].upper() != 'Y': - raise Exception("Operation canceled") - - cluster.stopClusterNode(id) - - def stopAll(self): - clusters = self._getClusters() - if not self.config._force: - prompt = "Warning: This command will shut down the entire cluster." - prompt += " Are you sure? [N]: " - - confirm = raw_input(prompt) - if len(confirm) == 0 or confirm[0].upper() != 'Y': - raise Exception("Operation canceled") - - cluster = clusters[0] - cluster.stopFullCluster() - - def showConnections(self): - clusters = self._getClusters() - cluster = clusters[0] - memberList = cluster.members.split(";") - idList = cluster.memberIDs.split(";") - displayList = [] - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - self.brokers = [] - - idx = 0 - for host in hostList: - if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn: - self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout)) - displayList.append(idList[idx]) - idx += 1 - - idx = 0 - found = False - for broker in self.brokers: - if not self.config._delConn: - print "Clients on Member: ID=%s:" % displayList[idx] - connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) - for conn in connList: - if not conn.shadow: - if self.config._numeric or self.config._delConn: - a = conn.address - else: - tokens = conn.address.split(":") - try: - hostList = socket.gethostbyaddr(tokens[0]) - host = hostList[0] - except: - host = tokens[0] - a = host + ":" + tokens[1] - if self.config._delConn: - tokens = self.config._delConn.split(":") - ip = socket.gethostbyname(tokens[0]) - toDelete = ip + ":" + tokens[1] - if a == toDelete: - print "Closing connection from client: %s" % a - conn.close() - found = True - else: - print " %s" % a - idx += 1 - if not self.config._delConn: - print - if self.config._delConn and not found: - print "Client connection '%s' not found" % self.config._delConn - - for broker in self.brokers: - self.qmf.delBroker(broker) - - -def main(argv=None): - if argv is None: argv = sys.argv - try: - config = Config() - try: - longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=") - (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts) - except: - usage() - return 1 - - try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] - except: - cargs = encArgs - - count = 0 - for opt in optlist: - if opt[0] == "--timeout": - config._connTimeout = int(opt[1]) - if config._connTimeout == 0: - config._connTimeout = None - if opt[0] == "-s" or opt[0] == "--stop": - config._stopId = opt[1] - if len(config._stopId.split(":")) != 2: - raise Exception("Member ID must be of form: <host or ip>:<number>") - count += 1 - if opt[0] == "-k" or opt[0] == "--all-stop": - config._stopAll = True - count += 1 - if opt[0] == "-f" or opt[0] == "--force": - config._force = True - if opt[0] == "-n" or opt[0] == "--numeric": - config._numeric = True - if opt[0] == "-C" or opt[0] == "--all-connections": - config._showConn = "all" - count += 1 - if opt[0] == "-c" or opt[0] == "--connections": - config._showConn = opt[1] - if len(config._showConn.split(":")) != 2: - raise Exception("Member ID must be of form: <host or ip>:<number>") - count += 1 - if opt[0] == "-d" or opt[0] == "--del-connection": - config._delConn = opt[1] - if len(config._delConn.split(":")) != 2: - raise Exception("Connection must be of form: <host or ip>:<port>") - count += 1 - - if count > 1: - print "Only one command option may be supplied" - print - usage() - return 1 - - nargs = len(cargs) - bm = BrokerManager(config) - - if nargs == 1: - config._host = cargs[0] - - try: - bm.SetBroker(config._host) - if config._stopId: - bm.stopMember(config._stopId) - elif config._stopAll: - bm.stopAll() - elif config._showConn or config._delConn: - bm.showConnections() - else: - bm.overview() - except KeyboardInterrupt: - print - except Exception,e: - if str(e).find("connection aborted") > 0: - # we expect this when asking the connected broker to shut down - return 0 - raise Exception("Failed: %s - %s" % (e.__class__.__name__, e)) - - bm.Disconnect() - except Exception, e: - print str(e) - return 1 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/python/commands/qpid-config b/python/commands/qpid-config deleted file mode 100755 index 0db42bc6c7..0000000000 --- a/python/commands/qpid-config +++ /dev/null @@ -1,572 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import locale -from qmf.console import Session - -_recursive = False -_host = "localhost" -_connTimeout = 10 -_altern_ex = None -_passive = False -_durable = False -_clusterDurable = False -_if_empty = True -_if_unused = True -_fileCount = 8 -_fileSize = 24 -_maxQueueSize = None -_maxQueueCount = None -_limitPolicy = None -_order = None -_msgSequence = False -_ive = False -_eventGeneration = None -_file = None - -FILECOUNT = "qpid.file_count" -FILESIZE = "qpid.file_size" -MAX_QUEUE_SIZE = "qpid.max_size" -MAX_QUEUE_COUNT = "qpid.max_count" -POLICY_TYPE = "qpid.policy_type" -CLUSTER_DURABLE = "qpid.persist_last_node" -LVQ = "qpid.last_value_queue" -LVQNB = "qpid.last_value_queue_no_browse" -MSG_SEQUENCE = "qpid.msg_sequence" -IVE = "qpid.ive" -QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" - -def Usage (): - print "Usage: qpid-config [OPTIONS]" - print " qpid-config [OPTIONS] exchanges [filter-string]" - print " qpid-config [OPTIONS] queues [filter-string]" - print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]" - print " qpid-config [OPTIONS] del exchange <name>" - print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]" - print " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]" - print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]" - print " <for type xml> [-f -|filename]" - print " <for type header> [all|any] k1=v1 [, k2=v2...]" - print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]" - print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " -b [ --bindings ] Show bindings in queue or exchange list" - print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker" - print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - print "Add Queue Options:" - print " --alternate-exchange [name of the alternate exchange]" - print " The alternate-exchange field specifies how messages on this queue should" - print " be treated when they are rejected by a subscriber, or when they are" - print " orphaned by queue deletion. When present, rejected or orphaned messages" - print " MUST be routed to the alternate-exchange. In all cases the messages MUST" - print " be removed from the queue." - print " --passive Do not actually change the broker state (queue will not be created)" - print " --durable Queue is durable" - print " --cluster-durable Queue becomes durable if there is only one functioning cluster node" - print " --file-count N (8) Number of files in queue's persistence journal" - print " --file-size N (24) File size in pages (64Kib/page)" - print " --max-queue-size N Maximum in-memory queue size as bytes" - print " --max-queue-count N Maximum in-memory queue size as a number of messages" - print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]" - print " Action taken when queue limit is reached:" - print " none (default) - Use broker's default policy" - print " reject - Reject enqueued messages" - print " flow-to-disk - Page messages to disk" - print " ring - Replace oldest unacquired message with new" - print " ring-strict - Replace oldest message, reject if oldest is acquired" - print " --order [fifo | lvq | lvq-no-browse]" - print " Set queue ordering policy:" - print " fifo (default) - First in, first out" - print " lvq - Last Value Queue ordering, allows queue browsing" - print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data" - print " --generate-queue-events N" - print " If set to 1, every enqueue will generate an event that can be processed by" - print " registered listeners (e.g. for replication). If set to 2, events will be" - print " generated for enqueues and dequeues" - print - print "Del Queue Options:" - print " --force Force delete of queue even if it's currently used or it's not empty" - print " --force-if-not-empty Force delete of queue even if it's not empty" - print " --force-if-used Force delete of queue even if it's currently used" - print - print "Add Exchange <type> values:" - print " direct Direct exchange for point-to-point communication" - print " fanout Fanout exchange for broadcast communication" - print " topic Topic exchange that routes messages using binding keys with wildcards" - print " headers Headers exchange that matches header fields against the binding keys" - print - print "Add Exchange Options:" - print " --alternate-exchange [name of the alternate exchange]" - print " In the event that a message cannot be routed, this is the name of the exchange to" - print " which the message will be sent. Messages transferred using message.transfer will" - print " be routed to the alternate-exchange only if they are sent with the \"none\"" - print " accept-mode, and the discard-unroutable delivery property is set to false, and" - print " there is no queue to route to for the given message according to the bindings" - print " on this exchange." - print " --passive Do not actually change the broker state (exchange will not be created)" - print " --durable Exchange is durable" - print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header" - print " with a value that increments for each message forwarded." - print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference" - print " to the last message forwarded and enqueuing that message to newly bound" - print " queues." - print - sys.exit (1) - - -# -# helpers for the arg parsing in bind(). return multiple values; "ok" -# followed by the resultant args - -# -# accept -f followed by either -# a filename or "-", for stdin. pull the bits into a string, to be -# passed to the xml binding. -# -def snarf_xquery_args(): - if not _file: - print "Invalid args to bind xml: need an input file or stdin" - return [False] - if _file == "-": - res = sys.stdin.read() - else: - f = open(_file) # let this signal if it can't find it - res = f.read() - f.close() - return [True, res] - -# -# look for "any"/"all" and grok the rest of argv into a map -# -def snarf_header_args(cargs): - if len(cargs) < 2: - print "Invalid args to bind headers: need 'any'/'all' plus conditions" - return [False] - op = cargs[0] - if op == "all" or op == "any": - kv = {} - for thing in cargs[1:]: - k_and_v = thing.split("=") - kv[k_and_v[0]] = k_and_v[1] - return [True, op, kv] - else: - print "Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'" - return [False] - -class BrokerManager: - def __init__ (self): - self.brokerName = None - self.qmf = None - self.broker = None - - def SetBroker (self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def Overview (self): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - print "Total Exchanges: %d" % len (exchanges) - etype = {} - for ex in exchanges: - if ex.type not in etype: - etype[ex.type] = 1 - else: - etype[ex.type] = etype[ex.type] + 1 - for typ in etype: - print "%15s: %d" % (typ, etype[typ]) - - print - print " Total Queues: %d" % len (queues) - _durable = 0 - for queue in queues: - if queue.durable: - _durable = _durable + 1 - print " durable: %d" % _durable - print " non-durable: %d" % (len (queues) - _durable) - - def ExchangeList (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - caption1 = "Type " - caption2 = "Exchange Name" - maxNameLen = len(caption2) - for ex in exchanges: - if self.match(ex.name, filter): - if len(ex.name) > maxNameLen: maxNameLen = len(ex.name) - print "%s%-*s Attributes" % (caption1, maxNameLen, caption2) - line = "" - for i in range(((maxNameLen + len(caption1)) / 5) + 5): - line += "=====" - print line - - for ex in exchanges: - if self.match (ex.name, filter): - print "%-10s%-*s " % (ex.type, maxNameLen, ex.name), - args = ex.arguments - if ex.durable: print "--durable", - if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence", - if IVE in args and args[IVE] == 1: print "--ive", - if ex.altExchange: - print "--alternate-exchange=%s" % ex._altExchange_.name, - print - - def ExchangeListRecurse (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - for ex in exchanges: - if self.match (ex.name, filter): - print "Exchange '%s' (%s)" % (ex.name, ex.type) - for bind in bindings: - if bind.exchangeRef == ex.getObjectId(): - qname = "<unknown>" - queue = self.findById (queues, bind.queueRef) - if queue != None: - qname = queue.name - print " bind [%s] => %s" % (bind.bindingKey, qname) - - - def QueueList (self, filter): - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - - caption = "Queue Name" - maxNameLen = len(caption) - for q in queues: - if self.match (q.name, filter): - if len(q.name) > maxNameLen: maxNameLen = len(q.name) - print "%-*s Attributes" % (maxNameLen, caption) - line = "" - for i in range((maxNameLen / 5) + 5): - line += "=====" - print line - - for q in queues: - if self.match (q.name, filter): - print "%-*s " % (maxNameLen, q.name), - args = q.arguments - if q.durable: print "--durable", - if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable", - if q.autoDelete: print "auto-del", - if q.exclusive: print "excl", - if FILESIZE in args: print "--file-size=%d" % args[FILESIZE], - if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT], - if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE], - if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT], - if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"), - if LVQ in args and args[LVQ] == 1: print "--order lvq", - if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse", - if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], - if q.altExchange: - print "--alternate-exchange=%s" % q._altExchange_.name, - print - - def QueueListRecurse (self, filter): - exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent) - bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent) - queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent) - for queue in queues: - if self.match (queue.name, filter): - print "Queue '%s'" % queue.name - for bind in bindings: - if bind.queueRef == queue.getObjectId(): - ename = "<unknown>" - ex = self.findById (exchanges, bind.exchangeRef) - if ex != None: - ename = ex.name - if ename == "": - ename = "''" - print " bind [%s] => %s" % (bind.bindingKey, ename) - - def AddExchange (self, args): - if len (args) < 2: - Usage () - etype = args[0] - ename = args[1] - declArgs = {} - if _msgSequence: - declArgs[MSG_SEQUENCE] = 1 - if _ive: - declArgs[IVE] = 1 - if _altern_ex != None: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) - else: - self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs) - - def DelExchange (self, args): - if len (args) < 1: - Usage () - ename = args[0] - self.broker.getAmqpSession().exchange_delete (exchange=ename) - - def AddQueue (self, args): - if len (args) < 1: - Usage () - qname = args[0] - declArgs = {} - if _durable: - declArgs[FILECOUNT] = _fileCount - declArgs[FILESIZE] = _fileSize - - if _maxQueueSize: - declArgs[MAX_QUEUE_SIZE] = _maxQueueSize - if _maxQueueCount: - declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - if _limitPolicy: - if _limitPolicy == "none": - pass - elif _limitPolicy == "reject": - declArgs[POLICY_TYPE] = "reject" - elif _limitPolicy == "flow-to-disk": - declArgs[POLICY_TYPE] = "flow_to_disk" - elif _limitPolicy == "ring": - declArgs[POLICY_TYPE] = "ring" - elif _limitPolicy == "ring-strict": - declArgs[POLICY_TYPE] = "ring_strict" - - if _clusterDurable: - declArgs[CLUSTER_DURABLE] = 1 - if _order: - if _order == "fifo": - pass - elif _order == "lvq": - declArgs[LVQ] = 1 - elif _order == "lvq-no-browse": - declArgs[LVQNB] = 1 - if _eventGeneration: - declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration - - if _altern_ex != None: - self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs) - else: - self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs) - - def DelQueue (self, args): - if len (args) < 1: - Usage () - qname = args[0] - self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused) - - def Bind (self, args): - if len (args) < 2: - Usage () - ename = args[0] - qname = args[1] - key = "" - if len (args) > 2: - key = args[2] - - # query the exchange to determine its type. - res = self.broker.getAmqpSession().exchange_query(ename) - - # type of the xchg determines the processing of the rest of - # argv. if it's an xml xchg, we want to find a file - # containing an x-query, and pass that. if it's a headers - # exchange, we need to pass either "any" or all, followed by a - # map containing key/value pairs. if neither of those, extra - # args are ignored. - ok = True - args = None - if res.type == "xml": - # this checks/imports the -f arg - [ok, xquery] = snarf_xquery_args() - args = { "xquery" : xquery } - # print args - else: - if res.type == "headers": - [ok, op, kv] = snarf_header_args(cargs[4:]) - args = kv - args["x-match"] = op - - if not ok: - sys.exit(1) - - self.broker.getAmqpSession().exchange_bind (queue=qname, - exchange=ename, - binding_key=key, - arguments=args) - - def Unbind (self, args): - if len (args) < 2: - Usage () - ename = args[0] - qname = args[1] - key = "" - if len (args) > 2: - key = args[2] - self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) - - def findById (self, items, id): - for item in items: - if item.getObjectId() == id: - return item - return None - - def match (self, name, filter): - if filter == "": - return True - if name.find (filter) == -1: - return False - return True - -def YN (bool): - if bool: - return 'Y' - return 'N' - - -## -## Main Program -## - -try: - longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=", - "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=", - "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty", - "force_if_used", "alternate-exchange=", "passive", "timeout=", "file=") - (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:bf:", longOpts) -except: - Usage () - -try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] -except: - cargs = encArgs - -for opt in optlist: - if opt[0] == "-b" or opt[0] == "--bindings": - _recursive = True - if opt[0] == "-a" or opt[0] == "--broker-addr": - _host = opt[1] - if opt[0] == "-f" or opt[0] == "--file": - _file = opt[1] - if opt[0] == "--timeout": - _connTimeout = int(opt[1]) - if _connTimeout == 0: - _connTimeout = None - if opt[0] == "--alternate-exchange": - _altern_ex = opt[1] - if opt[0] == "--passive": - _passive = True - if opt[0] == "--durable": - _durable = True - if opt[0] == "--cluster-durable": - _clusterDurable = True - if opt[0] == "--file-count": - _fileCount = int (opt[1]) - if opt[0] == "--file-size": - _fileSize = int (opt[1]) - if opt[0] == "--max-queue-size": - _maxQueueSize = int (opt[1]) - if opt[0] == "--max-queue-count": - _maxQueueCount = int (opt[1]) - if opt[0] == "--limit-policy": - _limitPolicy = opt[1] - if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"): - print "Error: Invalid --limit-policy argument" - sys.exit(1) - if opt[0] == "--order": - _order = opt[1] - if _order not in ("fifo", "lvq", "lvq-no-browse"): - print "Error: Invalid --order argument" - sys.exit(1) - if opt[0] == "--sequence": - _msgSequence = True - if opt[0] == "--ive": - _ive = True - if opt[0] == "--generate-queue-events": - _eventGeneration = int (opt[1]) - if opt[0] == "--force": - _if_empty = False - _if_unused = False - if opt[0] == "--force-if-not-empty": - _if_empty = False - if opt[0] == "--force-if-used": - _if_unused = False - - -nargs = len (cargs) -bm = BrokerManager () - -try: - bm.SetBroker(_host) - if nargs == 0: - bm.Overview () - else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd == "exchanges": - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd == "queues": - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) - else: - Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () -except KeyboardInterrupt: - print -except IOError, e: - print e - sys.exit(1) -except Exception,e: - print "Failed: %s: %s" % (e.__class__.__name__, e) - sys.exit(1) - -bm.Disconnect() diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents deleted file mode 100755 index 0c1b618a1f..0000000000 --- a/python/commands/qpid-printevents +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import optparse -import sys -import socket -from time import time, strftime, gmtime, sleep -from qmf.console import Console, Session - -class EventConsole(Console): - def event(self, broker, event): - print event - - def brokerConnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl() - - def brokerDisconnected(self, broker): - print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl() - - -## -## Main Program -## -def main(): - _usage = "%prog [options] [broker-addr]..." - _description = \ -"""Collect and print events from one or more Qpid message brokers. If no broker-addr is -supplied, %prog will connect to 'localhost:5672'. -broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] -ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost -""" - p = optparse.OptionParser(usage=_usage, description=_description) - - options, arguments = p.parse_args() - if len(arguments) == 0: - arguments.append("localhost") - - console = EventConsole() - session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True) - brokers = [] - for host in arguments: - brokers.append(session.addBroker(host)) - - try: - while (True): - sleep(10) - except KeyboardInterrupt: - for broker in brokers: - session.delBroker(broker) - print - sys.exit(0) - -if __name__ == '__main__': - main() - diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats deleted file mode 100755 index 3b8a0dcb19..0000000000 --- a/python/commands/qpid-queue-stats +++ /dev/null @@ -1,146 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import optparse -import sys -import re -import socket -import qpid -from threading import Condition -from qmf.console import Session, Console -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from time import sleep - -class BrokerManager(Console): - def __init__(self, host): - self.url = host - self.objects = {} - self.filter = None - self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, - userBindings=True, manageConnections=True) - self.broker = self.session.addBroker(self.url) - self.firstError = True - - def setFilter(self,filter): - self.filter = filter - - def brokerConnected(self, broker): - if not self.firstError: - print "*** Broker connected" - self.firstError = False - - def brokerDisconnected(self, broker): - print "*** Broker connection lost - %s, retrying..." % broker.getError() - self.firstError = False - self.objects.clear() - - def objectProps(self, broker, record): - className = record.getClassKey().getClassName() - if className != "queue": - return - - id = record.getObjectId().__repr__() - if id not in self.objects: - self.objects[id] = (record.name, None, None) - - def objectStats(self, broker, record): - className = record.getClassKey().getClassName() - if className != "queue": - return - - id = record.getObjectId().__repr__() - if id not in self.objects: - return - - (name, first, last) = self.objects[id] - if first == None: - self.objects[id] = (name, record, None) - return - - if len(self.filter) > 0 : - match = False - - for x in self.filter: - if x.match(name): - match = True - break - if match == False: - return - - if last == None: - lastSample = first - else: - lastSample = last - - self.objects[id] = (name, first, record) - - deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) - if deltaTime < 1000000000.0: - return - enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ - (deltaTime / 1000000000.0) - dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ - (deltaTime / 1000000000.0) - print "%-41s%10.2f%11d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) - sys.stdout.flush() - - - def Display (self): - self.session.bindClass("org.apache.qpid.broker", "queue") - print "Queue Name Sec Depth Enq Rate Deq Rate" - print "========================================================================================" - sys.stdout.flush() - try: - while True: - sleep (1) - if self.firstError and self.broker.getError(): - self.firstError = False - print "*** Error: %s, retrying..." % self.broker.getError() - except KeyboardInterrupt: - print - self.session.delBroker(self.broker) - -## -## Main Program -## -def main(): - p = optparse.OptionParser() - p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost') - p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show') - - options, arguments = p.parse_args() - - host = options.broker_address - filter = [] - if options.filter != None: - for s in options.filter.split(","): - filter.append(re.compile(s)) - - bm = BrokerManager(host) - bm.setFilter(filter) - bm.Display() - -if __name__ == '__main__': - main() - diff --git a/python/commands/qpid-route b/python/commands/qpid-route deleted file mode 100755 index 9965047000..0000000000 --- a/python/commands/qpid-route +++ /dev/null @@ -1,524 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import getopt -import sys -import socket -import os -import locale -from qmf.console import Session, BrokerURL - -def Usage(): - print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]" - print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>" - print - print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]" - print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>" - print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>" - print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>" - print " qpid-route [OPTIONS] route list [<dest-broker>]" - print " qpid-route [OPTIONS] route flush [<dest-broker>]" - print " qpid-route [OPTIONS] route map [<broker>]" - print - print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>" - print " qpid-route [OPTIONS] link list [<dest-broker>]" - print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " -v [ --verbose ] Verbose output" - print " -q [ --quiet ] Quiet output, don't print duplicate warnings" - print " -d [ --durable ] Added configuration shall be durable" - print " -e [ --del-empty-link ] Delete link after deleting last route on the link" - print " -s [ --src-local ] Make connection to source broker (push route)" - print " --ack N Acknowledge transfers over the bridge in batches of N" - print " -t <transport> [ --transport <transport>]" - print " Specify transport to use for links, defaults to tcp" - print - print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - sys.exit(1) - -_verbose = False -_quiet = False -_durable = False -_dellink = False -_srclocal = False -_transport = "tcp" -_ack = 0 -_connTimeout = 10 - -class RouteManager: - def __init__(self, localBroker): - self.local = BrokerURL(localBroker) - self.remote = None - self.qmf = Session() - self.broker = self.qmf.addBroker(localBroker, _connTimeout) - - def disconnect(self): - self.qmf.delBroker(self.broker) - - def getLink(self): - links = self.qmf.getObjects(_class="link") - for link in links: - if self.remote.match(link.host, link.port): - return link - return None - - def addLink(self, remoteBroker): - self.remote = BrokerURL(remoteBroker) - if self.local.match(self.remote.host, self.remote.port): - raise Exception("Linking broker to itself is not permitted") - - brokers = self.qmf.getObjects(_class="broker") - broker = brokers[0] - link = self.getLink() - if link == None: - if not self.remote.authName or self.remote.authName == "anonymous": - mech = "ANONYMOUS" - else: - mech = "PLAIN" - res = broker.connect(self.remote.host, self.remote.port, _durable, - mech, self.remote.authName or "", self.remote.authPass or "", - _transport) - if _verbose: - print "Connect method returned:", res.status, res.text - - def delLink(self, remoteBroker): - self.remote = BrokerURL(remoteBroker) - brokers = self.qmf.getObjects(_class="broker") - broker = brokers[0] - link = self.getLink() - if link == None: - raise Exception("Link not found") - - res = link.close() - if _verbose: - print "Close method returned:", res.status, res.text - - def listLinks(self): - links = self.qmf.getObjects(_class="link") - if len(links) == 0: - print "No Links Found" - else: - print - print "Host Port Transport Durable State Last Error" - print "=============================================================================" - for link in links: - print "%-16s%-8d%-13s%c %-18s%s" % \ - (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError) - - def mapRoutes(self): - qmf = self.qmf - print - print "Finding Linked Brokers:" - - brokerList = {} - brokerList[self.local.name()] = self.broker - print " %s... Ok" % self.local - - added = True - while added: - added = False - links = qmf.getObjects(_class="link") - for link in links: - url = BrokerURL("%s:%d" % (link.host, link.port)) - if url.name() not in brokerList: - print " %s..." % url.name(), - try: - b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout) - brokerList[url.name()] = b - added = True - print "Ok" - except Exception, e: - print e - - print - print "Dynamic Routes:" - bridges = qmf.getObjects(_class="bridge", dynamic=True) - fedExchanges = [] - for bridge in bridges: - if bridge.src not in fedExchanges: - fedExchanges.append(bridge.src) - if len(fedExchanges) == 0: - print " none found" - print - - for ex in fedExchanges: - print " Exchange %s:" % ex - pairs = [] - for bridge in bridges: - if bridge.src == ex: - link = bridge._linkRef_ - fromUrl = "%s:%s" % (link.host, link.port) - toUrl = bridge.getBroker().getUrl() - found = False - for pair in pairs: - if pair.matches(fromUrl, toUrl): - found = True - if not found: - pairs.append(RoutePair(fromUrl, toUrl)) - for pair in pairs: - print " %s" % pair - print - - print "Static Routes:" - bridges = qmf.getObjects(_class="bridge", dynamic=False) - if len(bridges) == 0: - print " none found" - print - - for bridge in bridges: - link = bridge._linkRef_ - fromUrl = "%s:%s" % (link.host, link.port) - toUrl = bridge.getBroker().getUrl() - leftType = "ex" - rightType = "ex" - if bridge.srcIsLocal: - arrow = "=>" - left = bridge.src - right = bridge.dest - if bridge.srcIsQueue: - leftType = "queue" - else: - arrow = "<=" - left = bridge.dest - right = bridge.src - if bridge.srcIsQueue: - rightType = "queue" - - if bridge.srcIsQueue: - print " %s(%s=%s) %s %s(%s=%s)" % \ - (toUrl, leftType, left, arrow, fromUrl, rightType, right) - else: - print " %s(%s=%s) %s %s(%s=%s) key=%s" % \ - (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key) - print - - for broker in brokerList: - if broker != self.local.name(): - qmf.delBroker(brokerList[broker]) - - - def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False): - if dynamic and _srclocal: - raise Exception("--src-local is not permitted on dynamic routes") - - self.addLink(remoteBroker) - link = self.getLink() - if link == None: - raise Exception("Link failed to create") - - bridges = self.qmf.getObjects(_class="bridge") - for bridge in bridges: - if bridge.linkRef == link.getObjectId() and \ - bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue: - if not _quiet: - raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) - sys.exit(0) - - if _verbose: - print "Creating inter-broker binding..." - res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack) - if res.status != 0: - raise Exception(res.text) - if _verbose: - print "Bridge method returned:", res.status, res.text - - def addQueueRoute(self, remoteBroker, exchange, queue): - self.addLink(remoteBroker) - link = self.getLink() - if link == None: - raise Exception("Link failed to create") - - bridges = self.qmf.getObjects(_class="bridge") - for bridge in bridges: - if bridge.linkRef == link.getObjectId() and \ - bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: - if not _quiet: - raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue)) - sys.exit(0) - - if _verbose: - print "Creating inter-broker binding..." - res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack) - if res.status != 0: - raise Exception(res.text) - if _verbose: - print "Bridge method returned:", res.status, res.text - - def delQueueRoute(self, remoteBroker, exchange, queue): - self.remote = BrokerURL(remoteBroker) - link = self.getLink() - if link == None: - if not _quiet: - raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) - sys.exit(0) - - bridges = self.qmf.getObjects(_class="bridge") - for bridge in bridges: - if bridge.linkRef == link.getObjectId() and \ - bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue: - if _verbose: - print "Closing bridge..." - res = bridge.close() - if res.status != 0: - raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) - if len(bridges) == 1 and _dellink: - link = self.getLink() - if link == None: - sys.exit(0) - if _verbose: - print "Last bridge on link, closing link..." - res = link.close() - if res.status != 0: - raise Exception("Error closing link: %d - %s" % (res.status, res.text)) - sys.exit(0) - if not _quiet: - raise Exception("Route not found") - - def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False): - self.remote = BrokerURL(remoteBroker) - link = self.getLink() - if link == None: - if not _quiet: - raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name())) - sys.exit(0) - - bridges = self.qmf.getObjects(_class="bridge") - for bridge in bridges: - if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \ - and bridge.dynamic == dynamic: - if _verbose: - print "Closing bridge..." - res = bridge.close() - if res.status != 0: - raise Exception("Error closing bridge: %d - %s" % (res.status, res.text)) - if len(bridges) == 1 and _dellink: - link = self.getLink() - if link == None: - sys.exit(0) - if _verbose: - print "Last bridge on link, closing link..." - res = link.close() - if res.status != 0: - raise Exception("Error closing link: %d - %s" % (res.status, res.text)) - sys.exit(0) - if not _quiet: - raise Exception("Route not found") - - def listRoutes(self): - links = self.qmf.getObjects(_class="link") - bridges = self.qmf.getObjects(_class="bridge") - - for bridge in bridges: - myLink = None - for link in links: - if bridge.linkRef == link.getObjectId(): - myLink = link - break - if myLink != None: - if bridge.dynamic: - keyText = "<dynamic>" - else: - keyText = bridge.key - print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText) - - def clearAllRoutes(self): - links = self.qmf.getObjects(_class="link") - bridges = self.qmf.getObjects(_class="bridge") - - for bridge in bridges: - if _verbose: - myLink = None - for link in links: - if bridge.linkRef == link.getObjectId(): - myLink = link - break - if myLink != None: - print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = bridge.close() - if res.status != 0: - print "Error: %d - %s" % (res.status, res.text) - elif _verbose: - print "Ok" - - if _dellink: - links = self.qmf.getObjects(_class="link") - for link in links: - if _verbose: - print "Deleting Link: %s:%d... " % (link.host, link.port), - res = link.close() - if res.status != 0: - print "Error: %d - %s" % (res.status, res.text) - elif _verbose: - print "Ok" - -class RoutePair: - def __init__(self, fromUrl, toUrl): - self.fromUrl = fromUrl - self.toUrl = toUrl - self.bidir = False - - def __repr__(self): - if self.bidir: - delimit = "<=>" - else: - delimit = " =>" - return "%s %s %s" % (self.fromUrl, delimit, self.toUrl) - - def matches(self, fromUrl, toUrl): - if fromUrl == self.fromUrl and toUrl == self.toUrl: - return True - if toUrl == self.fromUrl and fromUrl == self.toUrl: - self.bidir = True - return True - return False - - -def YN(val): - if val == 1: - return 'Y' - return 'N' - -## -## Main Program -## - -try: - longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=") - (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts) -except: - Usage() - -try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] -except: - cargs = encArgs - -for opt in optlist: - if opt[0] == "--timeout": - _connTimeout = int(opt[1]) - if _connTimeout == 0: - _connTimeout = None - if opt[0] == "-v" or opt[0] == "--verbose": - _verbose = True - if opt[0] == "-q" or opt[0] == "--quiet": - _quiet = True - if opt[0] == "-d" or opt[0] == "--durable": - _durable = True - if opt[0] == "-e" or opt[0] == "--del-empty-link": - _dellink = True - if opt[0] == "-s" or opt[0] == "--src-local": - _srclocal = True - if opt[0] == "-t" or opt[0] == "--transport": - _transport = opt[1] - if opt[0] == "--ack": - _ack = int(opt[1]) - -nargs = len(cargs) -if nargs < 2: - Usage() -if nargs == 2: - localBroker = "localhost" -else: - if _srclocal: - localBroker = cargs[3] - remoteBroker = cargs[2] - else: - localBroker = cargs[2] - if nargs > 3: - remoteBroker = cargs[3] - -group = cargs[0] -cmd = cargs[1] - -try: - rm = RouteManager(localBroker) - if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.addLink(remoteBroker) - elif cmd == "del": - if nargs != 4: - Usage() - rm.delLink(remoteBroker) - elif cmd == "list": - rm.listLinks() - - elif group == "dynamic": - if cmd == "add": - if nargs < 5 or nargs > 7: - Usage() - - tag = "" - excludes = "" - if nargs > 5: tag = cargs[5] - if nargs > 6: excludes = cargs[6] - rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True) - elif cmd == "del": - if nargs != 5: - Usage() - else: - rm.delRoute(remoteBroker, cargs[4], "", dynamic=True) - - elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage() - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False) - elif cmd == "del": - if nargs != 6: - Usage() - rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False) - elif cmd == "map": - rm.mapRoutes() - else: - if cmd == "list": - rm.listRoutes() - elif cmd == "flush": - rm.clearAllRoutes() - else: - Usage() - - elif group == "queue": - if nargs != 6: - Usage() - if cmd == "add": - rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) - elif cmd == "del": - rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5]) - else: - Usage() - -except Exception,e: - print "Failed: %s - %s" % (e.__class__.__name__, e) - sys.exit(1) - -rm.disconnect() diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat deleted file mode 100755 index c6fc5ef0da..0000000000 --- a/python/commands/qpid-stat +++ /dev/null @@ -1,459 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import locale -import socket -import re -from qmf.console import Session, Console -from qpid.disp import Display, Header, Sorter - -_host = "localhost" -_connTimeout = 10 -_types = "" -_limit = 50 -_increasing = False -_sortcol = None - -def Usage (): - print "Usage: qpid-stat [OPTIONS] [broker-addr]" - print - print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost" - print - print "General Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" -# print " -n [--numeric] Don't resolve names" - print - print "Display Options:" - print - print " -b Show Brokers" - print " -c Show Connections" -# print " -s Show Sessions" - print " -e Show Exchanges" - print " -q Show Queues" - print - print " -S [--sort-by] COLNAME Sort by column name" - print " -I [--increasing] Sort by increasing value (default = decreasing)" - print " -L [--limit] NUM Limit output to NUM rows (default = 50)" - print - sys.exit (1) - -class IpAddr: - def __init__(self, text): - if text.find("@") != -1: - tokens = text.split("@") - text = tokens[1] - if text.find(":") != -1: - tokens = text.split(":") - text = tokens[0] - self.port = int(tokens[1]) - else: - self.port = 5672 - self.dottedQuad = socket.gethostbyname(text) - nums = self.dottedQuad.split(".") - self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) - - def bestAddr(self, addrPortList): - bestDiff = 0xFFFFFFFFL - bestAddr = None - for addrPort in addrPortList: - diff = IpAddr(addrPort[0]).addr ^ self.addr - if diff < bestDiff: - bestDiff = diff - bestAddr = addrPort - return bestAddr - -class Broker(object): - def __init__(self, qmf, broker): - self.broker = broker - - agents = qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] - self.currentTime = bobj.getTimestamps()[0] - try: - self.uptime = bobj.uptime - except: - self.uptime = 0 - self.connections = {} - self.sessions = {} - self.exchanges = {} - self.queues = {} - package = "org.apache.qpid.broker" - - list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) - for conn in list: - if not conn.shadow: - self.connections[conn.getObjectId()] = conn - - list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) - for sess in list: - if sess.connectionRef in self.connections: - self.sessions[sess.getObjectId()] = sess - - list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) - for exchange in list: - self.exchanges[exchange.getObjectId()] = exchange - - list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) - for queue in list: - self.queues[queue.getObjectId()] = queue - - def getName(self): - return self.broker.getUrl() - - def getCurrentTime(self): - return self.currentTime - - def getUptime(self): - return self.uptime - -class BrokerManager(Console): - def __init__(self): - self.brokerName = None - self.qmf = None - self.broker = None - self.brokers = [] - self.cluster = None - - def SetBroker(self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == 0: - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def _getCluster(self): - packages = self.qmf.getPackages() - if "org.apache.qpid.cluster" not in packages: - return None - - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - print "Clustering is installed but not enabled on the broker." - return None - - self.cluster = clusters[0] - - def _getHostList(self, urlList): - hosts = [] - hostAddr = IpAddr(_host) - for url in urlList: - if url.find("amqp:") != 0: - raise Exception("Invalid URL 1") - url = url[5:] - addrs = str(url).split(",") - addrList = [] - for addr in addrs: - tokens = addr.split(":") - if len(tokens) != 3: - raise Exception("Invalid URL 2") - addrList.append((tokens[1], tokens[2])) - - # Find the address in the list that is most likely to be in the same subnet as the address - # with which we made the original QMF connection. This increases the probability that we will - # be able to reach the cluster member. - - best = hostAddr.bestAddr(addrList) - bestUrl = best[0] + ":" + best[1] - hosts.append(bestUrl) - return hosts - - def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): - if len(subs) == 0: - return - this = subs[0] - remaining = subs[1:] - newindent = indent + " " - if this == 'b': - pass - elif this == 'c': - if broker: - for oid in broker.connections: - iconn = broker.connections[oid] - self.printConnSub(indent, broker.getName(), iconn) - self.displaySubs(remaining, newindent, broker=broker, conn=iconn, - sess=sess, exchange=exchange, queue=queue) - elif this == 's': - pass - elif this == 'e': - pass - elif this == 'q': - pass - print - - def displayBroker(self, subs): - disp = Display(prefix=" ") - heads = [] - heads.append(Header('broker')) - heads.append(Header('cluster')) - heads.append(Header('uptime', Header.DURATION)) - heads.append(Header('conn', Header.KMG)) - heads.append(Header('sess', Header.KMG)) - heads.append(Header('exch', Header.KMG)) - heads.append(Header('queue', Header.KMG)) - rows = [] - for broker in self.brokers: - if self.cluster: - ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) - else: - ctext = "<standalone>" - row = (broker.getName(), ctext, broker.getUptime(), - len(broker.connections), len(broker.sessions), - len(broker.exchanges), len(broker.queues)) - rows.append(row) - title = "Brokers" - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) - - def displayConn(self, subs): - disp = Display(prefix=" ") - heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header('client-addr')) - heads.append(Header('cproc')) - heads.append(Header('cpid')) - heads.append(Header('auth')) - heads.append(Header('connected', Header.DURATION)) - heads.append(Header('idle', Header.DURATION)) - heads.append(Header('msgIn', Header.KMG)) - heads.append(Header('msgOut', Header.KMG)) - rows = [] - for broker in self.brokers: - for oid in broker.connections: - conn = broker.connections[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(conn.address) - row.append(conn.remoteProcessName) - row.append(conn.remotePid) - row.append(conn.authIdentity) - row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) - idle = broker.getCurrentTime() - conn.getTimestamps()[0] - row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) - row.append(conn.framesFromClient) - row.append(conn.framesToClient) - rows.append(row) - title = "Connections" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) - - def displaySession(self, subs): - disp = Display(prefix=" ") - - def displayExchange(self, subs): - disp = Display(prefix=" ") - heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header("exchange")) - heads.append(Header("type")) - heads.append(Header("dur", Header.Y)) - heads.append(Header("bind", Header.KMG)) - heads.append(Header("msgIn", Header.KMG)) - heads.append(Header("msgOut", Header.KMG)) - heads.append(Header("msgDrop", Header.KMG)) - heads.append(Header("byteIn", Header.KMG)) - heads.append(Header("byteOut", Header.KMG)) - heads.append(Header("byteDrop", Header.KMG)) - rows = [] - for broker in self.brokers: - for oid in broker.exchanges: - ex = broker.exchanges[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(ex.name) - row.append(ex.type) - row.append(ex.durable) - row.append(ex.bindingCount) - row.append(ex.msgReceives) - row.append(ex.msgRoutes) - row.append(ex.msgDrops) - row.append(ex.byteReceives) - row.append(ex.byteRoutes) - row.append(ex.byteDrops) - rows.append(row) - title = "Exchanges" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) - - def displayQueue(self, subs): - disp = Display(prefix=" ") - heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header("queue")) - heads.append(Header("dur", Header.Y)) - heads.append(Header("autoDel", Header.Y)) - heads.append(Header("excl", Header.Y)) - heads.append(Header("msg", Header.KMG)) - heads.append(Header("msgIn", Header.KMG)) - heads.append(Header("msgOut", Header.KMG)) - heads.append(Header("bytes", Header.KMG)) - heads.append(Header("bytesIn", Header.KMG)) - heads.append(Header("bytesOut", Header.KMG)) - heads.append(Header("cons", Header.KMG)) - heads.append(Header("bind", Header.KMG)) - rows = [] - for broker in self.brokers: - for oid in broker.queues: - q = broker.queues[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(q.name) - row.append(q.durable) - row.append(q.autoDelete) - row.append(q.exclusive) - row.append(q.msgDepth) - row.append(q.msgTotalEnqueues) - row.append(q.msgTotalDequeues) - row.append(q.byteDepth) - row.append(q.byteTotalEnqueues) - row.append(q.byteTotalDequeues) - row.append(q.consumerCount) - row.append(q.bindingCount) - rows.append(row) - title = "Queues" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName - if _sortcol: - sorter = Sorter(heads, rows, _sortcol, _limit, _increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) - - def displayMain(self, main, subs): - if main == 'b': self.displayBroker(subs) - elif main == 'c': self.displayConn(subs) - elif main == 's': self.displaySession(subs) - elif main == 'e': self.displayExchange(subs) - elif main == 'q': self.displayQueue(subs) - - def display(self): - self._getCluster() - if self.cluster: - memberList = self.cluster.members.split(";") - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - if _host.find("@") > 0: - authString = _host.split("@")[0] + "@" - else: - authString = "" - for host in hostList: - b = self.qmf.addBroker(authString + host, _connTimeout) - self.brokers.append(Broker(self.qmf, b)) - else: - self.brokers.append(Broker(self.qmf, self.broker)) - - self.displayMain(_types[0], _types[1:]) - - -## -## Main Program -## - -try: - longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=") - (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts) -except: - Usage() - -try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] -except: - cargs = encArgs - -for opt in optlist: - if opt[0] == "--timeout": - _connTimeout = int(opt[1]) - if _connTimeout == 0: - _connTimeout = None - elif opt[0] == "-n" or opt[0] == "--numeric": - _numeric = True - elif opt[0] == "-S" or opt[0] == "--sort-by": - _sortcol = opt[1] - elif opt[0] == "-I" or opt[0] == "--increasing": - _increasing = True - elif opt[0] == "-L" or opt[0] == "--limit": - _limit = int(opt[1]) - elif len(opt[0]) == 2: - char = opt[0][1] - if "bcseq".find(char) != -1: - _types += char - else: - Usage() - else: - Usage() - -if len(_types) == 0: - Usage() - -nargs = len(cargs) -bm = BrokerManager() - -if nargs == 1: - _host = cargs[0] - -try: - bm.SetBroker(_host) - bm.display() -except KeyboardInterrupt: - print -except Exception,e: - print "Failed: %s - %s" % (e.__class__.__name__, e) - sys.exit(1) - -bm.Disconnect() diff --git a/python/commands/qpid-tool b/python/commands/qpid-tool deleted file mode 100755 index 05afcc9732..0000000000 --- a/python/commands/qpid-tool +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os -import getopt -import sys -import socket -from cmd import Cmd -from qpid.connection import ConnectionFailed, Timeout -from qpid.managementdata import ManagementData -from shlex import split -from qpid.disp import Display -from qpid.peer import Closed - -class Mcli (Cmd): - """ Management Command Interpreter """ - - def __init__ (self, dataObject, dispObject): - Cmd.__init__ (self) - self.dataObject = dataObject - self.dispObject = dispObject - self.dataObject.setCli (self) - self.prompt = "qpid: " - - def emptyline (self): - pass - - def setPromptMessage (self, p): - if p == None: - self.prompt = "qpid: " - else: - self.prompt = "qpid[%s]: " % p - - def do_help (self, data): - print "Management Tool for QPID" - print - print "Commands:" - print " list - Print summary of existing objects by class" - print " list <className> - Print list of objects of the specified class" - print " list <className> active - Print list of non-deleted objects of the specified class" - print " show <className> - Print contents of all objects of specified class" - print " show <className> active - Print contents of all non-deleted objects of specified class" - print " show <list-of-IDs> - Print contents of one or more objects (infer className)" - print " show <className> <list-of-IDs> - Print contents of one or more objects" - print " list is space-separated, ranges may be specified (i.e. 1004-1010)" - print " call <ID> <methodName> [<args>] - Invoke a method on an object" - print " schema - Print summary of object classes seen on the target" - print " schema <className> - Print details of an object class" - print " set time-format short - Select short timestamp format (default)" - print " set time-format long - Select long timestamp format" - print " id [<ID>] - Display translations of display object ids" - print " quit or ^D - Exit the program" - print - - def complete_set (self, text, line, begidx, endidx): - """ Command completion for the 'set' command """ - tokens = split (line) - if len (tokens) < 2: - return ["time-format "] - elif tokens[1] == "time-format": - if len (tokens) == 2: - return ["long", "short"] - elif len (tokens) == 3: - if "long".find (text) == 0: - return ["long"] - elif "short".find (text) == 0: - return ["short"] - elif "time-format".find (text) == 0: - return ["time-format "] - return [] - - def do_set (self, data): - tokens = split (data) - try: - if tokens[0] == "time-format": - self.dispObject.do_setTimeFormat (tokens[1]) - except: - pass - - def do_id (self, data): - self.dataObject.do_id(data) - - def complete_schema (self, text, line, begidx, endidx): - tokens = split (line) - if len (tokens) > 2: - return [] - return self.dataObject.classCompletions (text) - - def do_schema (self, data): - self.dataObject.do_schema (data) - - def complete_list (self, text, line, begidx, endidx): - tokens = split (line) - if len (tokens) > 2: - return [] - return self.dataObject.classCompletions (text) - - def do_list (self, data): - self.dataObject.do_list (data) - - def do_show (self, data): - self.dataObject.do_show (data) - - def do_call (self, data): - try: - self.dataObject.do_call (data) - except ValueError, e: - print "ValueError:", e - - def do_EOF (self, data): - print "quit" - try: - self.dataObject.do_exit () - except: - pass - return True - - def do_quit (self, data): - try: - self.dataObject.do_exit () - except: - pass - return True - - def postcmd (self, stop, line): - return stop - - def postloop (self): - print "Exiting..." - self.dataObject.close () - -def Usage (): - print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]" - print - sys.exit (1) - -#========================================================= -# Main Program -#========================================================= - -# Get host name and port if specified on the command line -cargs = sys.argv[1:] -_host = "localhost" - -if len (cargs) > 0: - _host = cargs[0] - -if _host[0] == '-': - Usage() - -disp = Display () - -# Attempt to make a connection to the target broker -try: - data = ManagementData (disp, _host) -except socket.error, e: - print "Socket Error (%s):" % _host, e[1] - sys.exit (1) -except IOError, e: - print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename) - sys.exit (1) -except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) -except Exception, e: - if str(e).find ("Exchange not found") != -1: - print "Management not enabled on broker: Use '-m yes' option on broker startup." - else: - print "Failed: %s - %s" % (e.__class__.__name__, e) - sys.exit(1) - -# Instantiate the CLI interpreter and launch it. -cli = Mcli (data, disp) -print ("Management Tool for QPID") -try: - cli.cmdloop () -except Closed, e: - print "Connection to Broker Lost:", e - sys.exit (1) |