summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
committerTed Ross <tross@apache.org>2008-09-19 20:54:08 +0000
commit723bff9590332fee6b4ecacb08dc762613994128 (patch)
tree6063112085022d53cde7db630078f2ba8a2c0a49 /python/qpid
parent88e386e9130caecb9a095dd0362dbeff89828adc (diff)
downloadqpid-python-723bff9590332fee6b4ecacb08dc762613994128.tar.gz
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697237 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/qmfconsole.py261
1 files changed, 212 insertions, 49 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index f83b4f315c..435da475d7 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -23,6 +23,7 @@ import os
import qpid
import struct
import socket
+import re
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
from qpid.datatypes import uuid4
@@ -46,10 +47,14 @@ class Console:
used to obtain details about the class."""
pass
- def newAgent(self, broker, agent):
+ def newAgent(self, agent):
""" Invoked when a QMF agent is discovered. """
pass
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconects. """
+ pass
+
def objectProps(self, broker, id, record):
""" Invoked when an object is updated. """
pass
@@ -70,6 +75,25 @@ class Console:
""" """
pass
+class BrokerURL:
+ def __init__(self, text):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(text)
+ if not match: raise ValueError("'%s' is not a valid broker url" % (text))
+ user, password, host, port = match.groups()
+
+ self.host = socket.gethostbyname(host)
+ if port: self.port = int(port)
+ else: self.port = 5672
+ self.authName = user or "guest"
+ self.authPass = password or "guest"
+ self.authMech = "PLAIN"
+
+ def name(self):
+ return self.host + ":" + str(self.port)
+
class Session:
"""
An instance of the Session class represents a console session running
@@ -95,12 +119,17 @@ class Session:
self.cv = Condition()
self.syncSequenceList = []
self.getResult = []
+ self.error = None
+
+ def __repr__(self):
+ return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
- def addBroker(self, host="localhost", port=5672,
- authMech="PLAIN", authName="guest", authPass="guest"):
+ def addBroker(self, target="localhost"):
""" Connect to a Qpid broker. Returns an object of type Broker. """
- broker = Broker(self, host, port, authMech, authName, authPass)
+ url = BrokerURL(target)
+ broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass)
self.brokers.append(broker)
+ self.getObjects(broker=broker, name="agent")
return broker
def delBroker(self, broker):
@@ -138,11 +167,22 @@ class Session:
if (cname, hash) in self.packages[pname]:
return self.packages[pname][(cname, hash)]
- def getAgents(self):
+ def getAgents(self, broker=None):
""" Get a list of currently known agents """
- for broker in self.brokers:
- broker._waitForStable()
- pass
+ brokerList = []
+ if broker == None:
+ for b in self.brokers:
+ brokerList.append(b)
+ else:
+ brokerList.append(broker)
+
+ for b in brokerList:
+ b._waitForStable()
+ agentList = []
+ for b in brokerList:
+ for a in b.getAgents():
+ agentList.append(a)
+ return agentList
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
@@ -165,7 +205,8 @@ class Session:
broker = <broker> - supply a broker as returned by addBroker
"""
if "broker" in kwargs:
- brokerList = [].append(kwargs["broker"])
+ brokerList = []
+ brokerList.append(kwargs["broker"])
else:
brokerList = self.brokers
for broker in brokerList:
@@ -176,7 +217,7 @@ class Session:
agent = kwargs["agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
- agentList = append(agent)
+ agentList.append(agent)
else:
for broker in brokerList:
for agent in broker.getAgents():
@@ -209,7 +250,7 @@ class Session:
starttime = time()
timeout = False
self.cv.acquire()
- while len(self.syncSequenceList) > 0:
+ while len(self.syncSequenceList) > 0 and self.error == None:
self.cv.wait(self.GET_WAIT_TIME)
if time() - starttime > self.GET_WAIT_TIME:
for pendingSeq in self.syncSequenceList:
@@ -218,6 +259,11 @@ class Session:
timeout = True
self.cv.release()
+ if self.error:
+ errorText = self.error
+ self.error = None
+ raise Exception(errorText)
+
if len(self.getResult) == 0 and timeout:
raise RuntimeError("No agent responded within timeout period")
return self.getResult
@@ -351,6 +397,8 @@ class Session:
self.cv.release()
schema = self.packages[pname][(cname, hash)]
object = Object(self, broker, schema, codec, prop, stat)
+ if pname == "org.apache.qpid.broker" and cname == "agent":
+ broker._updateAgent(object)
self.cv.acquire()
if seq in self.syncSequenceList:
@@ -365,6 +413,13 @@ class Session:
if stat:
self.console.objectStats(broker, object.getObjectId(), object)
+ def _handleError(self, error):
+ self.error = error
+ self.cv.acquire()
+ self.syncSequenceList = []
+ self.cv.notify()
+ self.cv.release()
+
class Package:
""" """
def __init__(self, name):
@@ -407,23 +462,23 @@ class SchemaClass:
return result
def getKey(self):
- """ """
+ """ Return the class-key for this class. """
return self.classKey
def getProperties(self):
- """ """
+ """ Return the list of properties for the class. """
return self.properties
def getStatistics(self):
- """ """
+ """ Return the list of statistics for the class. """
return self.statistics
def getMethods(self):
- """ """
+ """ Return the list of methods for the class. """
return self.methods
def getEvents(self):
- """ """
+ """ Return the list of events for the class. """
return self.events
class SchemaProperty:
@@ -448,6 +503,9 @@ class SchemaProperty:
elif key == "maxlen" : self.maxlen = value
elif key == "desc" : self.desc = str(value)
+ def __repr__(self):
+ return self.name
+
class SchemaStatistic:
""" """
def __init__(self, codec):
@@ -461,6 +519,9 @@ class SchemaStatistic:
if key == "unit" : self.unit = str(value)
elif key == "desc" : self.desc = str(value)
+ def __repr__(self):
+ return self.name
+
class SchemaMethod:
""" """
def __init__(self, codec):
@@ -476,6 +537,19 @@ class SchemaMethod:
for idx in range(argCount):
self.arguments.append(SchemaArgument(codec, methodArg=True))
+ def __repr__(self):
+ result = self.name + "("
+ first = True
+ for arg in self.arguments:
+ if arg.dir.find("I") != -1:
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += arg.name
+ result += ")"
+ return result
+
class SchemaEvent:
""" """
def __init__(self, codec):
@@ -491,6 +565,18 @@ class SchemaEvent:
for idx in range(argCount):
self.arguments.append(SchemaArgument(codec, methodArg=False))
+ def __repr__(self):
+ result = self.name + "("
+ first = True
+ for arg in self.arguments:
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += arg.name
+ result += ")"
+ return result
+
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -538,12 +624,8 @@ class ObjectId(object):
return 0
def __repr__(self):
- return "%08x-%04x-%04x-%04x-%04x%08x" % ((self.first & 0xFFFFFFFF00000000) >> 32,
- (self.first & 0x00000000FFFF0000) >> 16,
- (self.first & 0x000000000000FFFF),
- (self.second & 0xFFFF000000000000) >> 48,
- (self.second & 0x0000FFFF00000000) >> 32,
- (self.second & 0x00000000FFFFFFFF))
+ return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(),
+ self.getBroker(), self.getBank(), self.getObject())
def index(self):
return (self.first, self.second)
@@ -596,23 +678,27 @@ class Object(object):
self.statistics.append((statistic, self._decodeValue(codec, statistic.type)))
def getObjectId(self):
- """ """
+ """ Return the object identifier for this object """
return self.objectId
def getClassKey(self):
- """ """
+ """ Return the class-key that references the schema describing this object. """
return self.schema.getKey()
def getSchema(self):
- """ """
+ """ Return the schema that describes this object. """
return self.schema
+ def getMethods(self):
+ """ Return a list of methods available for this object. """
+ return self.schema.getMethods()
+
def getTimestamps(self):
- """ """
+ """ Return the current, creation, and deletion times for this object. """
return self.currentTime, self.createTime, self.deleteTime
def getIndex(self):
- """ """
+ """ Return a string describing this object's primary key. """
result = ""
for property, value in self.properties:
if property.index:
@@ -634,6 +720,7 @@ class Object(object):
for statistic, value in self.statistics:
if name == statistic.name:
return value
+ raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
for method in self.schema.getMethods():
@@ -653,20 +740,27 @@ class Object(object):
self._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker._send(smsg)
self.broker.cv.acquire()
self.broker.syncInFlight = True
+ self.broker.cv.release()
+
+ self.broker._send(smsg)
+
+ self.broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight:
+ while self.broker.syncInFlight and self.broker.error == None:
self.broker.cv.wait(self.broker.SYNC_TIME)
if time() - starttime > self.broker.SYNC_TIME:
self.broker.cv.release()
self.session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
self.broker.cv.release()
+ if self.broker.error != None:
+ errorText = self.broker.error
+ self.broker.error = None
+ raise Exception(errorText)
return self.broker.syncResult
- else:
- raise Exception("Invalid Method (software defect)")
+ raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
excludeList = []
@@ -747,18 +841,21 @@ class MethodResult(object):
class Broker:
""" """
- SYNC_TIME = 10
+ SYNC_TIME = 10
def __init__(self, session, host, port, authMech, authUser, authPass):
self.session = session
- self.agents = []
- self.agents.append(Agent(self, 0))
+ self.host = host
+ self.port = port
+ self.agents = {}
+ self.agents[0] = Agent(self, 0, "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
self.syncRequest = 0
self.syncResult = None
self.reqsOutstanding = 1
+ self.error = None
self.brokerId = None
err = None
try:
@@ -767,7 +864,7 @@ class Broker:
self.conn.start()
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
- self.amqpSession.auto_sync = False
+ self.amqpSession.auto_sync = True
self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
@@ -798,6 +895,7 @@ class Broker:
except ConnectionFailed, e:
err = "Connect Failed %d - %s" % (e[0], e[1])
+ self.active = True
if err != None:
raise Exception(err)
@@ -811,7 +909,36 @@ class Broker:
def getAgents(self):
""" Get the list of agents reachable via this broker """
- return self.agents
+ return self.agents.values()
+
+ def getAmqpSession(self):
+ """ Get the AMQP session object for this connected broker. """
+ return self.amqpSession
+
+ def isConnected(self):
+ return self.active
+
+ def __repr__(self):
+ if self.active:
+ if self.port == 5672:
+ port = ""
+ else:
+ port = ":%d" % self.port
+ return "Broker connected at: amqp://%s%s" % (self.host, port)
+ else:
+ return "Disconnected Broker"
+
+ def _updateAgent(self, obj):
+ if obj.deleteTime == 0:
+ if obj.objectIdBank not in self.agents:
+ agent = Agent(self, obj.objectIdBank, obj.label)
+ self.agents[obj.objectIdBank] = agent
+ if self.session.console != None:
+ self.session.console.newAgent(agent)
+ else:
+ agent = self.agents.pop(obj.objectIdBank, None)
+ if agent != None and self.session.console != None:
+ self.session.console.delAgent(agent)
def _setHeader(self, codec, opcode, seq=0):
""" Compose the header of a management message. """
@@ -848,10 +975,14 @@ class Broker:
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self):
- self.amqpSession.incoming("rdest").stop()
- if self.session.console != None:
- self.amqpSession.incoming("tdest").stop()
- self.amqpSession.close()
+ if self.active:
+ self.amqpSession.incoming("rdest").stop()
+ if self.session.console != None:
+ self.amqpSession.incoming("tdest").stop()
+ self.amqpSession.close()
+ self.active = False
+ else:
+ raise Exception("Broker already disconnected")
def _waitForStable(self):
self.cv.acquire()
@@ -877,7 +1008,8 @@ class Broker:
self.reqsOutstanding -= 1
if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None:
self.topicBound = True
- self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#")
+ self.amqpSession.exchange_bind(exchange="qpid.management",
+ queue=self.topicName, binding_key="mgmt.#")
if self.reqsOutstanding == 0 and self.syncInFlight:
self.syncInFlight = False
self.cv.notify()
@@ -901,13 +1033,23 @@ class Broker:
elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
- pass
+ self.active = False
+ self.error = data
+ self.cv.acquire()
+ if self.syncInFlight:
+ self.cv.notify()
+ self.cv.release()
+ self.session._handleError(self.error)
class Agent:
""" """
- def __init__(self, broker, bank):
+ def __init__(self, broker, bank, label):
self.broker = broker
self.bank = bank
+ self.label = label
+
+ def __repr__(self):
+ return "Agent at bank %d (%s)" % (self.bank, self.label)
class Event:
""" """
@@ -941,11 +1083,32 @@ class SequenceManager:
return data
-# TEST
+class DebugConsole(Console):
+ """ """
+ def newPackage(self, name):
+ print "newPackage:", name
+
+ def newClass(self, classKey):
+ print "newClass:", classKey
+
+ def newAgent(self, agent):
+ print "newAgent:", agent
+
+ def delAgent(self, agent):
+ print "delAgent:", agent
-#c = Console()
-#s = Session(c)
-#b = s.addBroker()
-#cl = s.getClasses("org.apache.qpid.broker")
-#sch = s.getSchema(cl[0])
+ def objectProps(self, broker, id, record):
+ print "objectProps:", record.getClassKey()
+
+ def objectStats(self, broker, id, record):
+ print "objectStats:", record.getClassKey()
+
+ def event(self, broker, event):
+ print "event:", event
+
+ def heartbeat(self, agent, timestamp):
+ print "heartbeat:", agent
+
+ def brokerInfo(self, broker):
+ print "brokerInfo:", broker