summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rwxr-xr-xtools/setup.py33
-rwxr-xr-xtools/src/py/qpid-cluster327
-rwxr-xr-xtools/src/py/qpid-config572
-rwxr-xr-xtools/src/py/qpid-printevents74
-rwxr-xr-xtools/src/py/qpid-queue-stats146
-rwxr-xr-xtools/src/py/qpid-route524
-rwxr-xr-xtools/src/py/qpid-stat459
-rwxr-xr-xtools/src/py/qpid-tool197
8 files changed, 2332 insertions, 0 deletions
diff --git a/tools/setup.py b/tools/setup.py
new file mode 100755
index 0000000000..eeabd858ae
--- /dev/null
+++ b/tools/setup.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from distutils.core import setup
+
+setup(name="qpid-tools",
+ version="0.7",
+ scripts=["src/py/qpid-cluster",
+ "src/py/qpid-config",
+ "src/py/qpid-printevents",
+ "src/py/qpid-queue-stats",
+ "src/py/qpid-route",
+ "src/py/qpid-stat",
+ "src/py/qpid-tool"],
+ url="http://qpid.apache.org/",
+ license="Apache Software License",
+ description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/tools/src/py/qpid-cluster b/tools/src/py/qpid-cluster
new file mode 100755
index 0000000000..6d64765184
--- /dev/null
+++ b/tools/src/py/qpid-cluster
@@ -0,0 +1,327 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+ self._stopId = None
+ self._stopAll = False
+ self._force = False
+ self._numeric = False
+ self._showConn = False
+ self._delConn = None
+
+def usage ():
+ print "Usage: qpid-cluster [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
+ print " -C [--all-connections] View client connections to all cluster members"
+ print " -c [--connections] ID View client connections to specified member"
+ print " -d [--del-connection] HOST:PORT"
+ print " Disconnect a client connection"
+ print " -s [--stop] ID Stop one member of the cluster by its ID"
+ print " -k [--all-stop] Shut down the whole cluster"
+ print " -f [--force] Suppress the 'are-you-sure?' prompt"
+ print " -n [--numeric] Don't resolve names"
+ print
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class BrokerManager:
+ def __init__(self, config):
+ self.config = config
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def _getClusters(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ raise Exception("Clustering is not installed on the broker.")
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ raise Exception("Clustering is installed but not enabled on the broker.")
+
+ return clusters
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(self.config._host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def overview(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+
+ print " Cluster Name: %s" % cluster.clusterName
+ print "Cluster Status: %s" % cluster.status
+ print " Cluster Size: %d" % cluster.clusterSize
+ print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
+ for idx in range(1,len(idList)):
+ print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+ def stopMember(self, id):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ idList = cluster.memberIDs.split(";")
+ if id not in idList:
+ raise Exception("No member with matching ID found")
+
+ if not self.config._force:
+ prompt = "Warning: "
+ if len(idList) == 1:
+ prompt += "This command will shut down the last running cluster member."
+ else:
+ prompt += "This command will shut down a cluster member."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster.stopClusterNode(id)
+
+ def stopAll(self):
+ clusters = self._getClusters()
+ if not self.config._force:
+ prompt = "Warning: This command will shut down the entire cluster."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ raise Exception("Operation canceled")
+
+ cluster = clusters[0]
+ cluster.stopFullCluster()
+
+ def showConnections(self):
+ clusters = self._getClusters()
+ cluster = clusters[0]
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+ displayList = []
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ self.brokers = []
+
+ idx = 0
+ for host in hostList:
+ if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn:
+ self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout))
+ displayList.append(idList[idx])
+ idx += 1
+
+ idx = 0
+ found = False
+ for broker in self.brokers:
+ if not self.config._delConn:
+ print "Clients on Member: ID=%s:" % displayList[idx]
+ connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+ for conn in connList:
+ if not conn.shadow:
+ if self.config._numeric or self.config._delConn:
+ a = conn.address
+ else:
+ tokens = conn.address.split(":")
+ try:
+ hostList = socket.gethostbyaddr(tokens[0])
+ host = hostList[0]
+ except:
+ host = tokens[0]
+ a = host + ":" + tokens[1]
+ if self.config._delConn:
+ tokens = self.config._delConn.split(":")
+ ip = socket.gethostbyname(tokens[0])
+ toDelete = ip + ":" + tokens[1]
+ if a == toDelete:
+ print "Closing connection from client: %s" % a
+ conn.close()
+ found = True
+ else:
+ print " %s" % a
+ idx += 1
+ if not self.config._delConn:
+ print
+ if self.config._delConn and not found:
+ print "Client connection '%s' not found" % self.config._delConn
+
+ for broker in self.brokers:
+ self.qmf.delBroker(broker)
+
+
+def main(argv=None):
+ if argv is None: argv = sys.argv
+ try:
+ config = Config()
+ try:
+ longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=")
+ (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts)
+ except:
+ usage()
+ return 1
+
+ try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+ except:
+ cargs = encArgs
+
+ count = 0
+ for opt in optlist:
+ if opt[0] == "--timeout":
+ config._connTimeout = int(opt[1])
+ if config._connTimeout == 0:
+ config._connTimeout = None
+ if opt[0] == "-s" or opt[0] == "--stop":
+ config._stopId = opt[1]
+ if len(config._stopId.split(":")) != 2:
+ raise Exception("Member ID must be of form: <host or ip>:<number>")
+ count += 1
+ if opt[0] == "-k" or opt[0] == "--all-stop":
+ config._stopAll = True
+ count += 1
+ if opt[0] == "-f" or opt[0] == "--force":
+ config._force = True
+ if opt[0] == "-n" or opt[0] == "--numeric":
+ config._numeric = True
+ if opt[0] == "-C" or opt[0] == "--all-connections":
+ config._showConn = "all"
+ count += 1
+ if opt[0] == "-c" or opt[0] == "--connections":
+ config._showConn = opt[1]
+ if len(config._showConn.split(":")) != 2:
+ raise Exception("Member ID must be of form: <host or ip>:<number>")
+ count += 1
+ if opt[0] == "-d" or opt[0] == "--del-connection":
+ config._delConn = opt[1]
+ if len(config._delConn.split(":")) != 2:
+ raise Exception("Connection must be of form: <host or ip>:<port>")
+ count += 1
+
+ if count > 1:
+ print "Only one command option may be supplied"
+ print
+ usage()
+ return 1
+
+ nargs = len(cargs)
+ bm = BrokerManager(config)
+
+ if nargs == 1:
+ config._host = cargs[0]
+
+ try:
+ bm.SetBroker(config._host)
+ if config._stopId:
+ bm.stopMember(config._stopId)
+ elif config._stopAll:
+ bm.stopAll()
+ elif config._showConn or config._delConn:
+ bm.showConnections()
+ else:
+ bm.overview()
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ if str(e).find("connection aborted") > 0:
+ # we expect this when asking the connected broker to shut down
+ return 0
+ raise Exception("Failed: %s - %s" % (e.__class__.__name__, e))
+
+ bm.Disconnect()
+ except Exception, e:
+ print str(e)
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config
new file mode 100755
index 0000000000..0db42bc6c7
--- /dev/null
+++ b/tools/src/py/qpid-config
@@ -0,0 +1,572 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+from qmf.console import Session
+
+_recursive = False
+_host = "localhost"
+_connTimeout = 10
+_altern_ex = None
+_passive = False
+_durable = False
+_clusterDurable = False
+_if_empty = True
+_if_unused = True
+_fileCount = 8
+_fileSize = 24
+_maxQueueSize = None
+_maxQueueCount = None
+_limitPolicy = None
+_order = None
+_msgSequence = False
+_ive = False
+_eventGeneration = None
+_file = None
+
+FILECOUNT = "qpid.file_count"
+FILESIZE = "qpid.file_size"
+MAX_QUEUE_SIZE = "qpid.max_size"
+MAX_QUEUE_COUNT = "qpid.max_count"
+POLICY_TYPE = "qpid.policy_type"
+CLUSTER_DURABLE = "qpid.persist_last_node"
+LVQ = "qpid.last_value_queue"
+LVQNB = "qpid.last_value_queue_no_browse"
+MSG_SEQUENCE = "qpid.msg_sequence"
+IVE = "qpid.ive"
+QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+
+def Usage ():
+ print "Usage: qpid-config [OPTIONS]"
+ print " qpid-config [OPTIONS] exchanges [filter-string]"
+ print " qpid-config [OPTIONS] queues [filter-string]"
+ print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]"
+ print " qpid-config [OPTIONS] del exchange <name>"
+ print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]"
+ print " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]"
+ print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]"
+ print " <for type xml> [-f -|filename]"
+ print " <for type header> [all|any] k1=v1 [, k2=v2...]"
+ print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"
+ print
+ print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
+ print " -b [ --bindings ] Show bindings in queue or exchange list"
+ print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker"
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "Add Queue Options:"
+ print " --alternate-exchange [name of the alternate exchange]"
+ print " The alternate-exchange field specifies how messages on this queue should"
+ print " be treated when they are rejected by a subscriber, or when they are"
+ print " orphaned by queue deletion. When present, rejected or orphaned messages"
+ print " MUST be routed to the alternate-exchange. In all cases the messages MUST"
+ print " be removed from the queue."
+ print " --passive Do not actually change the broker state (queue will not be created)"
+ print " --durable Queue is durable"
+ print " --cluster-durable Queue becomes durable if there is only one functioning cluster node"
+ print " --file-count N (8) Number of files in queue's persistence journal"
+ print " --file-size N (24) File size in pages (64Kib/page)"
+ print " --max-queue-size N Maximum in-memory queue size as bytes"
+ print " --max-queue-count N Maximum in-memory queue size as a number of messages"
+ print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]"
+ print " Action taken when queue limit is reached:"
+ print " none (default) - Use broker's default policy"
+ print " reject - Reject enqueued messages"
+ print " flow-to-disk - Page messages to disk"
+ print " ring - Replace oldest unacquired message with new"
+ print " ring-strict - Replace oldest message, reject if oldest is acquired"
+ print " --order [fifo | lvq | lvq-no-browse]"
+ print " Set queue ordering policy:"
+ print " fifo (default) - First in, first out"
+ print " lvq - Last Value Queue ordering, allows queue browsing"
+ print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data"
+ print " --generate-queue-events N"
+ print " If set to 1, every enqueue will generate an event that can be processed by"
+ print " registered listeners (e.g. for replication). If set to 2, events will be"
+ print " generated for enqueues and dequeues"
+ print
+ print "Del Queue Options:"
+ print " --force Force delete of queue even if it's currently used or it's not empty"
+ print " --force-if-not-empty Force delete of queue even if it's not empty"
+ print " --force-if-used Force delete of queue even if it's currently used"
+ print
+ print "Add Exchange <type> values:"
+ print " direct Direct exchange for point-to-point communication"
+ print " fanout Fanout exchange for broadcast communication"
+ print " topic Topic exchange that routes messages using binding keys with wildcards"
+ print " headers Headers exchange that matches header fields against the binding keys"
+ print
+ print "Add Exchange Options:"
+ print " --alternate-exchange [name of the alternate exchange]"
+ print " In the event that a message cannot be routed, this is the name of the exchange to"
+ print " which the message will be sent. Messages transferred using message.transfer will"
+ print " be routed to the alternate-exchange only if they are sent with the \"none\""
+ print " accept-mode, and the discard-unroutable delivery property is set to false, and"
+ print " there is no queue to route to for the given message according to the bindings"
+ print " on this exchange."
+ print " --passive Do not actually change the broker state (exchange will not be created)"
+ print " --durable Exchange is durable"
+ print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header"
+ print " with a value that increments for each message forwarded."
+ print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference"
+ print " to the last message forwarded and enqueuing that message to newly bound"
+ print " queues."
+ print
+ sys.exit (1)
+
+
+#
+# helpers for the arg parsing in bind(). return multiple values; "ok"
+# followed by the resultant args
+
+#
+# accept -f followed by either
+# a filename or "-", for stdin. pull the bits into a string, to be
+# passed to the xml binding.
+#
+def snarf_xquery_args():
+ if not _file:
+ print "Invalid args to bind xml: need an input file or stdin"
+ return [False]
+ if _file == "-":
+ res = sys.stdin.read()
+ else:
+ f = open(_file) # let this signal if it can't find it
+ res = f.read()
+ f.close()
+ return [True, res]
+
+#
+# look for "any"/"all" and grok the rest of argv into a map
+#
+def snarf_header_args(cargs):
+ if len(cargs) < 2:
+ print "Invalid args to bind headers: need 'any'/'all' plus conditions"
+ return [False]
+ op = cargs[0]
+ if op == "all" or op == "any":
+ kv = {}
+ for thing in cargs[1:]:
+ k_and_v = thing.split("=")
+ kv[k_and_v[0]] = k_and_v[1]
+ return [True, op, kv]
+ else:
+ print "Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'"
+ return [False]
+
+class BrokerManager:
+ def __init__ (self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker (self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def Overview (self):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ print "Total Exchanges: %d" % len (exchanges)
+ etype = {}
+ for ex in exchanges:
+ if ex.type not in etype:
+ etype[ex.type] = 1
+ else:
+ etype[ex.type] = etype[ex.type] + 1
+ for typ in etype:
+ print "%15s: %d" % (typ, etype[typ])
+
+ print
+ print " Total Queues: %d" % len (queues)
+ _durable = 0
+ for queue in queues:
+ if queue.durable:
+ _durable = _durable + 1
+ print " durable: %d" % _durable
+ print " non-durable: %d" % (len (queues) - _durable)
+
+ def ExchangeList (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ caption1 = "Type "
+ caption2 = "Exchange Name"
+ maxNameLen = len(caption2)
+ for ex in exchanges:
+ if self.match(ex.name, filter):
+ if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
+ print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
+ line = ""
+ for i in range(((maxNameLen + len(caption1)) / 5) + 5):
+ line += "====="
+ print line
+
+ for ex in exchanges:
+ if self.match (ex.name, filter):
+ print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
+ args = ex.arguments
+ if ex.durable: print "--durable",
+ if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
+ if IVE in args and args[IVE] == 1: print "--ive",
+ if ex.altExchange:
+ print "--alternate-exchange=%s" % ex._altExchange_.name,
+ print
+
+ def ExchangeListRecurse (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for ex in exchanges:
+ if self.match (ex.name, filter):
+ print "Exchange '%s' (%s)" % (ex.name, ex.type)
+ for bind in bindings:
+ if bind.exchangeRef == ex.getObjectId():
+ qname = "<unknown>"
+ queue = self.findById (queues, bind.queueRef)
+ if queue != None:
+ qname = queue.name
+ print " bind [%s] => %s" % (bind.bindingKey, qname)
+
+
+ def QueueList (self, filter):
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+
+ caption = "Queue Name"
+ maxNameLen = len(caption)
+ for q in queues:
+ if self.match (q.name, filter):
+ if len(q.name) > maxNameLen: maxNameLen = len(q.name)
+ print "%-*s Attributes" % (maxNameLen, caption)
+ line = ""
+ for i in range((maxNameLen / 5) + 5):
+ line += "====="
+ print line
+
+ for q in queues:
+ if self.match (q.name, filter):
+ print "%-*s " % (maxNameLen, q.name),
+ args = q.arguments
+ if q.durable: print "--durable",
+ if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
+ if q.autoDelete: print "auto-del",
+ if q.exclusive: print "excl",
+ if FILESIZE in args: print "--file-size=%d" % args[FILESIZE],
+ if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT],
+ if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE],
+ if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
+ if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
+ if LVQ in args and args[LVQ] == 1: print "--order lvq",
+ if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
+ if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
+ if q.altExchange:
+ print "--alternate-exchange=%s" % q._altExchange_.name,
+ print
+
+ def QueueListRecurse (self, filter):
+ exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
+ bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
+ queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
+ for queue in queues:
+ if self.match (queue.name, filter):
+ print "Queue '%s'" % queue.name
+ for bind in bindings:
+ if bind.queueRef == queue.getObjectId():
+ ename = "<unknown>"
+ ex = self.findById (exchanges, bind.exchangeRef)
+ if ex != None:
+ ename = ex.name
+ if ename == "":
+ ename = "''"
+ print " bind [%s] => %s" % (bind.bindingKey, ename)
+
+ def AddExchange (self, args):
+ if len (args) < 2:
+ Usage ()
+ etype = args[0]
+ ename = args[1]
+ declArgs = {}
+ if _msgSequence:
+ declArgs[MSG_SEQUENCE] = 1
+ if _ive:
+ declArgs[IVE] = 1
+ if _altern_ex != None:
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
+ else:
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs)
+
+ def DelExchange (self, args):
+ if len (args) < 1:
+ Usage ()
+ ename = args[0]
+ self.broker.getAmqpSession().exchange_delete (exchange=ename)
+
+ def AddQueue (self, args):
+ if len (args) < 1:
+ Usage ()
+ qname = args[0]
+ declArgs = {}
+ if _durable:
+ declArgs[FILECOUNT] = _fileCount
+ declArgs[FILESIZE] = _fileSize
+
+ if _maxQueueSize:
+ declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
+ if _maxQueueCount:
+ declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
+ if _limitPolicy:
+ if _limitPolicy == "none":
+ pass
+ elif _limitPolicy == "reject":
+ declArgs[POLICY_TYPE] = "reject"
+ elif _limitPolicy == "flow-to-disk":
+ declArgs[POLICY_TYPE] = "flow_to_disk"
+ elif _limitPolicy == "ring":
+ declArgs[POLICY_TYPE] = "ring"
+ elif _limitPolicy == "ring-strict":
+ declArgs[POLICY_TYPE] = "ring_strict"
+
+ if _clusterDurable:
+ declArgs[CLUSTER_DURABLE] = 1
+ if _order:
+ if _order == "fifo":
+ pass
+ elif _order == "lvq":
+ declArgs[LVQ] = 1
+ elif _order == "lvq-no-browse":
+ declArgs[LVQNB] = 1
+ if _eventGeneration:
+ declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration
+
+ if _altern_ex != None:
+ self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
+ else:
+ self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs)
+
+ def DelQueue (self, args):
+ if len (args) < 1:
+ Usage ()
+ qname = args[0]
+ self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused)
+
+ def Bind (self, args):
+ if len (args) < 2:
+ Usage ()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len (args) > 2:
+ key = args[2]
+
+ # query the exchange to determine its type.
+ res = self.broker.getAmqpSession().exchange_query(ename)
+
+ # type of the xchg determines the processing of the rest of
+ # argv. if it's an xml xchg, we want to find a file
+ # containing an x-query, and pass that. if it's a headers
+ # exchange, we need to pass either "any" or all, followed by a
+ # map containing key/value pairs. if neither of those, extra
+ # args are ignored.
+ ok = True
+ args = None
+ if res.type == "xml":
+ # this checks/imports the -f arg
+ [ok, xquery] = snarf_xquery_args()
+ args = { "xquery" : xquery }
+ # print args
+ else:
+ if res.type == "headers":
+ [ok, op, kv] = snarf_header_args(cargs[4:])
+ args = kv
+ args["x-match"] = op
+
+ if not ok:
+ sys.exit(1)
+
+ self.broker.getAmqpSession().exchange_bind (queue=qname,
+ exchange=ename,
+ binding_key=key,
+ arguments=args)
+
+ def Unbind (self, args):
+ if len (args) < 2:
+ Usage ()
+ ename = args[0]
+ qname = args[1]
+ key = ""
+ if len (args) > 2:
+ key = args[2]
+ self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
+
+ def findById (self, items, id):
+ for item in items:
+ if item.getObjectId() == id:
+ return item
+ return None
+
+ def match (self, name, filter):
+ if filter == "":
+ return True
+ if name.find (filter) == -1:
+ return False
+ return True
+
+def YN (bool):
+ if bool:
+ return 'Y'
+ return 'N'
+
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
+ "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
+ "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty",
+ "force_if_used", "alternate-exchange=", "passive", "timeout=", "file=")
+ (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:bf:", longOpts)
+except:
+ Usage ()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-b" or opt[0] == "--bindings":
+ _recursive = True
+ if opt[0] == "-a" or opt[0] == "--broker-addr":
+ _host = opt[1]
+ if opt[0] == "-f" or opt[0] == "--file":
+ _file = opt[1]
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
+ if opt[0] == "--alternate-exchange":
+ _altern_ex = opt[1]
+ if opt[0] == "--passive":
+ _passive = True
+ if opt[0] == "--durable":
+ _durable = True
+ if opt[0] == "--cluster-durable":
+ _clusterDurable = True
+ if opt[0] == "--file-count":
+ _fileCount = int (opt[1])
+ if opt[0] == "--file-size":
+ _fileSize = int (opt[1])
+ if opt[0] == "--max-queue-size":
+ _maxQueueSize = int (opt[1])
+ if opt[0] == "--max-queue-count":
+ _maxQueueCount = int (opt[1])
+ if opt[0] == "--limit-policy":
+ _limitPolicy = opt[1]
+ if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"):
+ print "Error: Invalid --limit-policy argument"
+ sys.exit(1)
+ if opt[0] == "--order":
+ _order = opt[1]
+ if _order not in ("fifo", "lvq", "lvq-no-browse"):
+ print "Error: Invalid --order argument"
+ sys.exit(1)
+ if opt[0] == "--sequence":
+ _msgSequence = True
+ if opt[0] == "--ive":
+ _ive = True
+ if opt[0] == "--generate-queue-events":
+ _eventGeneration = int (opt[1])
+ if opt[0] == "--force":
+ _if_empty = False
+ _if_unused = False
+ if opt[0] == "--force-if-not-empty":
+ _if_empty = False
+ if opt[0] == "--force-if-used":
+ _if_unused = False
+
+
+nargs = len (cargs)
+bm = BrokerManager ()
+
+try:
+ bm.SetBroker(_host)
+ if nargs == 0:
+ bm.Overview ()
+ else:
+ cmd = cargs[0]
+ modifier = ""
+ if nargs > 1:
+ modifier = cargs[1]
+ if cmd == "exchanges":
+ if _recursive:
+ bm.ExchangeListRecurse (modifier)
+ else:
+ bm.ExchangeList (modifier)
+ elif cmd == "queues":
+ if _recursive:
+ bm.QueueListRecurse (modifier)
+ else:
+ bm.QueueList (modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.AddQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.DelQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "bind":
+ bm.Bind (cargs[1:])
+ elif cmd == "unbind":
+ bm.Unbind (cargs[1:])
+ else:
+ Usage ()
+except KeyboardInterrupt:
+ print
+except IOError, e:
+ print e
+ sys.exit(1)
+except Exception,e:
+ print "Failed: %s: %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+bm.Disconnect()
diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents
new file mode 100755
index 0000000000..0c1b618a1f
--- /dev/null
+++ b/tools/src/py/qpid-printevents
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import socket
+from time import time, strftime, gmtime, sleep
+from qmf.console import Console, Session
+
+class EventConsole(Console):
+ def event(self, broker, event):
+ print event
+
+ def brokerConnected(self, broker):
+ print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+
+ def brokerDisconnected(self, broker):
+ print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
+
+
+##
+## Main Program
+##
+def main():
+ _usage = "%prog [options] [broker-addr]..."
+ _description = \
+"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
+supplied, %prog will connect to 'localhost:5672'.
+broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
+ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
+"""
+ p = optparse.OptionParser(usage=_usage, description=_description)
+
+ options, arguments = p.parse_args()
+ if len(arguments) == 0:
+ arguments.append("localhost")
+
+ console = EventConsole()
+ session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True)
+ brokers = []
+ for host in arguments:
+ brokers.append(session.addBroker(host))
+
+ try:
+ while (True):
+ sleep(10)
+ except KeyboardInterrupt:
+ for broker in brokers:
+ session.delBroker(broker)
+ print
+ sys.exit(0)
+
+if __name__ == '__main__':
+ main()
+
diff --git a/tools/src/py/qpid-queue-stats b/tools/src/py/qpid-queue-stats
new file mode 100755
index 0000000000..3b8a0dcb19
--- /dev/null
+++ b/tools/src/py/qpid-queue-stats
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import optparse
+import sys
+import re
+import socket
+import qpid
+from threading import Condition
+from qmf.console import Session, Console
+from qpid.peer import Closed
+from qpid.connection import Connection, ConnectionFailed
+from time import sleep
+
+class BrokerManager(Console):
+ def __init__(self, host):
+ self.url = host
+ self.objects = {}
+ self.filter = None
+ self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
+ userBindings=True, manageConnections=True)
+ self.broker = self.session.addBroker(self.url)
+ self.firstError = True
+
+ def setFilter(self,filter):
+ self.filter = filter
+
+ def brokerConnected(self, broker):
+ if not self.firstError:
+ print "*** Broker connected"
+ self.firstError = False
+
+ def brokerDisconnected(self, broker):
+ print "*** Broker connection lost - %s, retrying..." % broker.getError()
+ self.firstError = False
+ self.objects.clear()
+
+ def objectProps(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ self.objects[id] = (record.name, None, None)
+
+ def objectStats(self, broker, record):
+ className = record.getClassKey().getClassName()
+ if className != "queue":
+ return
+
+ id = record.getObjectId().__repr__()
+ if id not in self.objects:
+ return
+
+ (name, first, last) = self.objects[id]
+ if first == None:
+ self.objects[id] = (name, record, None)
+ return
+
+ if len(self.filter) > 0 :
+ match = False
+
+ for x in self.filter:
+ if x.match(name):
+ match = True
+ break
+ if match == False:
+ return
+
+ if last == None:
+ lastSample = first
+ else:
+ lastSample = last
+
+ self.objects[id] = (name, first, record)
+
+ deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
+ if deltaTime < 1000000000.0:
+ return
+ enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
+ (deltaTime / 1000000000.0)
+ dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
+ (deltaTime / 1000000000.0)
+ print "%-41s%10.2f%11d%13.2f%13.2f" % \
+ (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
+ sys.stdout.flush()
+
+
+ def Display (self):
+ self.session.bindClass("org.apache.qpid.broker", "queue")
+ print "Queue Name Sec Depth Enq Rate Deq Rate"
+ print "========================================================================================"
+ sys.stdout.flush()
+ try:
+ while True:
+ sleep (1)
+ if self.firstError and self.broker.getError():
+ self.firstError = False
+ print "*** Error: %s, retrying..." % self.broker.getError()
+ except KeyboardInterrupt:
+ print
+ self.session.delBroker(self.broker)
+
+##
+## Main Program
+##
+def main():
+ p = optparse.OptionParser()
+ p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
+ p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
+
+ options, arguments = p.parse_args()
+
+ host = options.broker_address
+ filter = []
+ if options.filter != None:
+ for s in options.filter.split(","):
+ filter.append(re.compile(s))
+
+ bm = BrokerManager(host)
+ bm.setFilter(filter)
+ bm.Display()
+
+if __name__ == '__main__':
+ main()
+
diff --git a/tools/src/py/qpid-route b/tools/src/py/qpid-route
new file mode 100755
index 0000000000..9965047000
--- /dev/null
+++ b/tools/src/py/qpid-route
@@ -0,0 +1,524 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import getopt
+import sys
+import socket
+import os
+import locale
+from qmf.console import Session, BrokerURL
+
+def Usage():
+ print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
+ print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
+ print
+ print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
+ print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
+ print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>"
+ print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>"
+ print " qpid-route [OPTIONS] route list [<dest-broker>]"
+ print " qpid-route [OPTIONS] route flush [<dest-broker>]"
+ print " qpid-route [OPTIONS] route map [<broker>]"
+ print
+ print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link list [<dest-broker>]"
+ print
+ print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
+ print " -v [ --verbose ] Verbose output"
+ print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
+ print " -d [ --durable ] Added configuration shall be durable"
+ print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
+ print " -s [ --src-local ] Make connection to source broker (push route)"
+ print " --ack N Acknowledge transfers over the bridge in batches of N"
+ print " -t <transport> [ --transport <transport>]"
+ print " Specify transport to use for links, defaults to tcp"
+ print
+ print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ sys.exit(1)
+
+_verbose = False
+_quiet = False
+_durable = False
+_dellink = False
+_srclocal = False
+_transport = "tcp"
+_ack = 0
+_connTimeout = 10
+
+class RouteManager:
+ def __init__(self, localBroker):
+ self.local = BrokerURL(localBroker)
+ self.remote = None
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(localBroker, _connTimeout)
+
+ def disconnect(self):
+ self.qmf.delBroker(self.broker)
+
+ def getLink(self):
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ if self.remote.match(link.host, link.port):
+ return link
+ return None
+
+ def addLink(self, remoteBroker):
+ self.remote = BrokerURL(remoteBroker)
+ if self.local.match(self.remote.host, self.remote.port):
+ raise Exception("Linking broker to itself is not permitted")
+
+ brokers = self.qmf.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ if not self.remote.authName or self.remote.authName == "anonymous":
+ mech = "ANONYMOUS"
+ else:
+ mech = "PLAIN"
+ res = broker.connect(self.remote.host, self.remote.port, _durable,
+ mech, self.remote.authName or "", self.remote.authPass or "",
+ _transport)
+ if _verbose:
+ print "Connect method returned:", res.status, res.text
+
+ def delLink(self, remoteBroker):
+ self.remote = BrokerURL(remoteBroker)
+ brokers = self.qmf.getObjects(_class="broker")
+ broker = brokers[0]
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link not found")
+
+ res = link.close()
+ if _verbose:
+ print "Close method returned:", res.status, res.text
+
+ def listLinks(self):
+ links = self.qmf.getObjects(_class="link")
+ if len(links) == 0:
+ print "No Links Found"
+ else:
+ print
+ print "Host Port Transport Durable State Last Error"
+ print "============================================================================="
+ for link in links:
+ print "%-16s%-8d%-13s%c %-18s%s" % \
+ (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
+
+ def mapRoutes(self):
+ qmf = self.qmf
+ print
+ print "Finding Linked Brokers:"
+
+ brokerList = {}
+ brokerList[self.local.name()] = self.broker
+ print " %s... Ok" % self.local
+
+ added = True
+ while added:
+ added = False
+ links = qmf.getObjects(_class="link")
+ for link in links:
+ url = BrokerURL("%s:%d" % (link.host, link.port))
+ if url.name() not in brokerList:
+ print " %s..." % url.name(),
+ try:
+ b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
+ brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ leftType = "ex"
+ rightType = "ex"
+ if bridge.srcIsLocal:
+ arrow = "=>"
+ left = bridge.src
+ right = bridge.dest
+ if bridge.srcIsQueue:
+ leftType = "queue"
+ else:
+ arrow = "<="
+ left = bridge.dest
+ right = bridge.src
+ if bridge.srcIsQueue:
+ rightType = "queue"
+
+ if bridge.srcIsQueue:
+ print " %s(%s=%s) %s %s(%s=%s)" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+ else:
+ print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
+ print
+
+ for broker in brokerList:
+ if broker != self.local.name():
+ qmf.delBroker(brokerList[broker])
+
+
+ def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
+ if dynamic and _srclocal:
+ raise Exception("--src-local is not permitted on dynamic routes")
+
+ self.addLink(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link failed to create")
+
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+ if not _quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+ sys.exit(0)
+
+ if _verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
+ if res.status != 0:
+ raise Exception(res.text)
+ if _verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def addQueueRoute(self, remoteBroker, exchange, queue):
+ self.addLink(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ raise Exception("Link failed to create")
+
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if not _quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+ sys.exit(0)
+
+ if _verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
+ if res.status != 0:
+ raise Exception(res.text)
+ if _verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def delQueueRoute(self, remoteBroker, exchange, queue):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not _quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if _verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and _dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if _verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not _quiet:
+ raise Exception("Route not found")
+
+ def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+ self.remote = BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not _quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+ and bridge.dynamic == dynamic:
+ if _verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and _dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if _verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not _quiet:
+ raise Exception("Route not found")
+
+ def listRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ if bridge.dynamic:
+ keyText = "<dynamic>"
+ else:
+ keyText = bridge.key
+ print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
+
+ def clearAllRoutes(self):
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
+
+ for bridge in bridges:
+ if _verbose:
+ myLink = None
+ for link in links:
+ if bridge.linkRef == link.getObjectId():
+ myLink = link
+ break
+ if myLink != None:
+ print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
+ res = bridge.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif _verbose:
+ print "Ok"
+
+ if _dellink:
+ links = self.qmf.getObjects(_class="link")
+ for link in links:
+ if _verbose:
+ print "Deleting Link: %s:%d... " % (link.host, link.port),
+ res = link.close()
+ if res.status != 0:
+ print "Error: %d - %s" % (res.status, res.text)
+ elif _verbose:
+ print "Ok"
+
+class RoutePair:
+ def __init__(self, fromUrl, toUrl):
+ self.fromUrl = fromUrl
+ self.toUrl = toUrl
+ self.bidir = False
+
+ def __repr__(self):
+ if self.bidir:
+ delimit = "<=>"
+ else:
+ delimit = " =>"
+ return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+ def matches(self, fromUrl, toUrl):
+ if fromUrl == self.fromUrl and toUrl == self.toUrl:
+ return True
+ if toUrl == self.fromUrl and fromUrl == self.toUrl:
+ self.bidir = True
+ return True
+ return False
+
+
+def YN(val):
+ if val == 1:
+ return 'Y'
+ return 'N'
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
+except:
+ Usage()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
+ if opt[0] == "-v" or opt[0] == "--verbose":
+ _verbose = True
+ if opt[0] == "-q" or opt[0] == "--quiet":
+ _quiet = True
+ if opt[0] == "-d" or opt[0] == "--durable":
+ _durable = True
+ if opt[0] == "-e" or opt[0] == "--del-empty-link":
+ _dellink = True
+ if opt[0] == "-s" or opt[0] == "--src-local":
+ _srclocal = True
+ if opt[0] == "-t" or opt[0] == "--transport":
+ _transport = opt[1]
+ if opt[0] == "--ack":
+ _ack = int(opt[1])
+
+nargs = len(cargs)
+if nargs < 2:
+ Usage()
+if nargs == 2:
+ localBroker = "localhost"
+else:
+ if _srclocal:
+ localBroker = cargs[3]
+ remoteBroker = cargs[2]
+ else:
+ localBroker = cargs[2]
+ if nargs > 3:
+ remoteBroker = cargs[3]
+
+group = cargs[0]
+cmd = cargs[1]
+
+try:
+ rm = RouteManager(localBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs != 4:
+ Usage()
+ rm.addLink(remoteBroker)
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ rm.delLink(remoteBroker)
+ elif cmd == "list":
+ rm.listLinks()
+
+ elif group == "dynamic":
+ if cmd == "add":
+ if nargs < 5 or nargs > 7:
+ Usage()
+
+ tag = ""
+ excludes = ""
+ if nargs > 5: tag = cargs[5]
+ if nargs > 6: excludes = cargs[6]
+ rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
+ elif cmd == "del":
+ if nargs != 5:
+ Usage()
+ else:
+ rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 8:
+ Usage()
+
+ tag = ""
+ excludes = ""
+ if nargs > 6: tag = cargs[6]
+ if nargs > 7: excludes = cargs[7]
+ rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage()
+ rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
+ elif cmd == "map":
+ rm.mapRoutes()
+ else:
+ if cmd == "list":
+ rm.listRoutes()
+ elif cmd == "flush":
+ rm.clearAllRoutes()
+ else:
+ Usage()
+
+ elif group == "queue":
+ if nargs != 6:
+ Usage()
+ if cmd == "add":
+ rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+ elif cmd == "del":
+ rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+ else:
+ Usage()
+
+except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+rm.disconnect()
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
new file mode 100755
index 0000000000..c6fc5ef0da
--- /dev/null
+++ b/tools/src/py/qpid-stat
@@ -0,0 +1,459 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display, Header, Sorter
+
+_host = "localhost"
+_connTimeout = 10
+_types = ""
+_limit = 50
+_increasing = False
+_sortcol = None
+
+def Usage ():
+ print "Usage: qpid-stat [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "General Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker connection"
+# print " -n [--numeric] Don't resolve names"
+ print
+ print "Display Options:"
+ print
+ print " -b Show Brokers"
+ print " -c Show Connections"
+# print " -s Show Sessions"
+ print " -e Show Exchanges"
+ print " -q Show Queues"
+ print
+ print " -S [--sort-by] COLNAME Sort by column name"
+ print " -I [--increasing] Sort by increasing value (default = decreasing)"
+ print " -L [--limit] NUM Limit output to NUM rows (default = 50)"
+ print
+ sys.exit (1)
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFFL
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+
+ agents = qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.connections = {}
+ self.sessions = {}
+ self.exchanges = {}
+ self.queues = {}
+ package = "org.apache.qpid.broker"
+
+ list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
+ for conn in list:
+ if not conn.shadow:
+ self.connections[conn.getObjectId()] = conn
+
+ list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
+ for sess in list:
+ if sess.connectionRef in self.connections:
+ self.sessions[sess.getObjectId()] = sess
+
+ list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
+ for exchange in list:
+ self.exchanges[exchange.getObjectId()] = exchange
+
+ list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
+ for queue in list:
+ self.queues[queue.getObjectId()] = queue
+
+ def getName(self):
+ return self.broker.getUrl()
+
+ def getCurrentTime(self):
+ return self.currentTime
+
+ def getUptime(self):
+ return self.uptime
+
+class BrokerManager(Console):
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(_host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+ if len(subs) == 0:
+ return
+ this = subs[0]
+ remaining = subs[1:]
+ newindent = indent + " "
+ if this == 'b':
+ pass
+ elif this == 'c':
+ if broker:
+ for oid in broker.connections:
+ iconn = broker.connections[oid]
+ self.printConnSub(indent, broker.getName(), iconn)
+ self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+ sess=sess, exchange=exchange, queue=queue)
+ elif this == 's':
+ pass
+ elif this == 'e':
+ pass
+ elif this == 'q':
+ pass
+ print
+
+ def displayBroker(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header('broker'))
+ heads.append(Header('cluster'))
+ heads.append(Header('uptime', Header.DURATION))
+ heads.append(Header('conn', Header.KMG))
+ heads.append(Header('sess', Header.KMG))
+ heads.append(Header('exch', Header.KMG))
+ heads.append(Header('queue', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ if self.cluster:
+ ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+ else:
+ ctext = "<standalone>"
+ row = (broker.getName(), ctext, broker.getUptime(),
+ len(broker.connections), len(broker.sessions),
+ len(broker.exchanges), len(broker.queues))
+ rows.append(row)
+ title = "Brokers"
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayConn(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header('client-addr'))
+ heads.append(Header('cproc'))
+ heads.append(Header('cpid'))
+ heads.append(Header('auth'))
+ heads.append(Header('connected', Header.DURATION))
+ heads.append(Header('idle', Header.DURATION))
+ heads.append(Header('msgIn', Header.KMG))
+ heads.append(Header('msgOut', Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.connections:
+ conn = broker.connections[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
+ idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+ row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
+ row.append(conn.framesFromClient)
+ row.append(conn.framesToClient)
+ rows.append(row)
+ title = "Connections"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displaySession(self, subs):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("exchange"))
+ heads.append(Header("type"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("bind", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("msgDrop", Header.KMG))
+ heads.append(Header("byteIn", Header.KMG))
+ heads.append(Header("byteOut", Header.KMG))
+ heads.append(Header("byteDrop", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.exchanges:
+ ex = broker.exchanges[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
+ title = "Exchanges"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append(Header('broker'))
+ heads.append(Header("queue"))
+ heads.append(Header("dur", Header.Y))
+ heads.append(Header("autoDel", Header.Y))
+ heads.append(Header("excl", Header.Y))
+ heads.append(Header("msg", Header.KMG))
+ heads.append(Header("msgIn", Header.KMG))
+ heads.append(Header("msgOut", Header.KMG))
+ heads.append(Header("bytes", Header.KMG))
+ heads.append(Header("bytesIn", Header.KMG))
+ heads.append(Header("bytesOut", Header.KMG))
+ heads.append(Header("cons", Header.KMG))
+ heads.append(Header("bind", Header.KMG))
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.queues:
+ q = broker.queues[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
+ title = "Queues"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ if _sortcol:
+ sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
+ dispRows = sorter.getSorted()
+ else:
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
+ def displayMain(self, main, subs):
+ if main == 'b': self.displayBroker(subs)
+ elif main == 'c': self.displayConn(subs)
+ elif main == 's': self.displaySession(subs)
+ elif main == 'e': self.displayExchange(subs)
+ elif main == 'q': self.displayQueue(subs)
+
+ def display(self):
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ if _host.find("@") > 0:
+ authString = _host.split("@")[0] + "@"
+ else:
+ authString = ""
+ for host in hostList:
+ b = self.qmf.addBroker(authString + host, _connTimeout)
+ self.brokers.append(Broker(self.qmf, b))
+ else:
+ self.brokers.append(Broker(self.qmf, self.broker))
+
+ self.displayMain(_types[0], _types[1:])
+
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
+except:
+ Usage()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
+ elif opt[0] == "-n" or opt[0] == "--numeric":
+ _numeric = True
+ elif opt[0] == "-S" or opt[0] == "--sort-by":
+ _sortcol = opt[1]
+ elif opt[0] == "-I" or opt[0] == "--increasing":
+ _increasing = True
+ elif opt[0] == "-L" or opt[0] == "--limit":
+ _limit = int(opt[1])
+ elif len(opt[0]) == 2:
+ char = opt[0][1]
+ if "bcseq".find(char) != -1:
+ _types += char
+ else:
+ Usage()
+ else:
+ Usage()
+
+if len(_types) == 0:
+ Usage()
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ bm.display()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+bm.Disconnect()
diff --git a/tools/src/py/qpid-tool b/tools/src/py/qpid-tool
new file mode 100755
index 0000000000..05afcc9732
--- /dev/null
+++ b/tools/src/py/qpid-tool
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import socket
+from cmd import Cmd
+from qpid.connection import ConnectionFailed, Timeout
+from qpid.managementdata import ManagementData
+from shlex import split
+from qpid.disp import Display
+from qpid.peer import Closed
+
+class Mcli (Cmd):
+ """ Management Command Interpreter """
+
+ def __init__ (self, dataObject, dispObject):
+ Cmd.__init__ (self)
+ self.dataObject = dataObject
+ self.dispObject = dispObject
+ self.dataObject.setCli (self)
+ self.prompt = "qpid: "
+
+ def emptyline (self):
+ pass
+
+ def setPromptMessage (self, p):
+ if p == None:
+ self.prompt = "qpid: "
+ else:
+ self.prompt = "qpid[%s]: " % p
+
+ def do_help (self, data):
+ print "Management Tool for QPID"
+ print
+ print "Commands:"
+ print " list - Print summary of existing objects by class"
+ print " list <className> - Print list of objects of the specified class"
+ print " list <className> active - Print list of non-deleted objects of the specified class"
+ print " show <className> - Print contents of all objects of specified class"
+ print " show <className> active - Print contents of all non-deleted objects of specified class"
+ print " show <list-of-IDs> - Print contents of one or more objects (infer className)"
+ print " show <className> <list-of-IDs> - Print contents of one or more objects"
+ print " list is space-separated, ranges may be specified (i.e. 1004-1010)"
+ print " call <ID> <methodName> [<args>] - Invoke a method on an object"
+ print " schema - Print summary of object classes seen on the target"
+ print " schema <className> - Print details of an object class"
+ print " set time-format short - Select short timestamp format (default)"
+ print " set time-format long - Select long timestamp format"
+ print " id [<ID>] - Display translations of display object ids"
+ print " quit or ^D - Exit the program"
+ print
+
+ def complete_set (self, text, line, begidx, endidx):
+ """ Command completion for the 'set' command """
+ tokens = split (line)
+ if len (tokens) < 2:
+ return ["time-format "]
+ elif tokens[1] == "time-format":
+ if len (tokens) == 2:
+ return ["long", "short"]
+ elif len (tokens) == 3:
+ if "long".find (text) == 0:
+ return ["long"]
+ elif "short".find (text) == 0:
+ return ["short"]
+ elif "time-format".find (text) == 0:
+ return ["time-format "]
+ return []
+
+ def do_set (self, data):
+ tokens = split (data)
+ try:
+ if tokens[0] == "time-format":
+ self.dispObject.do_setTimeFormat (tokens[1])
+ except:
+ pass
+
+ def do_id (self, data):
+ self.dataObject.do_id(data)
+
+ def complete_schema (self, text, line, begidx, endidx):
+ tokens = split (line)
+ if len (tokens) > 2:
+ return []
+ return self.dataObject.classCompletions (text)
+
+ def do_schema (self, data):
+ self.dataObject.do_schema (data)
+
+ def complete_list (self, text, line, begidx, endidx):
+ tokens = split (line)
+ if len (tokens) > 2:
+ return []
+ return self.dataObject.classCompletions (text)
+
+ def do_list (self, data):
+ self.dataObject.do_list (data)
+
+ def do_show (self, data):
+ self.dataObject.do_show (data)
+
+ def do_call (self, data):
+ try:
+ self.dataObject.do_call (data)
+ except ValueError, e:
+ print "ValueError:", e
+
+ def do_EOF (self, data):
+ print "quit"
+ try:
+ self.dataObject.do_exit ()
+ except:
+ pass
+ return True
+
+ def do_quit (self, data):
+ try:
+ self.dataObject.do_exit ()
+ except:
+ pass
+ return True
+
+ def postcmd (self, stop, line):
+ return stop
+
+ def postloop (self):
+ print "Exiting..."
+ self.dataObject.close ()
+
+def Usage ():
+ print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]"
+ print
+ sys.exit (1)
+
+#=========================================================
+# Main Program
+#=========================================================
+
+# Get host name and port if specified on the command line
+cargs = sys.argv[1:]
+_host = "localhost"
+
+if len (cargs) > 0:
+ _host = cargs[0]
+
+if _host[0] == '-':
+ Usage()
+
+disp = Display ()
+
+# Attempt to make a connection to the target broker
+try:
+ data = ManagementData (disp, _host)
+except socket.error, e:
+ print "Socket Error (%s):" % _host, e[1]
+ sys.exit (1)
+except IOError, e:
+ print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename)
+ sys.exit (1)
+except ConnectionFailed, e:
+ print "Connect Failed %d - %s" % (e[0], e[1])
+ sys.exit(1)
+except Exception, e:
+ if str(e).find ("Exchange not found") != -1:
+ print "Management not enabled on broker: Use '-m yes' option on broker startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+ sys.exit(1)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli (data, disp)
+print ("Management Tool for QPID")
+try:
+ cli.cmdloop ()
+except Closed, e:
+ print "Connection to Broker Lost:", e
+ sys.exit (1)