diff options
| author | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-09-19 20:54:08 +0000 |
| commit | 723bff9590332fee6b4ecacb08dc762613994128 (patch) | |
| tree | 6063112085022d53cde7db630078f2ba8a2c0a49 /python/qpid | |
| parent | 88e386e9130caecb9a095dd0362dbeff89828adc (diff) | |
| download | qpid-python-723bff9590332fee6b4ecacb08dc762613994128.tar.gz | |
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
| -rw-r--r-- | python/qpid/qmfconsole.py | 261 |
1 files changed, 212 insertions, 49 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index f83b4f315c..435da475d7 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -23,6 +23,7 @@ import os import qpid import struct import socket +import re from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed from qpid.datatypes import uuid4 @@ -46,10 +47,14 @@ class Console: used to obtain details about the class.""" pass - def newAgent(self, broker, agent): + def newAgent(self, agent): """ Invoked when a QMF agent is discovered. """ pass + def delAgent(self, agent): + """ Invoked when a QMF agent disconects. """ + pass + def objectProps(self, broker, id, record): """ Invoked when an object is updated. """ pass @@ -70,6 +75,25 @@ class Console: """ """ pass +class BrokerURL: + def __init__(self, text): + rex = re.compile(r""" + # [ <user> [ / <password> ] @] <host> [ :<port> ] + ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) + match = rex.match(text) + if not match: raise ValueError("'%s' is not a valid broker url" % (text)) + user, password, host, port = match.groups() + + self.host = socket.gethostbyname(host) + if port: self.port = int(port) + else: self.port = 5672 + self.authName = user or "guest" + self.authPass = password or "guest" + self.authMech = "PLAIN" + + def name(self): + return self.host + ":" + str(self.port) + class Session: """ An instance of the Session class represents a console session running @@ -95,12 +119,17 @@ class Session: self.cv = Condition() self.syncSequenceList = [] self.getResult = [] + self.error = None + + def __repr__(self): + return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) - def addBroker(self, host="localhost", port=5672, - authMech="PLAIN", authName="guest", authPass="guest"): + def addBroker(self, target="localhost"): """ Connect to a Qpid broker. Returns an object of type Broker. """ - broker = Broker(self, host, port, authMech, authName, authPass) + url = BrokerURL(target) + broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass) self.brokers.append(broker) + self.getObjects(broker=broker, name="agent") return broker def delBroker(self, broker): @@ -138,11 +167,22 @@ class Session: if (cname, hash) in self.packages[pname]: return self.packages[pname][(cname, hash)] - def getAgents(self): + def getAgents(self, broker=None): """ Get a list of currently known agents """ - for broker in self.brokers: - broker._waitForStable() - pass + brokerList = [] + if broker == None: + for b in self.brokers: + brokerList.append(b) + else: + brokerList.append(broker) + + for b in brokerList: + b._waitForStable() + agentList = [] + for b in brokerList: + for a in b.getAgents(): + agentList.append(a) + return agentList def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -165,7 +205,8 @@ class Session: broker = <broker> - supply a broker as returned by addBroker """ if "broker" in kwargs: - brokerList = [].append(kwargs["broker"]) + brokerList = [] + brokerList.append(kwargs["broker"]) else: brokerList = self.brokers for broker in brokerList: @@ -176,7 +217,7 @@ class Session: agent = kwargs["agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") - agentList = append(agent) + agentList.append(agent) else: for broker in brokerList: for agent in broker.getAgents(): @@ -209,7 +250,7 @@ class Session: starttime = time() timeout = False self.cv.acquire() - while len(self.syncSequenceList) > 0: + while len(self.syncSequenceList) > 0 and self.error == None: self.cv.wait(self.GET_WAIT_TIME) if time() - starttime > self.GET_WAIT_TIME: for pendingSeq in self.syncSequenceList: @@ -218,6 +259,11 @@ class Session: timeout = True self.cv.release() + if self.error: + errorText = self.error + self.error = None + raise Exception(errorText) + if len(self.getResult) == 0 and timeout: raise RuntimeError("No agent responded within timeout period") return self.getResult @@ -351,6 +397,8 @@ class Session: self.cv.release() schema = self.packages[pname][(cname, hash)] object = Object(self, broker, schema, codec, prop, stat) + if pname == "org.apache.qpid.broker" and cname == "agent": + broker._updateAgent(object) self.cv.acquire() if seq in self.syncSequenceList: @@ -365,6 +413,13 @@ class Session: if stat: self.console.objectStats(broker, object.getObjectId(), object) + def _handleError(self, error): + self.error = error + self.cv.acquire() + self.syncSequenceList = [] + self.cv.notify() + self.cv.release() + class Package: """ """ def __init__(self, name): @@ -407,23 +462,23 @@ class SchemaClass: return result def getKey(self): - """ """ + """ Return the class-key for this class. """ return self.classKey def getProperties(self): - """ """ + """ Return the list of properties for the class. """ return self.properties def getStatistics(self): - """ """ + """ Return the list of statistics for the class. """ return self.statistics def getMethods(self): - """ """ + """ Return the list of methods for the class. """ return self.methods def getEvents(self): - """ """ + """ Return the list of events for the class. """ return self.events class SchemaProperty: @@ -448,6 +503,9 @@ class SchemaProperty: elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = str(value) + def __repr__(self): + return self.name + class SchemaStatistic: """ """ def __init__(self, codec): @@ -461,6 +519,9 @@ class SchemaStatistic: if key == "unit" : self.unit = str(value) elif key == "desc" : self.desc = str(value) + def __repr__(self): + return self.name + class SchemaMethod: """ """ def __init__(self, codec): @@ -476,6 +537,19 @@ class SchemaMethod: for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=True)) + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if arg.dir.find("I") != -1: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + class SchemaEvent: """ """ def __init__(self, codec): @@ -491,6 +565,18 @@ class SchemaEvent: for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=False)) + def __repr__(self): + result = self.name + "(" + first = True + for arg in self.arguments: + if first: + first = False + else: + result += ", " + result += arg.name + result += ")" + return result + class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -538,12 +624,8 @@ class ObjectId(object): return 0 def __repr__(self): - return "%08x-%04x-%04x-%04x-%04x%08x" % ((self.first & 0xFFFFFFFF00000000) >> 32, - (self.first & 0x00000000FFFF0000) >> 16, - (self.first & 0x000000000000FFFF), - (self.second & 0xFFFF000000000000) >> 48, - (self.second & 0x0000FFFF00000000) >> 32, - (self.second & 0x00000000FFFFFFFF)) + return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(), + self.getBroker(), self.getBank(), self.getObject()) def index(self): return (self.first, self.second) @@ -596,23 +678,27 @@ class Object(object): self.statistics.append((statistic, self._decodeValue(codec, statistic.type))) def getObjectId(self): - """ """ + """ Return the object identifier for this object """ return self.objectId def getClassKey(self): - """ """ + """ Return the class-key that references the schema describing this object. """ return self.schema.getKey() def getSchema(self): - """ """ + """ Return the schema that describes this object. """ return self.schema + def getMethods(self): + """ Return a list of methods available for this object. """ + return self.schema.getMethods() + def getTimestamps(self): - """ """ + """ Return the current, creation, and deletion times for this object. """ return self.currentTime, self.createTime, self.deleteTime def getIndex(self): - """ """ + """ Return a string describing this object's primary key. """ result = "" for property, value in self.properties: if property.index: @@ -634,6 +720,7 @@ class Object(object): for statistic, value in self.statistics: if name == statistic.name: return value + raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): for method in self.schema.getMethods(): @@ -653,20 +740,27 @@ class Object(object): self._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) - self.broker._send(smsg) self.broker.cv.acquire() self.broker.syncInFlight = True + self.broker.cv.release() + + self.broker._send(smsg) + + self.broker.cv.acquire() starttime = time() - while self.broker.syncInFlight: + while self.broker.syncInFlight and self.broker.error == None: self.broker.cv.wait(self.broker.SYNC_TIME) if time() - starttime > self.broker.SYNC_TIME: self.broker.cv.release() self.session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") self.broker.cv.release() + if self.broker.error != None: + errorText = self.broker.error + self.broker.error = None + raise Exception(errorText) return self.broker.syncResult - else: - raise Exception("Invalid Method (software defect)") + raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): excludeList = [] @@ -747,18 +841,21 @@ class MethodResult(object): class Broker: """ """ - SYNC_TIME = 10 + SYNC_TIME = 10 def __init__(self, session, host, port, authMech, authUser, authPass): self.session = session - self.agents = [] - self.agents.append(Agent(self, 0)) + self.host = host + self.port = port + self.agents = {} + self.agents[0] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False self.syncRequest = 0 self.syncResult = None self.reqsOutstanding = 1 + self.error = None self.brokerId = None err = None try: @@ -767,7 +864,7 @@ class Broker: self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) - self.amqpSession.auto_sync = False + self.amqpSession.auto_sync = True self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) @@ -798,6 +895,7 @@ class Broker: except ConnectionFailed, e: err = "Connect Failed %d - %s" % (e[0], e[1]) + self.active = True if err != None: raise Exception(err) @@ -811,7 +909,36 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ - return self.agents + return self.agents.values() + + def getAmqpSession(self): + """ Get the AMQP session object for this connected broker. """ + return self.amqpSession + + def isConnected(self): + return self.active + + def __repr__(self): + if self.active: + if self.port == 5672: + port = "" + else: + port = ":%d" % self.port + return "Broker connected at: amqp://%s%s" % (self.host, port) + else: + return "Disconnected Broker" + + def _updateAgent(self, obj): + if obj.deleteTime == 0: + if obj.objectIdBank not in self.agents: + agent = Agent(self, obj.objectIdBank, obj.label) + self.agents[obj.objectIdBank] = agent + if self.session.console != None: + self.session.console.newAgent(agent) + else: + agent = self.agents.pop(obj.objectIdBank, None) + if agent != None and self.session.console != None: + self.session.console.delAgent(agent) def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ @@ -848,10 +975,14 @@ class Broker: self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): - self.amqpSession.incoming("rdest").stop() - if self.session.console != None: - self.amqpSession.incoming("tdest").stop() - self.amqpSession.close() + if self.active: + self.amqpSession.incoming("rdest").stop() + if self.session.console != None: + self.amqpSession.incoming("tdest").stop() + self.amqpSession.close() + self.active = False + else: + raise Exception("Broker already disconnected") def _waitForStable(self): self.cv.acquire() @@ -877,7 +1008,8 @@ class Broker: self.reqsOutstanding -= 1 if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None: self.topicBound = True - self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#") + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key="mgmt.#") if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() @@ -901,13 +1033,23 @@ class Broker: elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): - pass + self.active = False + self.error = data + self.cv.acquire() + if self.syncInFlight: + self.cv.notify() + self.cv.release() + self.session._handleError(self.error) class Agent: """ """ - def __init__(self, broker, bank): + def __init__(self, broker, bank, label): self.broker = broker self.bank = bank + self.label = label + + def __repr__(self): + return "Agent at bank %d (%s)" % (self.bank, self.label) class Event: """ """ @@ -941,11 +1083,32 @@ class SequenceManager: return data -# TEST +class DebugConsole(Console): + """ """ + def newPackage(self, name): + print "newPackage:", name + + def newClass(self, classKey): + print "newClass:", classKey + + def newAgent(self, agent): + print "newAgent:", agent + + def delAgent(self, agent): + print "delAgent:", agent -#c = Console() -#s = Session(c) -#b = s.addBroker() -#cl = s.getClasses("org.apache.qpid.broker") -#sch = s.getSchema(cl[0]) + def objectProps(self, broker, id, record): + print "objectProps:", record.getClassKey() + + def objectStats(self, broker, id, record): + print "objectStats:", record.getClassKey() + + def event(self, broker, event): + print "event:", event + + def heartbeat(self, agent, timestamp): + print "heartbeat:", agent + + def brokerInfo(self, broker): + print "brokerInfo:", broker |
