diff options
| author | Ted Ross <tross@apache.org> | 2013-10-07 21:20:20 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-10-07 21:20:20 +0000 |
| commit | a4a2f3f4c0986c74d4cf9277b649f498c8b1aae3 (patch) | |
| tree | 5c4162ec757024151febd9029cb7224c2d77adfa | |
| parent | ee5511f752a7316760ed74f6493a7433ffed4923 (diff) | |
| download | qpid-python-a4a2f3f4c0986c74d4cf9277b649f498c8b1aae3.tar.gz | |
QPID-5213 - Added a CLI tool to access management data in the router.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1530071 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/config.sh | 3 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 35 | ||||
| -rwxr-xr-x | qpid/extras/dispatch/tools/src/py/qdstat | 283 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tools/src/py/qdtoollibs/__init__.py | 21 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tools/src/py/qdtoollibs/disp.py | 270 |
5 files changed, 605 insertions, 7 deletions
diff --git a/qpid/extras/dispatch/config.sh b/qpid/extras/dispatch/config.sh index f398830ee6..3c96464ea9 100644 --- a/qpid/extras/dispatch/config.sh +++ b/qpid/extras/dispatch/config.sh @@ -22,5 +22,6 @@ cd $(dirname ${BASH_SOURCE[0]}) > /dev/null export QPID_DISPATCH_HOME=$(pwd) cd - > /dev/null -export PYTHONPATH=$QPID_DISPATCH_HOME/python:$PYTHONPATH +export PYTHONPATH=$QPID_DISPATCH_HOME/python:$QPID_DISPATCH_HOME/tools/src/py:$PYTHONPATH +export PATH=$QPID_DISPATCH_HOME/tools/src/py:$PATH diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 90e2e0c9dc..244a438667 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -429,6 +429,12 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (addr) { // + // If the incoming link is an endpoint link, count this as an ingress delivery. + // + if (rlink->link_type == DX_LINK_ENDPOINT) + addr->deliveries_ingress++; + + // // To field is valid and contains a known destination. Handle the various // cases for forwarding. // @@ -678,6 +684,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); + dx_field_iterator_t *iter = 0; // // If this link is not a router link and it has no source address, we can't @@ -688,6 +695,25 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) return 0; } + + // + // If this is an endpoint link with a source address, make sure the address is + // appropriate for endpoint links. If it is not a local or mobile address, (i.e. + // a router or area address), it cannot be bound to an endpoint link. + // + if(r_src && !is_router && !is_dynamic) { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + unsigned char prefix = dx_field_iterator_octet(iter); + dx_field_iterator_reset(iter); + + if (prefix != 'L' && prefix != 'M') { + dx_field_iterator_free(iter); + pn_link_close(pn_link); + dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src); + return 0; + } + } + // // Create a router_link record for this link. Some of the fields will be // modified in the different cases below. @@ -725,19 +751,16 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) // assign it an ephemeral and routable address. If it has a non-dymanic // address, that address needs to be set up in the address list. // - dx_field_iterator_t *iter; - char temp_addr[1000]; // FIXME - dx_address_t *addr; + char temp_addr[1000]; // FIXME + dx_address_t *addr; if (is_dynamic) { dx_router_generate_temp_addr(router, temp_addr, 1000); iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); pn_terminus_set_address(dx_link_source(link), temp_addr); dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); - } else { - iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + } else dx_log(module, LOG_INFO, "Registered local address: %s", r_src); - } hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { diff --git a/qpid/extras/dispatch/tools/src/py/qdstat b/qpid/extras/dispatch/tools/src/py/qdstat new file mode 100755 index 0000000000..f9895b0198 --- /dev/null +++ b/qpid/extras/dispatch/tools/src/py/qdstat @@ -0,0 +1,283 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +from optparse import OptionParser, OptionGroup +import sys +import locale +import socket +import re +from proton import Messenger, Message + +home = os.environ.get("QD_TOOLS_HOME", os.path.normpath("/usr/share/qd-tools")) +sys.path.append(os.path.join(home, "python")) + +from qdtoollibs import Display, Header, Sorter, YN, Commas, TimeLong + + +class Config: + def __init__(self): + self._host = "0.0.0.0" + self._connTimeout = 10 + self._types = "" + self._limit = 50 + self._increasing = False + self._sortcol = None + +config = Config() +conn_options = {} + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + global conn_options + + usage = \ +"""%prog -g [options] + %prog -c [options] + %prog -l [options] + %prog -n [options] + %prog -a [options] + %prog -m [options]""" + + parser = OptionParser(usage=usage) + + group1 = OptionGroup(parser, "General Options") + group1.add_option("-b", "--bus", action="store", type="string", default="0.0.0.0", metavar="<url>", + help="URL of the messaging bus to connect to") + group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for connection (in seconds)") + group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + group1.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") + group1.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") + parser.add_option_group(group1) + + group2 = OptionGroup(parser, "Command Options") + group2.add_option("-g", "--general", help="Show General Broker Stats", action="store_const", const="g", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-l", "--links", help="Show Router Links", action="store_const", const="l", dest="show") + group2.add_option("-n", "--nodes", help="Show Router Nodes", action="store_const", const="n", dest="show") + group2.add_option("-a", "--address", help="Show Router Addresses", action="store_const", const="a", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + parser.add_option_group(group2) + + opts, args = parser.parse_args(args=argv) + + if not opts.show: + parser.error("You must specify one of these options: -g, -c, -l, -n, -a, or -m. For details, try $ qdstat --help") + + config._types = opts.show + config._host = opts.bus + config._connTimeout = opts.timeout + + return args + + +class BusManager: + def __init__(self): + pass + + def SetHost(self, host): + self.M = Messenger() + self.M.start() + self.M.route("amqp:/*", "amqp://%s/$1" % host) + self.address = "amqp:/_local/agent" + self.reply = "amqp:/reply-address/0001" # FIX THIS! + self.M.subscribe(self.reply) + + def Disconnect(self): + self.M.stop() + + def displayConn(self): + pass + + def _addr_class(self, addr): + if not addr: + return "-" + if addr[0] == 'M' : return "global" + if addr[0] == 'R' : return "router" + if addr[0] == 'A' : return "area" + if addr[0] == 'L' : return "local" + return "unknown: %s" % addr[0] + + def _addr_text(self, addr): + if not addr: + return "-" + return addr[1:] + + def displayRouterLinks(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("type")) + heads.append(Header("dir")) + heads.append(Header("rindex")) + heads.append(Header("class")) + heads.append(Header("addr")) + rows = [] + + request = Message() + response = Message() + + request.address = self.address + request.reply_to = self.reply + request.correlation_id = 1 + request.properties = {u'operation':u'GET', u'type':u'org.apache.qpid.dispatch.router.link'} + + self.M.put(request) + self.M.send() + + self.M.recv() + self.M.get(response) + + for link in response.body: + row = [] + row.append(link['link-type']) + row.append(link['link-dir']) + if link['link-type'] == "router": + row.append(link['index']) + else: + row.append('-') + row.append(self._addr_class(link['owning-addr'])) + row.append(self._addr_text(link['owning-addr'])) + rows.append(row) + title = "Router Links" + dispRows = rows + disp.formattedTable(title, heads, dispRows) + + def displayAddresses(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("class")) + heads.append(Header("address")) + heads.append(Header("in-proc", Header.Y)) + heads.append(Header("local", Header.COMMAS)) + heads.append(Header("remote", Header.COMMAS)) + heads.append(Header("in", Header.COMMAS)) + heads.append(Header("out", Header.COMMAS)) + heads.append(Header("thru", Header.COMMAS)) + rows = [] + + request = Message() + response = Message() + + request.address = self.address + request.reply_to = self.reply + request.correlation_id = 1 + request.properties = {u'operation':u'GET', u'type':u'org.apache.qpid.dispatch.router.address'} + + self.M.put(request) + self.M.send() + + self.M.recv() + self.M.get(response) + + for addr in response.body: + row = [] + row.append(self._addr_class(addr['addr'])) + row.append(self._addr_text(addr['addr'])) + row.append(addr['in-process']) + row.append(addr['subscriber-count']) + row.append(addr['remote-count']) + row.append(addr['deliveries-ingress']) + row.append(addr['deliveries-egress']) + row.append(addr['deliveries-transit']) + rows.append(row) + title = "Router Addresses" + sorter = Sorter(heads, rows, 'address', 0, True) + dispRows = sorter.getSorted() + disp.formattedTable(title, heads, dispRows) + + def displayMemory(self): + disp = Display(prefix=" ") + heads = [] + heads.append(Header("type")) + heads.append(Header("size", Header.COMMAS)) + heads.append(Header("batch")) + heads.append(Header("thread-max", Header.COMMAS)) + heads.append(Header("total", Header.COMMAS)) + heads.append(Header("in-threads", Header.COMMAS)) + heads.append(Header("rebal-in", Header.COMMAS)) + heads.append(Header("rebal-out", Header.COMMAS)) + rows = [] + + request = Message() + response = Message() + + request.address = self.address + request.reply_to = self.reply + request.correlation_id = 1 + request.properties = {u'operation':u'GET', u'type':u'org.apache.qpid.dispatch.allocator'} + + self.M.put(request) + self.M.send() + + self.M.recv() + self.M.get(response) + + for t in response.body: + row = [] + row.append(t['name']) + row.append(t['type_size']) + row.append(t['transfer_batch_size']) + row.append(t['local_free_list_max']) + row.append(t['total_alloc_from_heap']) + row.append(t['held_by_threads']) + row.append(t['batches_rebalanced_to_threads']) + row.append(t['batches_rebalanced_to_global']) + rows.append(row) + title = "Types" + sorter = Sorter(heads, rows, 'type', 0, True) + dispRows = sorter.getSorted() + disp.formattedTable(title, heads, dispRows) + + def displayMain(self, names, main): + if main == 'l': self.displayRouterLinks() + elif main == 'n': self.displayRouterNodes() + elif main == 'a': self.displayAddresses() + elif main == 'm': self.displayMemory() + + def display(self, names): + self.displayMain(names, config._types) + + +def main(argv=None): + + args = OptionsAndArguments(argv) + bm = BusManager() + + try: + bm.SetHost(config._host) + bm.display(args) + bm.Disconnect() + return 0 + except KeyboardInterrupt: + print + except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + raise + + bm.Disconnect() # try to deallocate brokers + return 1 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/qpid/extras/dispatch/tools/src/py/qdtoollibs/__init__.py b/qpid/extras/dispatch/tools/src/py/qdtoollibs/__init__.py new file mode 100644 index 0000000000..378bf24ef1 --- /dev/null +++ b/qpid/extras/dispatch/tools/src/py/qdtoollibs/__init__.py @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qdtoollibs.disp import * + diff --git a/qpid/extras/dispatch/tools/src/py/qdtoollibs/disp.py b/qpid/extras/dispatch/tools/src/py/qdtoollibs/disp.py new file mode 100644 index 0000000000..529a727449 --- /dev/null +++ b/qpid/extras/dispatch/tools/src/py/qdtoollibs/disp.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +def YN(val): + if val: + return 'Y' + return 'N' + +def Commas(value): + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + +def TimeLong(value): + return strftime("%c", gmtime(value / 1000000000)) + +def TimeShort(value): + return strftime("%X", gmtime(value / 1000000000)) + + +class Header: + """ """ + NONE = 1 + KMG = 2 + YN = 3 + Y = 4 + TIME_LONG = 5 + TIME_SHORT = 6 + DURATION = 7 + COMMAS = 8 + + def __init__(self, text, format=NONE): + self.text = text + self.format = format + + def __repr__(self): + return self.text + + def __str__(self): + return self.text + + def formatted(self, value): + try: + if value == None: + return '' + if self.format == Header.NONE: + return value + if self.format == Header.KMG: + return self.num(value) + if self.format == Header.YN: + if value: + return 'Y' + return 'N' + if self.format == Header.Y: + if value: + return 'Y' + return '' + if self.format == Header.TIME_LONG: + return TimeLong(value) + if self.format == Header.TIME_SHORT: + return TimeShort(value) + if self.format == Header.DURATION: + if value < 0: value = 0 + sec = value / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + if self.format == Header.COMMAS: + return Commas(value) + except: + return "?" + + def numCell(self, value, tag): + fp = float(value) / 1000. + if fp < 10.0: + return "%1.2f%c" % (fp, tag) + if fp < 100.0: + return "%2.1f%c" % (fp, tag) + return "%4d%c" % (value / 1000, tag) + + def num(self, value): + if value < 1000: + return "%4d" % value + if value < 1000000: + return self.numCell(value, 'k') + value /= 1000 + if value < 1000000: + return self.numCell(value, 'm') + value /= 1000 + return self.numCell(value, 'g') + + +class Display: + """ Display formatting """ + + def __init__(self, spacing=2, prefix=" "): + self.tableSpacing = spacing + self.tablePrefix = prefix + self.timestampFormat = "%X" + + def formattedTable(self, title, heads, rows): + fRows = [] + for row in rows: + fRow = [] + col = 0 + for cell in row: + fRow.append(heads[col].formatted(cell)) + col += 1 + fRows.append(fRow) + headtext = [] + for head in heads: + headtext.append(head.text) + self.table(title, headtext, fRows) + + def table(self, title, heads, rows): + """ Print a table with autosized columns """ + + # Pad the rows to the number of heads + for row in rows: + diff = len(heads) - len(row) + for idx in range(diff): + row.append("") + + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + text = row[col] + if text.__class__ == str: + text = text.decode('utf-8') + cellWidth = len(unicode(text)) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + text = row[col] + if text.__class__ == str: + text = text.decode('utf-8') + line = line + unicode(text) + if col < len (heads) - 1: + for i in range (width - len(unicode(text))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) + + def duration(self, nsec): + if nsec < 0: nsec = 0 + sec = nsec / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + +class Sortable: + """ """ + def __init__(self, row, sortIndex): + self.row = row + self.sortIndex = sortIndex + if sortIndex >= len(row): + raise Exception("sort index exceeds row boundary") + + def __cmp__(self, other): + return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) + + def getRow(self): + return self.row + +class Sorter: + """ """ + def __init__(self, heads, rows, sortCol, limit=0, inc=True): + col = 0 + for head in heads: + if head.text == sortCol: + break + col += 1 + if col == len(heads): + raise Exception("sortCol '%s', not found in headers" % sortCol) + + list = [] + for row in rows: + list.append(Sortable(row, col)) + list.sort() + if not inc: + list.reverse() + count = 0 + self.sorted = [] + for row in list: + self.sorted.append(row.getRow()) + count += 1 + if count == limit: + break + + def getSorted(self): + return self.sorted |
