summaryrefslogtreecommitdiff
path: root/python/commands
diff options
context:
space:
mode:
Diffstat (limited to 'python/commands')
-rwxr-xr-xpython/commands/qpid-cluster327
-rwxr-xr-xpython/commands/qpid-config572
-rwxr-xr-xpython/commands/qpid-printevents74
-rwxr-xr-xpython/commands/qpid-queue-stats146
-rwxr-xr-xpython/commands/qpid-route524
-rwxr-xr-xpython/commands/qpid-stat459
-rwxr-xr-xpython/commands/qpid-tool197
7 files changed, 0 insertions, 2299 deletions
diff --git a/python/commands/qpid-cluster b/python/commands/qpid-cluster
deleted file mode 100755
index 6d64765184..0000000000
--- a/python/commands/qpid-cluster
+++ /dev/null
@@ -1,327 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session
-
-class Config:
- def __init__(self):
- self._host = "localhost"
- self._connTimeout = 10
- self._stopId = None
- self._stopAll = False
- self._force = False
- self._numeric = False
- self._showConn = False
- self._delConn = None
-
-def usage ():
- print "Usage: qpid-cluster [OPTIONS] [broker-addr]"
- print
- print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- print "Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
- print " -C [--all-connections] View client connections to all cluster members"
- print " -c [--connections] ID View client connections to specified member"
- print " -d [--del-connection] HOST:PORT"
- print " Disconnect a client connection"
- print " -s [--stop] ID Stop one member of the cluster by its ID"
- print " -k [--all-stop] Shut down the whole cluster"
- print " -f [--force] Suppress the 'are-you-sure?' prompt"
- print " -n [--numeric] Don't resolve names"
- print
-
-class IpAddr:
- def __init__(self, text):
- if text.find("@") != -1:
- tokens = text.split("@")
- text = tokens[1]
- if text.find(":") != -1:
- tokens = text.split(":")
- text = tokens[0]
- self.port = int(tokens[1])
- else:
- self.port = 5672
- self.dottedQuad = socket.gethostbyname(text)
- nums = self.dottedQuad.split(".")
- self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
- def bestAddr(self, addrPortList):
- bestDiff = 0xFFFFFFFFL
- bestAddr = None
- for addrPort in addrPortList:
- diff = IpAddr(addrPort[0]).addr ^ self.addr
- if diff < bestDiff:
- bestDiff = diff
- bestAddr = addrPort
- return bestAddr
-
-class BrokerManager:
- def __init__(self, config):
- self.config = config
- self.brokerName = None
- self.qmf = None
- self.broker = None
-
- def SetBroker(self, brokerUrl):
- self.url = brokerUrl
- self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == 0:
- self.brokerAgent = a
-
- def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
-
- def _getClusters(self):
- packages = self.qmf.getPackages()
- if "org.apache.qpid.cluster" not in packages:
- raise Exception("Clustering is not installed on the broker.")
-
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- raise Exception("Clustering is installed but not enabled on the broker.")
-
- return clusters
-
- def _getHostList(self, urlList):
- hosts = []
- hostAddr = IpAddr(self.config._host)
- for url in urlList:
- if url.find("amqp:") != 0:
- raise Exception("Invalid URL 1")
- url = url[5:]
- addrs = str(url).split(",")
- addrList = []
- for addr in addrs:
- tokens = addr.split(":")
- if len(tokens) != 3:
- raise Exception("Invalid URL 2")
- addrList.append((tokens[1], tokens[2]))
-
- # Find the address in the list that is most likely to be in the same subnet as the address
- # with which we made the original QMF connection. This increases the probability that we will
- # be able to reach the cluster member.
-
- best = hostAddr.bestAddr(addrList)
- bestUrl = best[0] + ":" + best[1]
- hosts.append(bestUrl)
- return hosts
-
- def overview(self):
- clusters = self._getClusters()
- cluster = clusters[0]
- memberList = cluster.members.split(";")
- idList = cluster.memberIDs.split(";")
-
- print " Cluster Name: %s" % cluster.clusterName
- print "Cluster Status: %s" % cluster.status
- print " Cluster Size: %d" % cluster.clusterSize
- print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
- for idx in range(1,len(idList)):
- print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
-
- def stopMember(self, id):
- clusters = self._getClusters()
- cluster = clusters[0]
- idList = cluster.memberIDs.split(";")
- if id not in idList:
- raise Exception("No member with matching ID found")
-
- if not self.config._force:
- prompt = "Warning: "
- if len(idList) == 1:
- prompt += "This command will shut down the last running cluster member."
- else:
- prompt += "This command will shut down a cluster member."
- prompt += " Are you sure? [N]: "
-
- confirm = raw_input(prompt)
- if len(confirm) == 0 or confirm[0].upper() != 'Y':
- raise Exception("Operation canceled")
-
- cluster.stopClusterNode(id)
-
- def stopAll(self):
- clusters = self._getClusters()
- if not self.config._force:
- prompt = "Warning: This command will shut down the entire cluster."
- prompt += " Are you sure? [N]: "
-
- confirm = raw_input(prompt)
- if len(confirm) == 0 or confirm[0].upper() != 'Y':
- raise Exception("Operation canceled")
-
- cluster = clusters[0]
- cluster.stopFullCluster()
-
- def showConnections(self):
- clusters = self._getClusters()
- cluster = clusters[0]
- memberList = cluster.members.split(";")
- idList = cluster.memberIDs.split(";")
- displayList = []
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- self.brokers = []
-
- idx = 0
- for host in hostList:
- if self.config._showConn == "all" or self.config._showConn == idList[idx] or self.config._delConn:
- self.brokers.append(self.qmf.addBroker(host, self.config._connTimeout))
- displayList.append(idList[idx])
- idx += 1
-
- idx = 0
- found = False
- for broker in self.brokers:
- if not self.config._delConn:
- print "Clients on Member: ID=%s:" % displayList[idx]
- connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
- for conn in connList:
- if not conn.shadow:
- if self.config._numeric or self.config._delConn:
- a = conn.address
- else:
- tokens = conn.address.split(":")
- try:
- hostList = socket.gethostbyaddr(tokens[0])
- host = hostList[0]
- except:
- host = tokens[0]
- a = host + ":" + tokens[1]
- if self.config._delConn:
- tokens = self.config._delConn.split(":")
- ip = socket.gethostbyname(tokens[0])
- toDelete = ip + ":" + tokens[1]
- if a == toDelete:
- print "Closing connection from client: %s" % a
- conn.close()
- found = True
- else:
- print " %s" % a
- idx += 1
- if not self.config._delConn:
- print
- if self.config._delConn and not found:
- print "Client connection '%s' not found" % self.config._delConn
-
- for broker in self.brokers:
- self.qmf.delBroker(broker)
-
-
-def main(argv=None):
- if argv is None: argv = sys.argv
- try:
- config = Config()
- try:
- longOpts = ("stop=", "all-stop", "force", "connections=", "all-connections" "del-connection=", "numeric", "timeout=")
- (optlist, encArgs) = getopt.gnu_getopt(argv[1:], "s:kfCc:d:n", longOpts)
- except:
- usage()
- return 1
-
- try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
- except:
- cargs = encArgs
-
- count = 0
- for opt in optlist:
- if opt[0] == "--timeout":
- config._connTimeout = int(opt[1])
- if config._connTimeout == 0:
- config._connTimeout = None
- if opt[0] == "-s" or opt[0] == "--stop":
- config._stopId = opt[1]
- if len(config._stopId.split(":")) != 2:
- raise Exception("Member ID must be of form: <host or ip>:<number>")
- count += 1
- if opt[0] == "-k" or opt[0] == "--all-stop":
- config._stopAll = True
- count += 1
- if opt[0] == "-f" or opt[0] == "--force":
- config._force = True
- if opt[0] == "-n" or opt[0] == "--numeric":
- config._numeric = True
- if opt[0] == "-C" or opt[0] == "--all-connections":
- config._showConn = "all"
- count += 1
- if opt[0] == "-c" or opt[0] == "--connections":
- config._showConn = opt[1]
- if len(config._showConn.split(":")) != 2:
- raise Exception("Member ID must be of form: <host or ip>:<number>")
- count += 1
- if opt[0] == "-d" or opt[0] == "--del-connection":
- config._delConn = opt[1]
- if len(config._delConn.split(":")) != 2:
- raise Exception("Connection must be of form: <host or ip>:<port>")
- count += 1
-
- if count > 1:
- print "Only one command option may be supplied"
- print
- usage()
- return 1
-
- nargs = len(cargs)
- bm = BrokerManager(config)
-
- if nargs == 1:
- config._host = cargs[0]
-
- try:
- bm.SetBroker(config._host)
- if config._stopId:
- bm.stopMember(config._stopId)
- elif config._stopAll:
- bm.stopAll()
- elif config._showConn or config._delConn:
- bm.showConnections()
- else:
- bm.overview()
- except KeyboardInterrupt:
- print
- except Exception,e:
- if str(e).find("connection aborted") > 0:
- # we expect this when asking the connected broker to shut down
- return 0
- raise Exception("Failed: %s - %s" % (e.__class__.__name__, e))
-
- bm.Disconnect()
- except Exception, e:
- print str(e)
- return 1
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
deleted file mode 100755
index 0db42bc6c7..0000000000
--- a/python/commands/qpid-config
+++ /dev/null
@@ -1,572 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import getopt
-import sys
-import locale
-from qmf.console import Session
-
-_recursive = False
-_host = "localhost"
-_connTimeout = 10
-_altern_ex = None
-_passive = False
-_durable = False
-_clusterDurable = False
-_if_empty = True
-_if_unused = True
-_fileCount = 8
-_fileSize = 24
-_maxQueueSize = None
-_maxQueueCount = None
-_limitPolicy = None
-_order = None
-_msgSequence = False
-_ive = False
-_eventGeneration = None
-_file = None
-
-FILECOUNT = "qpid.file_count"
-FILESIZE = "qpid.file_size"
-MAX_QUEUE_SIZE = "qpid.max_size"
-MAX_QUEUE_COUNT = "qpid.max_count"
-POLICY_TYPE = "qpid.policy_type"
-CLUSTER_DURABLE = "qpid.persist_last_node"
-LVQ = "qpid.last_value_queue"
-LVQNB = "qpid.last_value_queue_no_browse"
-MSG_SEQUENCE = "qpid.msg_sequence"
-IVE = "qpid.ive"
-QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
-
-def Usage ():
- print "Usage: qpid-config [OPTIONS]"
- print " qpid-config [OPTIONS] exchanges [filter-string]"
- print " qpid-config [OPTIONS] queues [filter-string]"
- print " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]"
- print " qpid-config [OPTIONS] del exchange <name>"
- print " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]"
- print " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]"
- print " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]"
- print " <for type xml> [-f -|filename]"
- print " <for type header> [all|any] k1=v1 [, k2=v2...]"
- print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"
- print
- print "Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
- print " -b [ --bindings ] Show bindings in queue or exchange list"
- print " -a [ --broker-addr ] Address (localhost) Address of qpidd broker"
- print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- print "Add Queue Options:"
- print " --alternate-exchange [name of the alternate exchange]"
- print " The alternate-exchange field specifies how messages on this queue should"
- print " be treated when they are rejected by a subscriber, or when they are"
- print " orphaned by queue deletion. When present, rejected or orphaned messages"
- print " MUST be routed to the alternate-exchange. In all cases the messages MUST"
- print " be removed from the queue."
- print " --passive Do not actually change the broker state (queue will not be created)"
- print " --durable Queue is durable"
- print " --cluster-durable Queue becomes durable if there is only one functioning cluster node"
- print " --file-count N (8) Number of files in queue's persistence journal"
- print " --file-size N (24) File size in pages (64Kib/page)"
- print " --max-queue-size N Maximum in-memory queue size as bytes"
- print " --max-queue-count N Maximum in-memory queue size as a number of messages"
- print " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]"
- print " Action taken when queue limit is reached:"
- print " none (default) - Use broker's default policy"
- print " reject - Reject enqueued messages"
- print " flow-to-disk - Page messages to disk"
- print " ring - Replace oldest unacquired message with new"
- print " ring-strict - Replace oldest message, reject if oldest is acquired"
- print " --order [fifo | lvq | lvq-no-browse]"
- print " Set queue ordering policy:"
- print " fifo (default) - First in, first out"
- print " lvq - Last Value Queue ordering, allows queue browsing"
- print " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data"
- print " --generate-queue-events N"
- print " If set to 1, every enqueue will generate an event that can be processed by"
- print " registered listeners (e.g. for replication). If set to 2, events will be"
- print " generated for enqueues and dequeues"
- print
- print "Del Queue Options:"
- print " --force Force delete of queue even if it's currently used or it's not empty"
- print " --force-if-not-empty Force delete of queue even if it's not empty"
- print " --force-if-used Force delete of queue even if it's currently used"
- print
- print "Add Exchange <type> values:"
- print " direct Direct exchange for point-to-point communication"
- print " fanout Fanout exchange for broadcast communication"
- print " topic Topic exchange that routes messages using binding keys with wildcards"
- print " headers Headers exchange that matches header fields against the binding keys"
- print
- print "Add Exchange Options:"
- print " --alternate-exchange [name of the alternate exchange]"
- print " In the event that a message cannot be routed, this is the name of the exchange to"
- print " which the message will be sent. Messages transferred using message.transfer will"
- print " be routed to the alternate-exchange only if they are sent with the \"none\""
- print " accept-mode, and the discard-unroutable delivery property is set to false, and"
- print " there is no queue to route to for the given message according to the bindings"
- print " on this exchange."
- print " --passive Do not actually change the broker state (exchange will not be created)"
- print " --durable Exchange is durable"
- print " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header"
- print " with a value that increments for each message forwarded."
- print " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference"
- print " to the last message forwarded and enqueuing that message to newly bound"
- print " queues."
- print
- sys.exit (1)
-
-
-#
-# helpers for the arg parsing in bind(). return multiple values; "ok"
-# followed by the resultant args
-
-#
-# accept -f followed by either
-# a filename or "-", for stdin. pull the bits into a string, to be
-# passed to the xml binding.
-#
-def snarf_xquery_args():
- if not _file:
- print "Invalid args to bind xml: need an input file or stdin"
- return [False]
- if _file == "-":
- res = sys.stdin.read()
- else:
- f = open(_file) # let this signal if it can't find it
- res = f.read()
- f.close()
- return [True, res]
-
-#
-# look for "any"/"all" and grok the rest of argv into a map
-#
-def snarf_header_args(cargs):
- if len(cargs) < 2:
- print "Invalid args to bind headers: need 'any'/'all' plus conditions"
- return [False]
- op = cargs[0]
- if op == "all" or op == "any":
- kv = {}
- for thing in cargs[1:]:
- k_and_v = thing.split("=")
- kv[k_and_v[0]] = k_and_v[1]
- return [True, op, kv]
- else:
- print "Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'"
- return [False]
-
-class BrokerManager:
- def __init__ (self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
-
- def SetBroker (self, brokerUrl):
- self.url = brokerUrl
- self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == 0:
- self.brokerAgent = a
-
- def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
-
- def Overview (self):
- exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
- queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
- print "Total Exchanges: %d" % len (exchanges)
- etype = {}
- for ex in exchanges:
- if ex.type not in etype:
- etype[ex.type] = 1
- else:
- etype[ex.type] = etype[ex.type] + 1
- for typ in etype:
- print "%15s: %d" % (typ, etype[typ])
-
- print
- print " Total Queues: %d" % len (queues)
- _durable = 0
- for queue in queues:
- if queue.durable:
- _durable = _durable + 1
- print " durable: %d" % _durable
- print " non-durable: %d" % (len (queues) - _durable)
-
- def ExchangeList (self, filter):
- exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
- caption1 = "Type "
- caption2 = "Exchange Name"
- maxNameLen = len(caption2)
- for ex in exchanges:
- if self.match(ex.name, filter):
- if len(ex.name) > maxNameLen: maxNameLen = len(ex.name)
- print "%s%-*s Attributes" % (caption1, maxNameLen, caption2)
- line = ""
- for i in range(((maxNameLen + len(caption1)) / 5) + 5):
- line += "====="
- print line
-
- for ex in exchanges:
- if self.match (ex.name, filter):
- print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
- args = ex.arguments
- if ex.durable: print "--durable",
- if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
- if IVE in args and args[IVE] == 1: print "--ive",
- if ex.altExchange:
- print "--alternate-exchange=%s" % ex._altExchange_.name,
- print
-
- def ExchangeListRecurse (self, filter):
- exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
- bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
- queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
- for ex in exchanges:
- if self.match (ex.name, filter):
- print "Exchange '%s' (%s)" % (ex.name, ex.type)
- for bind in bindings:
- if bind.exchangeRef == ex.getObjectId():
- qname = "<unknown>"
- queue = self.findById (queues, bind.queueRef)
- if queue != None:
- qname = queue.name
- print " bind [%s] => %s" % (bind.bindingKey, qname)
-
-
- def QueueList (self, filter):
- queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
-
- caption = "Queue Name"
- maxNameLen = len(caption)
- for q in queues:
- if self.match (q.name, filter):
- if len(q.name) > maxNameLen: maxNameLen = len(q.name)
- print "%-*s Attributes" % (maxNameLen, caption)
- line = ""
- for i in range((maxNameLen / 5) + 5):
- line += "====="
- print line
-
- for q in queues:
- if self.match (q.name, filter):
- print "%-*s " % (maxNameLen, q.name),
- args = q.arguments
- if q.durable: print "--durable",
- if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
- if q.autoDelete: print "auto-del",
- if q.exclusive: print "excl",
- if FILESIZE in args: print "--file-size=%d" % args[FILESIZE],
- if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT],
- if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE],
- if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
- if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
- if LVQ in args and args[LVQ] == 1: print "--order lvq",
- if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
- if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
- if q.altExchange:
- print "--alternate-exchange=%s" % q._altExchange_.name,
- print
-
- def QueueListRecurse (self, filter):
- exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
- bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
- queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
- for queue in queues:
- if self.match (queue.name, filter):
- print "Queue '%s'" % queue.name
- for bind in bindings:
- if bind.queueRef == queue.getObjectId():
- ename = "<unknown>"
- ex = self.findById (exchanges, bind.exchangeRef)
- if ex != None:
- ename = ex.name
- if ename == "":
- ename = "''"
- print " bind [%s] => %s" % (bind.bindingKey, ename)
-
- def AddExchange (self, args):
- if len (args) < 2:
- Usage ()
- etype = args[0]
- ename = args[1]
- declArgs = {}
- if _msgSequence:
- declArgs[MSG_SEQUENCE] = 1
- if _ive:
- declArgs[IVE] = 1
- if _altern_ex != None:
- self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
- else:
- self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, passive=_passive, durable=_durable, arguments=declArgs)
-
- def DelExchange (self, args):
- if len (args) < 1:
- Usage ()
- ename = args[0]
- self.broker.getAmqpSession().exchange_delete (exchange=ename)
-
- def AddQueue (self, args):
- if len (args) < 1:
- Usage ()
- qname = args[0]
- declArgs = {}
- if _durable:
- declArgs[FILECOUNT] = _fileCount
- declArgs[FILESIZE] = _fileSize
-
- if _maxQueueSize:
- declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
- if _maxQueueCount:
- declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
- if _limitPolicy:
- if _limitPolicy == "none":
- pass
- elif _limitPolicy == "reject":
- declArgs[POLICY_TYPE] = "reject"
- elif _limitPolicy == "flow-to-disk":
- declArgs[POLICY_TYPE] = "flow_to_disk"
- elif _limitPolicy == "ring":
- declArgs[POLICY_TYPE] = "ring"
- elif _limitPolicy == "ring-strict":
- declArgs[POLICY_TYPE] = "ring_strict"
-
- if _clusterDurable:
- declArgs[CLUSTER_DURABLE] = 1
- if _order:
- if _order == "fifo":
- pass
- elif _order == "lvq":
- declArgs[LVQ] = 1
- elif _order == "lvq-no-browse":
- declArgs[LVQNB] = 1
- if _eventGeneration:
- declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration
-
- if _altern_ex != None:
- self.broker.getAmqpSession().queue_declare (queue=qname, alternate_exchange=_altern_ex, passive=_passive, durable=_durable, arguments=declArgs)
- else:
- self.broker.getAmqpSession().queue_declare (queue=qname, passive=_passive, durable=_durable, arguments=declArgs)
-
- def DelQueue (self, args):
- if len (args) < 1:
- Usage ()
- qname = args[0]
- self.broker.getAmqpSession().queue_delete (queue=qname, if_empty=_if_empty, if_unused=_if_unused)
-
- def Bind (self, args):
- if len (args) < 2:
- Usage ()
- ename = args[0]
- qname = args[1]
- key = ""
- if len (args) > 2:
- key = args[2]
-
- # query the exchange to determine its type.
- res = self.broker.getAmqpSession().exchange_query(ename)
-
- # type of the xchg determines the processing of the rest of
- # argv. if it's an xml xchg, we want to find a file
- # containing an x-query, and pass that. if it's a headers
- # exchange, we need to pass either "any" or all, followed by a
- # map containing key/value pairs. if neither of those, extra
- # args are ignored.
- ok = True
- args = None
- if res.type == "xml":
- # this checks/imports the -f arg
- [ok, xquery] = snarf_xquery_args()
- args = { "xquery" : xquery }
- # print args
- else:
- if res.type == "headers":
- [ok, op, kv] = snarf_header_args(cargs[4:])
- args = kv
- args["x-match"] = op
-
- if not ok:
- sys.exit(1)
-
- self.broker.getAmqpSession().exchange_bind (queue=qname,
- exchange=ename,
- binding_key=key,
- arguments=args)
-
- def Unbind (self, args):
- if len (args) < 2:
- Usage ()
- ename = args[0]
- qname = args[1]
- key = ""
- if len (args) > 2:
- key = args[2]
- self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
-
- def findById (self, items, id):
- for item in items:
- if item.getObjectId() == id:
- return item
- return None
-
- def match (self, name, filter):
- if filter == "":
- return True
- if name.find (filter) == -1:
- return False
- return True
-
-def YN (bool):
- if bool:
- return 'Y'
- return 'N'
-
-
-##
-## Main Program
-##
-
-try:
- longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
- "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
- "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty",
- "force_if_used", "alternate-exchange=", "passive", "timeout=", "file=")
- (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:bf:", longOpts)
-except:
- Usage ()
-
-try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
-except:
- cargs = encArgs
-
-for opt in optlist:
- if opt[0] == "-b" or opt[0] == "--bindings":
- _recursive = True
- if opt[0] == "-a" or opt[0] == "--broker-addr":
- _host = opt[1]
- if opt[0] == "-f" or opt[0] == "--file":
- _file = opt[1]
- if opt[0] == "--timeout":
- _connTimeout = int(opt[1])
- if _connTimeout == 0:
- _connTimeout = None
- if opt[0] == "--alternate-exchange":
- _altern_ex = opt[1]
- if opt[0] == "--passive":
- _passive = True
- if opt[0] == "--durable":
- _durable = True
- if opt[0] == "--cluster-durable":
- _clusterDurable = True
- if opt[0] == "--file-count":
- _fileCount = int (opt[1])
- if opt[0] == "--file-size":
- _fileSize = int (opt[1])
- if opt[0] == "--max-queue-size":
- _maxQueueSize = int (opt[1])
- if opt[0] == "--max-queue-count":
- _maxQueueCount = int (opt[1])
- if opt[0] == "--limit-policy":
- _limitPolicy = opt[1]
- if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"):
- print "Error: Invalid --limit-policy argument"
- sys.exit(1)
- if opt[0] == "--order":
- _order = opt[1]
- if _order not in ("fifo", "lvq", "lvq-no-browse"):
- print "Error: Invalid --order argument"
- sys.exit(1)
- if opt[0] == "--sequence":
- _msgSequence = True
- if opt[0] == "--ive":
- _ive = True
- if opt[0] == "--generate-queue-events":
- _eventGeneration = int (opt[1])
- if opt[0] == "--force":
- _if_empty = False
- _if_unused = False
- if opt[0] == "--force-if-not-empty":
- _if_empty = False
- if opt[0] == "--force-if-used":
- _if_unused = False
-
-
-nargs = len (cargs)
-bm = BrokerManager ()
-
-try:
- bm.SetBroker(_host)
- if nargs == 0:
- bm.Overview ()
- else:
- cmd = cargs[0]
- modifier = ""
- if nargs > 1:
- modifier = cargs[1]
- if cmd == "exchanges":
- if _recursive:
- bm.ExchangeListRecurse (modifier)
- else:
- bm.ExchangeList (modifier)
- elif cmd == "queues":
- if _recursive:
- bm.QueueListRecurse (modifier)
- else:
- bm.QueueList (modifier)
- elif cmd == "add":
- if modifier == "exchange":
- bm.AddExchange (cargs[2:])
- elif modifier == "queue":
- bm.AddQueue (cargs[2:])
- else:
- Usage ()
- elif cmd == "del":
- if modifier == "exchange":
- bm.DelExchange (cargs[2:])
- elif modifier == "queue":
- bm.DelQueue (cargs[2:])
- else:
- Usage ()
- elif cmd == "bind":
- bm.Bind (cargs[1:])
- elif cmd == "unbind":
- bm.Unbind (cargs[1:])
- else:
- Usage ()
-except KeyboardInterrupt:
- print
-except IOError, e:
- print e
- sys.exit(1)
-except Exception,e:
- print "Failed: %s: %s" % (e.__class__.__name__, e)
- sys.exit(1)
-
-bm.Disconnect()
diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents
deleted file mode 100755
index 0c1b618a1f..0000000000
--- a/python/commands/qpid-printevents
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import optparse
-import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
-
-class EventConsole(Console):
- def event(self, broker, event):
- print event
-
- def brokerConnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
-
- def brokerDisconnected(self, broker):
- print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
-
-
-##
-## Main Program
-##
-def main():
- _usage = "%prog [options] [broker-addr]..."
- _description = \
-"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
-supplied, %prog will connect to 'localhost:5672'.
-broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
-ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
-"""
- p = optparse.OptionParser(usage=_usage, description=_description)
-
- options, arguments = p.parse_args()
- if len(arguments) == 0:
- arguments.append("localhost")
-
- console = EventConsole()
- session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True)
- brokers = []
- for host in arguments:
- brokers.append(session.addBroker(host))
-
- try:
- while (True):
- sleep(10)
- except KeyboardInterrupt:
- for broker in brokers:
- session.delBroker(broker)
- print
- sys.exit(0)
-
-if __name__ == '__main__':
- main()
-
diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats
deleted file mode 100755
index 3b8a0dcb19..0000000000
--- a/python/commands/qpid-queue-stats
+++ /dev/null
@@ -1,146 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import optparse
-import sys
-import re
-import socket
-import qpid
-from threading import Condition
-from qmf.console import Session, Console
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from time import sleep
-
-class BrokerManager(Console):
- def __init__(self, host):
- self.url = host
- self.objects = {}
- self.filter = None
- self.session = Session(self, rcvEvents=False, rcvHeartbeats=False,
- userBindings=True, manageConnections=True)
- self.broker = self.session.addBroker(self.url)
- self.firstError = True
-
- def setFilter(self,filter):
- self.filter = filter
-
- def brokerConnected(self, broker):
- if not self.firstError:
- print "*** Broker connected"
- self.firstError = False
-
- def brokerDisconnected(self, broker):
- print "*** Broker connection lost - %s, retrying..." % broker.getError()
- self.firstError = False
- self.objects.clear()
-
- def objectProps(self, broker, record):
- className = record.getClassKey().getClassName()
- if className != "queue":
- return
-
- id = record.getObjectId().__repr__()
- if id not in self.objects:
- self.objects[id] = (record.name, None, None)
-
- def objectStats(self, broker, record):
- className = record.getClassKey().getClassName()
- if className != "queue":
- return
-
- id = record.getObjectId().__repr__()
- if id not in self.objects:
- return
-
- (name, first, last) = self.objects[id]
- if first == None:
- self.objects[id] = (name, record, None)
- return
-
- if len(self.filter) > 0 :
- match = False
-
- for x in self.filter:
- if x.match(name):
- match = True
- break
- if match == False:
- return
-
- if last == None:
- lastSample = first
- else:
- lastSample = last
-
- self.objects[id] = (name, first, record)
-
- deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0])
- if deltaTime < 1000000000.0:
- return
- enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \
- (deltaTime / 1000000000.0)
- dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \
- (deltaTime / 1000000000.0)
- print "%-41s%10.2f%11d%13.2f%13.2f" % \
- (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate)
- sys.stdout.flush()
-
-
- def Display (self):
- self.session.bindClass("org.apache.qpid.broker", "queue")
- print "Queue Name Sec Depth Enq Rate Deq Rate"
- print "========================================================================================"
- sys.stdout.flush()
- try:
- while True:
- sleep (1)
- if self.firstError and self.broker.getError():
- self.firstError = False
- print "*** Error: %s, retrying..." % self.broker.getError()
- except KeyboardInterrupt:
- print
- self.session.delBroker(self.broker)
-
-##
-## Main Program
-##
-def main():
- p = optparse.OptionParser()
- p.add_option('--broker-address','-a', default='localhost' , help='broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] \n ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost')
- p.add_option('--filter','-f' ,default=None ,help='a list of comma separated queue names (regex are accepted) to show')
-
- options, arguments = p.parse_args()
-
- host = options.broker_address
- filter = []
- if options.filter != None:
- for s in options.filter.split(","):
- filter.append(re.compile(s))
-
- bm = BrokerManager(host)
- bm.setFilter(filter)
- bm.Display()
-
-if __name__ == '__main__':
- main()
-
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
deleted file mode 100755
index 9965047000..0000000000
--- a/python/commands/qpid-route
+++ /dev/null
@@ -1,524 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import getopt
-import sys
-import socket
-import os
-import locale
-from qmf.console import Session, BrokerURL
-
-def Usage():
- print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
- print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
- print
- print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
- print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
- print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>"
- print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>"
- print " qpid-route [OPTIONS] route list [<dest-broker>]"
- print " qpid-route [OPTIONS] route flush [<dest-broker>]"
- print " qpid-route [OPTIONS] route map [<broker>]"
- print
- print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
- print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
- print " qpid-route [OPTIONS] link list [<dest-broker>]"
- print
- print "Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
- print " -v [ --verbose ] Verbose output"
- print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
- print " -d [ --durable ] Added configuration shall be durable"
- print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
- print " -s [ --src-local ] Make connection to source broker (push route)"
- print " --ack N Acknowledge transfers over the bridge in batches of N"
- print " -t <transport> [ --transport <transport>]"
- print " Specify transport to use for links, defaults to tcp"
- print
- print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- sys.exit(1)
-
-_verbose = False
-_quiet = False
-_durable = False
-_dellink = False
-_srclocal = False
-_transport = "tcp"
-_ack = 0
-_connTimeout = 10
-
-class RouteManager:
- def __init__(self, localBroker):
- self.local = BrokerURL(localBroker)
- self.remote = None
- self.qmf = Session()
- self.broker = self.qmf.addBroker(localBroker, _connTimeout)
-
- def disconnect(self):
- self.qmf.delBroker(self.broker)
-
- def getLink(self):
- links = self.qmf.getObjects(_class="link")
- for link in links:
- if self.remote.match(link.host, link.port):
- return link
- return None
-
- def addLink(self, remoteBroker):
- self.remote = BrokerURL(remoteBroker)
- if self.local.match(self.remote.host, self.remote.port):
- raise Exception("Linking broker to itself is not permitted")
-
- brokers = self.qmf.getObjects(_class="broker")
- broker = brokers[0]
- link = self.getLink()
- if link == None:
- if not self.remote.authName or self.remote.authName == "anonymous":
- mech = "ANONYMOUS"
- else:
- mech = "PLAIN"
- res = broker.connect(self.remote.host, self.remote.port, _durable,
- mech, self.remote.authName or "", self.remote.authPass or "",
- _transport)
- if _verbose:
- print "Connect method returned:", res.status, res.text
-
- def delLink(self, remoteBroker):
- self.remote = BrokerURL(remoteBroker)
- brokers = self.qmf.getObjects(_class="broker")
- broker = brokers[0]
- link = self.getLink()
- if link == None:
- raise Exception("Link not found")
-
- res = link.close()
- if _verbose:
- print "Close method returned:", res.status, res.text
-
- def listLinks(self):
- links = self.qmf.getObjects(_class="link")
- if len(links) == 0:
- print "No Links Found"
- else:
- print
- print "Host Port Transport Durable State Last Error"
- print "============================================================================="
- for link in links:
- print "%-16s%-8d%-13s%c %-18s%s" % \
- (link.host, link.port, link.transport, YN(link.durable), link.state, link.lastError)
-
- def mapRoutes(self):
- qmf = self.qmf
- print
- print "Finding Linked Brokers:"
-
- brokerList = {}
- brokerList[self.local.name()] = self.broker
- print " %s... Ok" % self.local
-
- added = True
- while added:
- added = False
- links = qmf.getObjects(_class="link")
- for link in links:
- url = BrokerURL("%s:%d" % (link.host, link.port))
- if url.name() not in brokerList:
- print " %s..." % url.name(),
- try:
- b = qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
- brokerList[url.name()] = b
- added = True
- print "Ok"
- except Exception, e:
- print e
-
- print
- print "Dynamic Routes:"
- bridges = qmf.getObjects(_class="bridge", dynamic=True)
- fedExchanges = []
- for bridge in bridges:
- if bridge.src not in fedExchanges:
- fedExchanges.append(bridge.src)
- if len(fedExchanges) == 0:
- print " none found"
- print
-
- for ex in fedExchanges:
- print " Exchange %s:" % ex
- pairs = []
- for bridge in bridges:
- if bridge.src == ex:
- link = bridge._linkRef_
- fromUrl = "%s:%s" % (link.host, link.port)
- toUrl = bridge.getBroker().getUrl()
- found = False
- for pair in pairs:
- if pair.matches(fromUrl, toUrl):
- found = True
- if not found:
- pairs.append(RoutePair(fromUrl, toUrl))
- for pair in pairs:
- print " %s" % pair
- print
-
- print "Static Routes:"
- bridges = qmf.getObjects(_class="bridge", dynamic=False)
- if len(bridges) == 0:
- print " none found"
- print
-
- for bridge in bridges:
- link = bridge._linkRef_
- fromUrl = "%s:%s" % (link.host, link.port)
- toUrl = bridge.getBroker().getUrl()
- leftType = "ex"
- rightType = "ex"
- if bridge.srcIsLocal:
- arrow = "=>"
- left = bridge.src
- right = bridge.dest
- if bridge.srcIsQueue:
- leftType = "queue"
- else:
- arrow = "<="
- left = bridge.dest
- right = bridge.src
- if bridge.srcIsQueue:
- rightType = "queue"
-
- if bridge.srcIsQueue:
- print " %s(%s=%s) %s %s(%s=%s)" % \
- (toUrl, leftType, left, arrow, fromUrl, rightType, right)
- else:
- print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
- (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
- print
-
- for broker in brokerList:
- if broker != self.local.name():
- qmf.delBroker(brokerList[broker])
-
-
- def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
- if dynamic and _srclocal:
- raise Exception("--src-local is not permitted on dynamic routes")
-
- self.addLink(remoteBroker)
- link = self.getLink()
- if link == None:
- raise Exception("Link failed to create")
-
- bridges = self.qmf.getObjects(_class="bridge")
- for bridge in bridges:
- if bridge.linkRef == link.getObjectId() and \
- bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
- if not _quiet:
- raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
- sys.exit(0)
-
- if _verbose:
- print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
- if res.status != 0:
- raise Exception(res.text)
- if _verbose:
- print "Bridge method returned:", res.status, res.text
-
- def addQueueRoute(self, remoteBroker, exchange, queue):
- self.addLink(remoteBroker)
- link = self.getLink()
- if link == None:
- raise Exception("Link failed to create")
-
- bridges = self.qmf.getObjects(_class="bridge")
- for bridge in bridges:
- if bridge.linkRef == link.getObjectId() and \
- bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
- if not _quiet:
- raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
- sys.exit(0)
-
- if _verbose:
- print "Creating inter-broker binding..."
- res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
- if res.status != 0:
- raise Exception(res.text)
- if _verbose:
- print "Bridge method returned:", res.status, res.text
-
- def delQueueRoute(self, remoteBroker, exchange, queue):
- self.remote = BrokerURL(remoteBroker)
- link = self.getLink()
- if link == None:
- if not _quiet:
- raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
- sys.exit(0)
-
- bridges = self.qmf.getObjects(_class="bridge")
- for bridge in bridges:
- if bridge.linkRef == link.getObjectId() and \
- bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
- if _verbose:
- print "Closing bridge..."
- res = bridge.close()
- if res.status != 0:
- raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
- if len(bridges) == 1 and _dellink:
- link = self.getLink()
- if link == None:
- sys.exit(0)
- if _verbose:
- print "Last bridge on link, closing link..."
- res = link.close()
- if res.status != 0:
- raise Exception("Error closing link: %d - %s" % (res.status, res.text))
- sys.exit(0)
- if not _quiet:
- raise Exception("Route not found")
-
- def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
- self.remote = BrokerURL(remoteBroker)
- link = self.getLink()
- if link == None:
- if not _quiet:
- raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
- sys.exit(0)
-
- bridges = self.qmf.getObjects(_class="bridge")
- for bridge in bridges:
- if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
- and bridge.dynamic == dynamic:
- if _verbose:
- print "Closing bridge..."
- res = bridge.close()
- if res.status != 0:
- raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
- if len(bridges) == 1 and _dellink:
- link = self.getLink()
- if link == None:
- sys.exit(0)
- if _verbose:
- print "Last bridge on link, closing link..."
- res = link.close()
- if res.status != 0:
- raise Exception("Error closing link: %d - %s" % (res.status, res.text))
- sys.exit(0)
- if not _quiet:
- raise Exception("Route not found")
-
- def listRoutes(self):
- links = self.qmf.getObjects(_class="link")
- bridges = self.qmf.getObjects(_class="bridge")
-
- for bridge in bridges:
- myLink = None
- for link in links:
- if bridge.linkRef == link.getObjectId():
- myLink = link
- break
- if myLink != None:
- if bridge.dynamic:
- keyText = "<dynamic>"
- else:
- keyText = bridge.key
- print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
-
- def clearAllRoutes(self):
- links = self.qmf.getObjects(_class="link")
- bridges = self.qmf.getObjects(_class="bridge")
-
- for bridge in bridges:
- if _verbose:
- myLink = None
- for link in links:
- if bridge.linkRef == link.getObjectId():
- myLink = link
- break
- if myLink != None:
- print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
- res = bridge.close()
- if res.status != 0:
- print "Error: %d - %s" % (res.status, res.text)
- elif _verbose:
- print "Ok"
-
- if _dellink:
- links = self.qmf.getObjects(_class="link")
- for link in links:
- if _verbose:
- print "Deleting Link: %s:%d... " % (link.host, link.port),
- res = link.close()
- if res.status != 0:
- print "Error: %d - %s" % (res.status, res.text)
- elif _verbose:
- print "Ok"
-
-class RoutePair:
- def __init__(self, fromUrl, toUrl):
- self.fromUrl = fromUrl
- self.toUrl = toUrl
- self.bidir = False
-
- def __repr__(self):
- if self.bidir:
- delimit = "<=>"
- else:
- delimit = " =>"
- return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
-
- def matches(self, fromUrl, toUrl):
- if fromUrl == self.fromUrl and toUrl == self.toUrl:
- return True
- if toUrl == self.fromUrl and fromUrl == self.toUrl:
- self.bidir = True
- return True
- return False
-
-
-def YN(val):
- if val == 1:
- return 'Y'
- return 'N'
-
-##
-## Main Program
-##
-
-try:
- longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=", "timeout=")
- (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
-except:
- Usage()
-
-try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
-except:
- cargs = encArgs
-
-for opt in optlist:
- if opt[0] == "--timeout":
- _connTimeout = int(opt[1])
- if _connTimeout == 0:
- _connTimeout = None
- if opt[0] == "-v" or opt[0] == "--verbose":
- _verbose = True
- if opt[0] == "-q" or opt[0] == "--quiet":
- _quiet = True
- if opt[0] == "-d" or opt[0] == "--durable":
- _durable = True
- if opt[0] == "-e" or opt[0] == "--del-empty-link":
- _dellink = True
- if opt[0] == "-s" or opt[0] == "--src-local":
- _srclocal = True
- if opt[0] == "-t" or opt[0] == "--transport":
- _transport = opt[1]
- if opt[0] == "--ack":
- _ack = int(opt[1])
-
-nargs = len(cargs)
-if nargs < 2:
- Usage()
-if nargs == 2:
- localBroker = "localhost"
-else:
- if _srclocal:
- localBroker = cargs[3]
- remoteBroker = cargs[2]
- else:
- localBroker = cargs[2]
- if nargs > 3:
- remoteBroker = cargs[3]
-
-group = cargs[0]
-cmd = cargs[1]
-
-try:
- rm = RouteManager(localBroker)
- if group == "link":
- if cmd == "add":
- if nargs != 4:
- Usage()
- rm.addLink(remoteBroker)
- elif cmd == "del":
- if nargs != 4:
- Usage()
- rm.delLink(remoteBroker)
- elif cmd == "list":
- rm.listLinks()
-
- elif group == "dynamic":
- if cmd == "add":
- if nargs < 5 or nargs > 7:
- Usage()
-
- tag = ""
- excludes = ""
- if nargs > 5: tag = cargs[5]
- if nargs > 6: excludes = cargs[6]
- rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
- elif cmd == "del":
- if nargs != 5:
- Usage()
- else:
- rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
-
- elif group == "route":
- if cmd == "add":
- if nargs < 6 or nargs > 8:
- Usage()
-
- tag = ""
- excludes = ""
- if nargs > 6: tag = cargs[6]
- if nargs > 7: excludes = cargs[7]
- rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
- elif cmd == "del":
- if nargs != 6:
- Usage()
- rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
- elif cmd == "map":
- rm.mapRoutes()
- else:
- if cmd == "list":
- rm.listRoutes()
- elif cmd == "flush":
- rm.clearAllRoutes()
- else:
- Usage()
-
- elif group == "queue":
- if nargs != 6:
- Usage()
- if cmd == "add":
- rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
- elif cmd == "del":
- rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
- else:
- Usage()
-
-except Exception,e:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
-
-rm.disconnect()
diff --git a/python/commands/qpid-stat b/python/commands/qpid-stat
deleted file mode 100755
index c6fc5ef0da..0000000000
--- a/python/commands/qpid-stat
+++ /dev/null
@@ -1,459 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session, Console
-from qpid.disp import Display, Header, Sorter
-
-_host = "localhost"
-_connTimeout = 10
-_types = ""
-_limit = 50
-_increasing = False
-_sortcol = None
-
-def Usage ():
- print "Usage: qpid-stat [OPTIONS] [broker-addr]"
- print
- print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- print "General Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
-# print " -n [--numeric] Don't resolve names"
- print
- print "Display Options:"
- print
- print " -b Show Brokers"
- print " -c Show Connections"
-# print " -s Show Sessions"
- print " -e Show Exchanges"
- print " -q Show Queues"
- print
- print " -S [--sort-by] COLNAME Sort by column name"
- print " -I [--increasing] Sort by increasing value (default = decreasing)"
- print " -L [--limit] NUM Limit output to NUM rows (default = 50)"
- print
- sys.exit (1)
-
-class IpAddr:
- def __init__(self, text):
- if text.find("@") != -1:
- tokens = text.split("@")
- text = tokens[1]
- if text.find(":") != -1:
- tokens = text.split(":")
- text = tokens[0]
- self.port = int(tokens[1])
- else:
- self.port = 5672
- self.dottedQuad = socket.gethostbyname(text)
- nums = self.dottedQuad.split(".")
- self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
- def bestAddr(self, addrPortList):
- bestDiff = 0xFFFFFFFFL
- bestAddr = None
- for addrPort in addrPortList:
- diff = IpAddr(addrPort[0]).addr ^ self.addr
- if diff < bestDiff:
- bestDiff = diff
- bestAddr = addrPort
- return bestAddr
-
-class Broker(object):
- def __init__(self, qmf, broker):
- self.broker = broker
-
- agents = qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == 0:
- self.brokerAgent = a
-
- bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
- self.currentTime = bobj.getTimestamps()[0]
- try:
- self.uptime = bobj.uptime
- except:
- self.uptime = 0
- self.connections = {}
- self.sessions = {}
- self.exchanges = {}
- self.queues = {}
- package = "org.apache.qpid.broker"
-
- list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
- for conn in list:
- if not conn.shadow:
- self.connections[conn.getObjectId()] = conn
-
- list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
- for sess in list:
- if sess.connectionRef in self.connections:
- self.sessions[sess.getObjectId()] = sess
-
- list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
- for exchange in list:
- self.exchanges[exchange.getObjectId()] = exchange
-
- list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
- for queue in list:
- self.queues[queue.getObjectId()] = queue
-
- def getName(self):
- return self.broker.getUrl()
-
- def getCurrentTime(self):
- return self.currentTime
-
- def getUptime(self):
- return self.uptime
-
-class BrokerManager(Console):
- def __init__(self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
- self.cluster = None
-
- def SetBroker(self, brokerUrl):
- self.url = brokerUrl
- self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == 0:
- self.brokerAgent = a
-
- def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
-
- def _getCluster(self):
- packages = self.qmf.getPackages()
- if "org.apache.qpid.cluster" not in packages:
- return None
-
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- print "Clustering is installed but not enabled on the broker."
- return None
-
- self.cluster = clusters[0]
-
- def _getHostList(self, urlList):
- hosts = []
- hostAddr = IpAddr(_host)
- for url in urlList:
- if url.find("amqp:") != 0:
- raise Exception("Invalid URL 1")
- url = url[5:]
- addrs = str(url).split(",")
- addrList = []
- for addr in addrs:
- tokens = addr.split(":")
- if len(tokens) != 3:
- raise Exception("Invalid URL 2")
- addrList.append((tokens[1], tokens[2]))
-
- # Find the address in the list that is most likely to be in the same subnet as the address
- # with which we made the original QMF connection. This increases the probability that we will
- # be able to reach the cluster member.
-
- best = hostAddr.bestAddr(addrList)
- bestUrl = best[0] + ":" + best[1]
- hosts.append(bestUrl)
- return hosts
-
- def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
- if len(subs) == 0:
- return
- this = subs[0]
- remaining = subs[1:]
- newindent = indent + " "
- if this == 'b':
- pass
- elif this == 'c':
- if broker:
- for oid in broker.connections:
- iconn = broker.connections[oid]
- self.printConnSub(indent, broker.getName(), iconn)
- self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
- sess=sess, exchange=exchange, queue=queue)
- elif this == 's':
- pass
- elif this == 'e':
- pass
- elif this == 'q':
- pass
- print
-
- def displayBroker(self, subs):
- disp = Display(prefix=" ")
- heads = []
- heads.append(Header('broker'))
- heads.append(Header('cluster'))
- heads.append(Header('uptime', Header.DURATION))
- heads.append(Header('conn', Header.KMG))
- heads.append(Header('sess', Header.KMG))
- heads.append(Header('exch', Header.KMG))
- heads.append(Header('queue', Header.KMG))
- rows = []
- for broker in self.brokers:
- if self.cluster:
- ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
- else:
- ctext = "<standalone>"
- row = (broker.getName(), ctext, broker.getUptime(),
- len(broker.connections), len(broker.sessions),
- len(broker.exchanges), len(broker.queues))
- rows.append(row)
- title = "Brokers"
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
-
- def displayConn(self, subs):
- disp = Display(prefix=" ")
- heads = []
- if self.cluster:
- heads.append(Header('broker'))
- heads.append(Header('client-addr'))
- heads.append(Header('cproc'))
- heads.append(Header('cpid'))
- heads.append(Header('auth'))
- heads.append(Header('connected', Header.DURATION))
- heads.append(Header('idle', Header.DURATION))
- heads.append(Header('msgIn', Header.KMG))
- heads.append(Header('msgOut', Header.KMG))
- rows = []
- for broker in self.brokers:
- for oid in broker.connections:
- conn = broker.connections[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(conn.address)
- row.append(conn.remoteProcessName)
- row.append(conn.remotePid)
- row.append(conn.authIdentity)
- row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
- idle = broker.getCurrentTime() - conn.getTimestamps()[0]
- row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
- row.append(conn.framesFromClient)
- row.append(conn.framesToClient)
- rows.append(row)
- title = "Connections"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
-
- def displaySession(self, subs):
- disp = Display(prefix=" ")
-
- def displayExchange(self, subs):
- disp = Display(prefix=" ")
- heads = []
- if self.cluster:
- heads.append(Header('broker'))
- heads.append(Header("exchange"))
- heads.append(Header("type"))
- heads.append(Header("dur", Header.Y))
- heads.append(Header("bind", Header.KMG))
- heads.append(Header("msgIn", Header.KMG))
- heads.append(Header("msgOut", Header.KMG))
- heads.append(Header("msgDrop", Header.KMG))
- heads.append(Header("byteIn", Header.KMG))
- heads.append(Header("byteOut", Header.KMG))
- heads.append(Header("byteDrop", Header.KMG))
- rows = []
- for broker in self.brokers:
- for oid in broker.exchanges:
- ex = broker.exchanges[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(ex.name)
- row.append(ex.type)
- row.append(ex.durable)
- row.append(ex.bindingCount)
- row.append(ex.msgReceives)
- row.append(ex.msgRoutes)
- row.append(ex.msgDrops)
- row.append(ex.byteReceives)
- row.append(ex.byteRoutes)
- row.append(ex.byteDrops)
- rows.append(row)
- title = "Exchanges"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
-
- def displayQueue(self, subs):
- disp = Display(prefix=" ")
- heads = []
- if self.cluster:
- heads.append(Header('broker'))
- heads.append(Header("queue"))
- heads.append(Header("dur", Header.Y))
- heads.append(Header("autoDel", Header.Y))
- heads.append(Header("excl", Header.Y))
- heads.append(Header("msg", Header.KMG))
- heads.append(Header("msgIn", Header.KMG))
- heads.append(Header("msgOut", Header.KMG))
- heads.append(Header("bytes", Header.KMG))
- heads.append(Header("bytesIn", Header.KMG))
- heads.append(Header("bytesOut", Header.KMG))
- heads.append(Header("cons", Header.KMG))
- heads.append(Header("bind", Header.KMG))
- rows = []
- for broker in self.brokers:
- for oid in broker.queues:
- q = broker.queues[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(q.name)
- row.append(q.durable)
- row.append(q.autoDelete)
- row.append(q.exclusive)
- row.append(q.msgDepth)
- row.append(q.msgTotalEnqueues)
- row.append(q.msgTotalDequeues)
- row.append(q.byteDepth)
- row.append(q.byteTotalEnqueues)
- row.append(q.byteTotalDequeues)
- row.append(q.consumerCount)
- row.append(q.bindingCount)
- rows.append(row)
- title = "Queues"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
- if _sortcol:
- sorter = Sorter(heads, rows, _sortcol, _limit, _increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
-
- def displayMain(self, main, subs):
- if main == 'b': self.displayBroker(subs)
- elif main == 'c': self.displayConn(subs)
- elif main == 's': self.displaySession(subs)
- elif main == 'e': self.displayExchange(subs)
- elif main == 'q': self.displayQueue(subs)
-
- def display(self):
- self._getCluster()
- if self.cluster:
- memberList = self.cluster.members.split(";")
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- if _host.find("@") > 0:
- authString = _host.split("@")[0] + "@"
- else:
- authString = ""
- for host in hostList:
- b = self.qmf.addBroker(authString + host, _connTimeout)
- self.brokers.append(Broker(self.qmf, b))
- else:
- self.brokers.append(Broker(self.qmf, self.broker))
-
- self.displayMain(_types[0], _types[1:])
-
-
-##
-## Main Program
-##
-
-try:
- longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing", "timeout=")
- (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
-except:
- Usage()
-
-try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
-except:
- cargs = encArgs
-
-for opt in optlist:
- if opt[0] == "--timeout":
- _connTimeout = int(opt[1])
- if _connTimeout == 0:
- _connTimeout = None
- elif opt[0] == "-n" or opt[0] == "--numeric":
- _numeric = True
- elif opt[0] == "-S" or opt[0] == "--sort-by":
- _sortcol = opt[1]
- elif opt[0] == "-I" or opt[0] == "--increasing":
- _increasing = True
- elif opt[0] == "-L" or opt[0] == "--limit":
- _limit = int(opt[1])
- elif len(opt[0]) == 2:
- char = opt[0][1]
- if "bcseq".find(char) != -1:
- _types += char
- else:
- Usage()
- else:
- Usage()
-
-if len(_types) == 0:
- Usage()
-
-nargs = len(cargs)
-bm = BrokerManager()
-
-if nargs == 1:
- _host = cargs[0]
-
-try:
- bm.SetBroker(_host)
- bm.display()
-except KeyboardInterrupt:
- print
-except Exception,e:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
-
-bm.Disconnect()
diff --git a/python/commands/qpid-tool b/python/commands/qpid-tool
deleted file mode 100755
index 05afcc9732..0000000000
--- a/python/commands/qpid-tool
+++ /dev/null
@@ -1,197 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import getopt
-import sys
-import socket
-from cmd import Cmd
-from qpid.connection import ConnectionFailed, Timeout
-from qpid.managementdata import ManagementData
-from shlex import split
-from qpid.disp import Display
-from qpid.peer import Closed
-
-class Mcli (Cmd):
- """ Management Command Interpreter """
-
- def __init__ (self, dataObject, dispObject):
- Cmd.__init__ (self)
- self.dataObject = dataObject
- self.dispObject = dispObject
- self.dataObject.setCli (self)
- self.prompt = "qpid: "
-
- def emptyline (self):
- pass
-
- def setPromptMessage (self, p):
- if p == None:
- self.prompt = "qpid: "
- else:
- self.prompt = "qpid[%s]: " % p
-
- def do_help (self, data):
- print "Management Tool for QPID"
- print
- print "Commands:"
- print " list - Print summary of existing objects by class"
- print " list <className> - Print list of objects of the specified class"
- print " list <className> active - Print list of non-deleted objects of the specified class"
- print " show <className> - Print contents of all objects of specified class"
- print " show <className> active - Print contents of all non-deleted objects of specified class"
- print " show <list-of-IDs> - Print contents of one or more objects (infer className)"
- print " show <className> <list-of-IDs> - Print contents of one or more objects"
- print " list is space-separated, ranges may be specified (i.e. 1004-1010)"
- print " call <ID> <methodName> [<args>] - Invoke a method on an object"
- print " schema - Print summary of object classes seen on the target"
- print " schema <className> - Print details of an object class"
- print " set time-format short - Select short timestamp format (default)"
- print " set time-format long - Select long timestamp format"
- print " id [<ID>] - Display translations of display object ids"
- print " quit or ^D - Exit the program"
- print
-
- def complete_set (self, text, line, begidx, endidx):
- """ Command completion for the 'set' command """
- tokens = split (line)
- if len (tokens) < 2:
- return ["time-format "]
- elif tokens[1] == "time-format":
- if len (tokens) == 2:
- return ["long", "short"]
- elif len (tokens) == 3:
- if "long".find (text) == 0:
- return ["long"]
- elif "short".find (text) == 0:
- return ["short"]
- elif "time-format".find (text) == 0:
- return ["time-format "]
- return []
-
- def do_set (self, data):
- tokens = split (data)
- try:
- if tokens[0] == "time-format":
- self.dispObject.do_setTimeFormat (tokens[1])
- except:
- pass
-
- def do_id (self, data):
- self.dataObject.do_id(data)
-
- def complete_schema (self, text, line, begidx, endidx):
- tokens = split (line)
- if len (tokens) > 2:
- return []
- return self.dataObject.classCompletions (text)
-
- def do_schema (self, data):
- self.dataObject.do_schema (data)
-
- def complete_list (self, text, line, begidx, endidx):
- tokens = split (line)
- if len (tokens) > 2:
- return []
- return self.dataObject.classCompletions (text)
-
- def do_list (self, data):
- self.dataObject.do_list (data)
-
- def do_show (self, data):
- self.dataObject.do_show (data)
-
- def do_call (self, data):
- try:
- self.dataObject.do_call (data)
- except ValueError, e:
- print "ValueError:", e
-
- def do_EOF (self, data):
- print "quit"
- try:
- self.dataObject.do_exit ()
- except:
- pass
- return True
-
- def do_quit (self, data):
- try:
- self.dataObject.do_exit ()
- except:
- pass
- return True
-
- def postcmd (self, stop, line):
- return stop
-
- def postloop (self):
- print "Exiting..."
- self.dataObject.close ()
-
-def Usage ():
- print "Usage: qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]"
- print
- sys.exit (1)
-
-#=========================================================
-# Main Program
-#=========================================================
-
-# Get host name and port if specified on the command line
-cargs = sys.argv[1:]
-_host = "localhost"
-
-if len (cargs) > 0:
- _host = cargs[0]
-
-if _host[0] == '-':
- Usage()
-
-disp = Display ()
-
-# Attempt to make a connection to the target broker
-try:
- data = ManagementData (disp, _host)
-except socket.error, e:
- print "Socket Error (%s):" % _host, e[1]
- sys.exit (1)
-except IOError, e:
- print "IOError: %d - %s: %s" % (e.errno, e.strerror, e.filename)
- sys.exit (1)
-except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
-except Exception, e:
- if str(e).find ("Exchange not found") != -1:
- print "Management not enabled on broker: Use '-m yes' option on broker startup."
- else:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
-
-# Instantiate the CLI interpreter and launch it.
-cli = Mcli (data, disp)
-print ("Management Tool for QPID")
-try:
- cli.cmdloop ()
-except Closed, e:
- print "Connection to Broker Lost:", e
- sys.exit (1)