From 723bff9590332fee6b4ecacb08dc762613994128 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 19 Sep 2008 20:54:08 +0000 Subject: QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68 --- python/commands/qpid-config | 220 ++++++++++++------------------------ python/commands/qpid-route | 266 +++++++++++++++++--------------------------- python/qpid/qmfconsole.py | 261 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 387 insertions(+), 360 deletions(-) (limited to 'python') diff --git a/python/commands/qpid-config b/python/commands/qpid-config index cc9315f7ea..6bc38c7440 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -22,16 +22,7 @@ import os import getopt import sys -import socket -import qpid -from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.datatypes import uuid4 -from qpid.util import connect -from time import sleep +from qpid import qmfconsole _recursive = False _host = "localhost" @@ -78,44 +69,21 @@ def Usage (): class BrokerManager: def __init__ (self): - self.dest = None - self.src = None - self.broker = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session (self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.brokerName = None + self.qmf = None + self.broker = None + + def SetBroker (self, brokerUrl): + self.url = brokerUrl + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(brokerUrl) + + def Disconnect(self): + self.qmf.delBroker(self.broker) def Overview (self): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + queues = self.qmf.getObjects(name="queue") print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -136,11 +104,7 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") + exchanges = self.qmf.getObjects(name="exchange") print "Durable Type Bindings Exchange Name" print "=======================================================" for ex in exchanges: @@ -148,18 +112,14 @@ class BrokerManager: print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) for bind in bindings: - if bind.exchangeRef == ex.id: + if bind.exchangeRef == ex.getObjectId(): qname = "" queue = self.findById (queues, bind.queueRef) if queue != None: @@ -168,12 +128,8 @@ class BrokerManager: def QueueList (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - queues = mc.syncGetObjects (mch, "queue") - journals = mc.syncGetObjects (mch, "journal") + queues = self.qmf.getObjects(name="queue") + journals = self.qmf.getObjects(name="journal") print " Store Size" print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" print "===========================================================================================" @@ -193,18 +149,14 @@ class BrokerManager: YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): - self.ConnectToBroker () - mc = self.mclient - mch = self.mchannel - mc.syncWaitForStable (mch) - exchanges = mc.syncGetObjects (mch, "exchange") - bindings = mc.syncGetObjects (mch, "binding") - queues = mc.syncGetObjects (mch, "queue") + exchanges = self.qmf.getObjects(name="exchange") + bindings = self.qmf.getObjects(name="binding") + queues = self.qmf.getObjects(name="queue") for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name for bind in bindings: - if bind.queueRef == queue.id: + if bind.queueRef == queue.getObjectId(): ename = "" ex = self.findById (exchanges, bind.exchangeRef) if ex != None: @@ -216,30 +168,19 @@ class BrokerManager: def AddExchange (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () etype = args[0] ename = args[1] - - try: - self.session.exchange_declare (exchange=ename, type=etype, durable=_durable) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable) def DelExchange (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () ename = args[0] - - try: - self.session.exchange_delete (exchange=ename) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_delete (exchange=ename) def AddQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] declArgs = {} if _durable: @@ -251,55 +192,37 @@ class BrokerManager: if _maxQueueCount: declArgs[MAX_QUEUE_COUNT] = _maxQueueCount - try: - self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs) def DelQueue (self, args): if len (args) < 1: Usage () - self.ConnectToBroker () qname = args[0] - - try: - self.session.queue_delete (queue=qname) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().queue_delete (queue=qname) def Bind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key) def Unbind (self, args): if len (args) < 2: Usage () - self.ConnectToBroker () ename = args[0] qname = args[1] key = "" if len (args) > 2: key = args[2] - - try: - self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key) - except Closed, e: - print "Failed:", e + self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key) def findById (self, items, id): for item in items: - if item.id == id: + if item.getObjectId() == id: return item return None @@ -343,43 +266,48 @@ for opt in optlist: nargs = len (cargs) bm = BrokerManager () -bm.SetBroker (Broker (_host)) - -if nargs == 0: - bm.Overview () -else: - cmd = cargs[0] - modifier = "" - if nargs > 1: - modifier = cargs[1] - if cmd[0] == 'e': - if _recursive: - bm.ExchangeListRecurse (modifier) - else: - bm.ExchangeList (modifier) - elif cmd[0] == 'q': - if _recursive: - bm.QueueListRecurse (modifier) - else: - bm.QueueList (modifier) - elif cmd == "add": - if modifier == "exchange": - bm.AddExchange (cargs[2:]) - elif modifier == "queue": - bm.AddQueue (cargs[2:]) - else: - Usage () - elif cmd == "del": - if modifier == "exchange": - bm.DelExchange (cargs[2:]) - elif modifier == "queue": - bm.DelQueue (cargs[2:]) + +try: + bm.SetBroker(qmfconsole.BrokerURL(_host)) + if nargs == 0: + bm.Overview () + else: + cmd = cargs[0] + modifier = "" + if nargs > 1: + modifier = cargs[1] + if cmd[0] == 'e': + if _recursive: + bm.ExchangeListRecurse (modifier) + else: + bm.ExchangeList (modifier) + elif cmd[0] == 'q': + if _recursive: + bm.QueueListRecurse (modifier) + else: + bm.QueueList (modifier) + elif cmd == "add": + if modifier == "exchange": + bm.AddExchange (cargs[2:]) + elif modifier == "queue": + bm.AddQueue (cargs[2:]) + else: + Usage () + elif cmd == "del": + if modifier == "exchange": + bm.DelExchange (cargs[2:]) + elif modifier == "queue": + bm.DelQueue (cargs[2:]) + else: + Usage () + elif cmd == "bind": + bm.Bind (cargs[1:]) + elif cmd == "unbind": + bm.Unbind (cargs[1:]) else: Usage () - elif cmd == "bind": - bm.Bind (cargs[1:]) - elif cmd == "unbind": - bm.Unbind (cargs[1:]) - else: - Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + bm.Disconnect() diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 3cd9109a6a..172927b72a 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -22,13 +22,8 @@ import getopt import sys import socket -import qpid import os -from qpid.management import managementClient -from qpid.managementdata import Broker -from qpid.peer import Closed -from qpid.connection import Connection, ConnectionFailed -from qpid.util import connect +from qpid import qmfconsole def Usage (): print "Usage: qpid-route [OPTIONS] link add " @@ -58,93 +53,57 @@ _dellink = False class RouteManager: def __init__ (self, destBroker): - self.dest = Broker (destBroker) + self.dest = qmfconsole.BrokerURL(destBroker) self.src = None - - def ConnectToBroker (self): - broker = self.dest - if _verbose: - print "Connecting to broker: %s:%d" % (broker.host, broker.port) - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (broker.host, broker.port), \ - username=broker.username, password=broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec) - self.mch = self.mclient.addChannel (self.session) - self.mclient.syncWaitForStable (self.mch) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) + self.qmf = qmfconsole.Session() + self.broker = self.qmf.addBroker(destBroker) def Disconnect (self): - self.mclient.removeChannel (self.mch) - self.session.close(timeout=10) - self.conn.close(timeout=10) + self.qmf.delBroker(self.broker) def getLink (self): - links = self.mclient.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if "%s:%d" % (link.host, link.port) == self.src.name (): return link return None def AddLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): print "Linking broker to itself is not permitted" sys.exit(1) - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link != None: - print "Link already exists" - sys.exit(1) + raise Exception("Link already exists") - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() def DelLink (self, srcBroker): - self.src = Broker (srcBroker) - mc = self.mclient - - brokers = mc.syncGetObjects (self.mch, "broker") + self.src = qmfconsole.BrokerURL(srcBroker) + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] link = self.getLink() if link == None: - print "Link not found" - sys.exit(1) + raise Exception("Link not found") - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if _verbose: - print "Close method returned:", res.status, res.statusText + print "Close method returned:", res.status, res.text def ListLinks (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") if len(links) == 0: print "No Links Found" else: @@ -152,145 +111,118 @@ class RouteManager: print "Host Port Durable State Last Error" print "===================================================================" for link in links: - print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError) + print "%-16s%-8d %c %-18s%s" % \ + (link.host, link.port, YN(link.durable), link.state, link.lastError) def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes): - self.src = Broker (srcBroker) - mc = self.mclient - + self.src = qmfconsole.BrokerURL(srcBroker) if self.dest.name() == self.src.name(): - print "Linking broker to itself is not permitted" - sys.exit(1) + raise Exception("Linking broker to itself is not permitted") - brokers = mc.syncGetObjects (self.mch, "broker") + brokers = self.qmf.getObjects(name="broker") broker = brokers[0] - link = self.getLink () + link = self.getLink() if link == None: if _verbose: print "Inter-broker link not found, creating..." - connectArgs = {} - connectArgs["host"] = self.src.host - connectArgs["port"] = self.src.port - connectArgs["useSsl"] = False - connectArgs["durable"] = _durable - if self.src.username == "anonymous": - connectArgs["authMechanism"] = "ANONYMOUS" + if self.src.authName == "anonymous": + mech = "ANONYMOUS" else: - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password - res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) + mech = "PLAIN" + res = broker.connect(self.src.host, self.src.port, False, _durable, + mech, self.src.authName, self.src.authPass) if _verbose: - print "Connect method returned:", res.status, res.statusText - link = self.getLink () + print "Connect method returned:", res.status, res.text + link = self.getLink() if link == None: - print "Protocol Error - Missing link ID" - sys.exit (1) + raise Exception("Protocol Error - Missing link ID") - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and \ + bridge.dest == exchange and bridge.key == routingKey: if not _quiet: - print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey) - sys.exit (1) + raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)) sys.exit (0) if _verbose: print "Creating inter-broker binding..." - bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = tag - bridgeArgs["excludes"] = excludes - bridgeArgs["srcIsQueue"] = 0 - bridgeArgs["srcIsLocal"] = 0 - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) + res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0) if res.status == 4: - print "Can't create a durable route on a non-durable link" - sys.exit(1) + raise Exception("Can't create a durable route on a non-durable link") if _verbose: print "Bridge method returned:", res.status, res.statusText def DelRoute (self, srcBroker, exchange, routingKey): - self.src = Broker (srcBroker) - mc = self.mclient - - link = self.getLink () + self.src = qmfconsole.BrokerURL(srcBroker) + link = self.getLink() if link == None: if not _quiet: - print "No link found from %s to %s" % (self.src.name(), self.dest.name()) - sys.exit (1) + raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) sys.exit (0) - bridges = mc.syncGetObjects (self.mch, "bridge") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: - if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey: + if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: if _verbose: print "Closing bridge..." - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: - print "Error closing bridge: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText)) if len (bridges) == 1 and _dellink: link = self.getLink () if link == None: sys.exit (0) if _verbose: print "Last bridge on link, closing link..." - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: - print "Error closing link: %d - %s" % (res.status, res.statusText) - sys.exit (1) + raise Exception("Error closing link: %d - %s" % (res.status, res.statusText)) sys.exit (0) if not _quiet: - print "Route not found" - sys.exit (1) + raise Exception("Route not found") def ListRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) def ClearAllRoutes (self): - mc = self.mclient - links = mc.syncGetObjects (self.mch, "link") - bridges = mc.syncGetObjects (self.mch, "bridge") + links = self.qmf.getObjects(name="link") + bridges = self.qmf.getObjects(name="bridge") for bridge in bridges: if _verbose: myLink = None for link in links: - if bridge.linkRef == link.id: + if bridge.linkRef == link.getObjectId(): myLink = link break if myLink != None: print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key), - res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close") + res = bridge.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: print "Ok" if _dellink: - links = mc.syncGetObjects (self.mch, "link") + links = self.qmf.getObjects(name="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), - res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close") + res = link.close() if res.status != 0: print "Error: %d - %s" % (res.status, res.statusText) elif _verbose: @@ -331,41 +263,45 @@ else: group = cargs[0] cmd = cargs[1] -rm = RouteManager (destBroker) -rm.ConnectToBroker () - -if group == "link": - if cmd == "add": - if nargs != 4: - Usage() - rm.AddLink (cargs[3]) - elif cmd == "del": - if nargs != 4: - Usage() - rm.DelLink (cargs[3]) - elif cmd == "list": - rm.ListLinks () - -elif group == "route": - if cmd == "add": - if nargs < 6 or nargs > 8: - Usage () - - tag = "" - excludes = "" - if nargs > 6: tag = cargs[6] - if nargs > 7: excludes = cargs[7] - rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) - elif cmd == "del": - if nargs != 6: - Usage () - else: - rm.DelRoute (cargs[3], cargs[4], cargs[5]) - else: - if cmd == "list": - rm.ListRoutes () - elif cmd == "flush": - rm.ClearAllRoutes () + +try: + rm = RouteManager (destBroker) + if group == "link": + if cmd == "add": + if nargs != 4: + Usage() + rm.AddLink (cargs[3]) + elif cmd == "del": + if nargs != 4: + Usage() + rm.DelLink (cargs[3]) + elif cmd == "list": + rm.ListLinks () + + elif group == "route": + if cmd == "add": + if nargs < 6 or nargs > 8: + Usage () + + tag = "" + excludes = "" + if nargs > 6: tag = cargs[6] + if nargs > 7: excludes = cargs[7] + rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes) + elif cmd == "del": + if nargs != 6: + Usage () + else: + rm.DelRoute (cargs[3], cargs[4], cargs[5]) else: - Usage () + if cmd == "list": + rm.ListRoutes () + elif cmd == "flush": + rm.ClearAllRoutes () + else: + Usage () +except Exception,e: + print "Failed:", e.message + sys.exit(1) + rm.Disconnect () diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index f83b4f315c..435da475d7 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -23,6 +23,7 @@ import os import qpid import struct import socket +import re from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed from qpid.datatypes import uuid4 @@ -46,10 +47,14 @@ class Console: used to obtain details about the class.""" pass - def newAgent(self, broker, agent): + def newAgent(self, agent): """ Invoked when a QMF agent is discovered. """ pass + def delAgent(self, agent): + """ Invoked when a QMF agent disconects. """ + pass + def objectProps(self, broker, id, record): """ Invoked when an object is updated. """ pass @@ -70,6 +75,25 @@ class Console: """ """ pass +class BrokerURL: + def __init__(self, text): + rex = re.compile(r""" + # [ [ / ] @] [ : ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(text) + if not match: raise ValueError("'%s' is not a valid broker url" % (text)) + user, password, host, port = match.groups() + + self.host = socket.gethostbyname(host) + if port: self.port = int(port) + else: self.port = 5672 + self.authName = user or "guest" + self.authPass = password or "guest" + self.authMech = "PLAIN" + + def name(self): + return self.host + ":" + str(self.port) + class Session: """ An instance of the Session class represents a console session running @@ -95,12 +119,17 @@ class Session: self.cv = Condition() self.syncSequenceList = [] self.getResult = [] + self.error = None + + def __repr__(self): + return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) - def addBroker(self, host="localhost", port=5672, - authMech="PLAIN", authName="guest", authPass="guest"): + def addBroker(self, target="localhost"): """ Connect to a Qpid broker. Returns an object of type Broker. """ - broker = Broker(self, host, port, authMech, authName, authPass) + url = BrokerURL(target) + broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass) self.brokers.append(broker) + self.getObjects(broker=broker, name="agent") return broker def delBroker(self, broker): @@ -138,11 +167,22 @@ class Session: if (cname, hash) in self.packages[pname]: return self.packages[pname][(cname, hash)] - def getAgents(self): + def getAgents(self, broker=None): """ Get a list of currently known agents """ - for broker in self.brokers: - broker._waitForStable() - pass + brokerList = [] + if broker == None: + for b in self.brokers: + brokerList.append(b) + else: + brokerList.append(broker) + + for b in brokerList: + b._waitForStable() + agentList = [] + for b in brokerList: + for a in b.getAgents(): + agentList.append(a) + return agentList def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -165,7 +205,8 @@ class Session: broker = - supply a broker as returned by addBroker """ if "broker" in kwargs: - brokerList = [].append(kwargs["broker"]) + brokerList = [] + brokerList.append(kwargs["broker"]) else: brokerList = self.brokers for broker in brokerList: @@ -176,7 +217,7 @@ class Session: agent = kwargs["agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") - agentList = append(agent) + agentList.append(agent) else: for broker in brokerList: for agent in broker.getAgents(): @@ -209,7 +250,7 @@ class Session: starttime = time() timeout = False self.cv.acquire() - while len(self.syncSequenceList) > 0: + while len(self.syncSequenceList) > 0 and self.error == None: self.cv.wait(self.GET_WAIT_TIME) if time() - starttime > self.GET_WAIT_TIME: for pendingSeq in self.syncSequenceList: @@ -218,6 +259,11 @@ class Session: timeout = True self.cv.release() + if self.error: + errorText = self.error + self.error = None + raise Exception(errorText) + if len(self.getResult) == 0 and timeout: raise RuntimeError("No agent responded within timeout period") return self.getResult @@ -351,6 +397,8 @@ class Session: self.cv.release() schema = self.packages[pname][(cname, hash)] object = Object(self, broker, schema, codec, prop, stat) + if pname == "org.apache.qpid.broker" and cname == "agent": + broker._updateAgent(object) self.cv.acquire() if seq in self.syncSequenceList: @@ -365,6 +413,13 @@ class Session: if stat: self.console.objectStats(broker, object.getObjectId(), object) + def _handleError(self, error): + self.error = error + self.cv.acquire() + self.syncSequenceList = [] + self.cv.notify() + self.cv.release() + class Package: """ """ def __init__(self, name): @@ -407,23 +462,23 @@ class SchemaClass: return result def getKey(self): - """ """ + """ Return the class-key for this class. """ return self.classKey def getProperties(self): - """ """ + """ Return the list of properties for the class. """ return self.properties def getStatistics(self): - """ """ + """ Return the list of statistics for the class. """ return self.statistics def getMethods(self): - """ """ + """ Return the list of methods for the class. """ return self.methods def getEvents(self): - """ """ + """ Return the list of events for the class. """ return self.events class SchemaProperty: @@ -448,6 +503,9 @@ class SchemaProperty: elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = str(value) + def __repr__(self): + return self.name + class SchemaStatistic: """ """ def __init__(self, codec): @@ -461,6 +519,9 @@ class SchemaStatistic: if key == "unit" : self.unit = str(value) elif key == "desc" : self.desc = str(value) + def __repr__(self): + return self.name + class SchemaMethod: """ """ def __init__(self, codec): @@ -476,6 +537,19 @@ class SchemaMethod: for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=True)) + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if arg.dir.find("I") != -1: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + class SchemaEvent: """ """ def __init__(self, codec): @@ -491,6 +565,18 @@ class SchemaEvent: for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=False)) + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -538,12 +624,8 @@ class ObjectId(object): return 0 def __repr__(self): - return "%08x-%04x-%04x-%04x-%04x%08x" % ((self.first & 0xFFFFFFFF00000000) >> 32, - (self.first & 0x00000000FFFF0000) >> 16, - (self.first & 0x000000000000FFFF), - (self.second & 0xFFFF000000000000) >> 48, - (self.second & 0x0000FFFF00000000) >> 32, - (self.second & 0x00000000FFFFFFFF)) + return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(), + self.getBroker(), self.getBank(), self.getObject()) def index(self): return (self.first, self.second) @@ -596,23 +678,27 @@ class Object(object): self.statistics.append((statistic, self._decodeValue(codec, statistic.type))) def getObjectId(self): - """ """ + """ Return the object identifier for this object """ return self.objectId def getClassKey(self): - """ """ + """ Return the class-key that references the schema describing this object. """ return self.schema.getKey() def getSchema(self): - """ """ + """ Return the schema that describes this object. """ return self.schema + def getMethods(self): + """ Return a list of methods available for this object. """ + return self.schema.getMethods() + def getTimestamps(self): - """ """ + """ Return the current, creation, and deletion times for this object. """ return self.currentTime, self.createTime, self.deleteTime def getIndex(self): - """ """ + """ Return a string describing this object's primary key. """ result = "" for property, value in self.properties: if property.index: @@ -634,6 +720,7 @@ class Object(object): for statistic, value in self.statistics: if name == statistic.name: return value + raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): for method in self.schema.getMethods(): @@ -653,20 +740,27 @@ class Object(object): self._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) - self.broker._send(smsg) self.broker.cv.acquire() self.broker.syncInFlight = True + self.broker.cv.release() + + self.broker._send(smsg) + + self.broker.cv.acquire() starttime = time() - while self.broker.syncInFlight: + while self.broker.syncInFlight and self.broker.error == None: self.broker.cv.wait(self.broker.SYNC_TIME) if time() - starttime > self.broker.SYNC_TIME: self.broker.cv.release() self.session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") self.broker.cv.release() + if self.broker.error != None: + errorText = self.broker.error + self.broker.error = None + raise Exception(errorText) return self.broker.syncResult - else: - raise Exception("Invalid Method (software defect)") + raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): excludeList = [] @@ -747,18 +841,21 @@ class MethodResult(object): class Broker: """ """ - SYNC_TIME = 10 + SYNC_TIME = 10 def __init__(self, session, host, port, authMech, authUser, authPass): self.session = session - self.agents = [] - self.agents.append(Agent(self, 0)) + self.host = host + self.port = port + self.agents = {} + self.agents[0] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False self.syncRequest = 0 self.syncResult = None self.reqsOutstanding = 1 + self.error = None self.brokerId = None err = None try: @@ -767,7 +864,7 @@ class Broker: self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) - self.amqpSession.auto_sync = False + self.amqpSession.auto_sync = True self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) @@ -798,6 +895,7 @@ class Broker: except ConnectionFailed, e: err = "Connect Failed %d - %s" % (e[0], e[1]) + self.active = True if err != None: raise Exception(err) @@ -811,7 +909,36 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ - return self.agents + return self.agents.values() + + def getAmqpSession(self): + """ Get the AMQP session object for this connected broker. """ + return self.amqpSession + + def isConnected(self): + return self.active + + def __repr__(self): + if self.active: + if self.port == 5672: + port = "" + else: + port = ":%d" % self.port + return "Broker connected at: amqp://%s%s" % (self.host, port) + else: + return "Disconnected Broker" + + def _updateAgent(self, obj): + if obj.deleteTime == 0: + if obj.objectIdBank not in self.agents: + agent = Agent(self, obj.objectIdBank, obj.label) + self.agents[obj.objectIdBank] = agent + if self.session.console != None: + self.session.console.newAgent(agent) + else: + agent = self.agents.pop(obj.objectIdBank, None) + if agent != None and self.session.console != None: + self.session.console.delAgent(agent) def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ @@ -848,10 +975,14 @@ class Broker: self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): - self.amqpSession.incoming("rdest").stop() - if self.session.console != None: - self.amqpSession.incoming("tdest").stop() - self.amqpSession.close() + if self.active: + self.amqpSession.incoming("rdest").stop() + if self.session.console != None: + self.amqpSession.incoming("tdest").stop() + self.amqpSession.close() + self.active = False + else: + raise Exception("Broker already disconnected") def _waitForStable(self): self.cv.acquire() @@ -877,7 +1008,8 @@ class Broker: self.reqsOutstanding -= 1 if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None: self.topicBound = True - self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#") + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key="mgmt.#") if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() @@ -901,13 +1033,23 @@ class Broker: elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): - pass + self.active = False + self.error = data + self.cv.acquire() + if self.syncInFlight: + self.cv.notify() + self.cv.release() + self.session._handleError(self.error) class Agent: """ """ - def __init__(self, broker, bank): + def __init__(self, broker, bank, label): self.broker = broker self.bank = bank + self.label = label + + def __repr__(self): + return "Agent at bank %d (%s)" % (self.bank, self.label) class Event: """ """ @@ -941,11 +1083,32 @@ class SequenceManager: return data -# TEST +class DebugConsole(Console): + """ """ + def newPackage(self, name): + print "newPackage:", name + + def newClass(self, classKey): + print "newClass:", classKey + + def newAgent(self, agent): + print "newAgent:", agent + + def delAgent(self, agent): + print "delAgent:", agent -#c = Console() -#s = Session(c) -#b = s.addBroker() -#cl = s.getClasses("org.apache.qpid.broker") -#sch = s.getSchema(cl[0]) + def objectProps(self, broker, id, record): + print "objectProps:", record.getClassKey() + + def objectStats(self, broker, id, record): + print "objectStats:", record.getClassKey() + + def event(self, broker, event): + print "event:", event + + def heartbeat(self, agent, timestamp): + print "heartbeat:", agent + + def brokerInfo(self, broker): + print "brokerInfo:", broker -- cgit v1.2.1