summaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-07-12 20:38:07 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-07-12 20:38:07 +0000
commit322492d5b1e89ef7e36f732a33007ee43db1bd15 (patch)
tree850bdc13f076989c5b2fde1c9dba3e72cbe9425e /extras
parentf9a21d0974911b4fdeb4a8ce6fc5780efc8df3ff (diff)
downloadqpid-python-322492d5b1e89ef7e36f732a33007ee43db1bd15.tar.gz
QMF: enable python console to pull schema info from agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@963479 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
-rw-r--r--extras/qmf/src/py/qmf/console.py203
1 files changed, 164 insertions, 39 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index 385cfdfedd..77de1602f2 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -1004,6 +1004,7 @@ class Session:
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
+ classKey._setType(kind)
schema = self.schemaCache.getSchema(classKey)
if not schema:
@@ -1043,13 +1044,17 @@ class Session:
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
+ classKey._setType(kind)
_class = SchemaClass(kind, classKey, codec, self)
- self.schemaCache.declareClass(classKey, _class)
+ new_pkg, new_cls = self.schemaCache.declareClass(classKey, _class)
ctx = self.seqMgr._release(seq)
if ctx:
broker._decOutstanding()
if self.console != None:
- self.console.newClass(kind, classKey)
+ if new_pkg:
+ self.console.newPackage(classKey.getPackageName())
+ if new_cls:
+ self.console.newClass(kind, classKey)
if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
agent = self._getAgentForAgentAddr(agent_addr)
@@ -1086,6 +1091,7 @@ class Session:
agent.touch()
if self.console and agent:
self.console.heartbeat(agent, timestamp)
+ agent.update_schema_timestamp(values.get("schema_timestamp", 0))
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -1297,6 +1303,8 @@ class Session:
normally invoked on the object itself.
"""
schema = self.getSchema(schemaKey)
+ if not schema:
+ raise Exception("Schema not present (Key=%s)" % str(schemaKey))
for method in schema.getMethods():
if name == method.name:
#
@@ -1361,8 +1369,8 @@ class Session:
if arg.dir.find("I") != -1:
self._encodeValue(sendCodec, argList[aIdx], arg.type)
aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (objectId.getBrokerBank(), objectId.getAgentBank()))
broker._send(smsg)
return seq
return None
@@ -1443,19 +1451,28 @@ class SchemaCache(object):
self.lock.acquire()
if packageName in self.packages:
for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
+ if isinstance(self.packages[packageName][pkey], SchemaClass):
+ list.append(self.packages[packageName][pkey].getKey())
+ elif self.packages[packageName][pkey] is not None:
+ # schema not present yet, but we have schema type
+ list.append(ClassKey({"_package_name": packageName,
+ "_class_name": pkey[0],
+ "_hash": pkey[1],
+ "_type": self.packages[packageName][pkey]}))
finally:
self.lock.release()
return list
def getSchema(self, classKey):
- """ Get the schema for a QMF class """
+ """ Get the schema for a QMF class, return None if schema not available """
pname = classKey.getPackageName()
pkey = classKey.getPackageKey()
try:
self.lock.acquire()
if pname in self.packages:
- if pkey in self.packages[pname]:
+ if (pkey in self.packages[pname] and
+ isinstance(self.packages[pname][pkey], SchemaClass)):
+ # hack: value may be schema type info if schema not available
return self.packages[pname][pkey]
finally:
self.lock.release()
@@ -1472,21 +1489,31 @@ class SchemaCache(object):
self.lock.release()
return True
- def declareClass(self, classKey, classDef):
- """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """
+ def declareClass(self, classKey, classDef=None):
+ """ Add a class definition to the cache, if supplied. Return a pair
+ indicating if the package or class is new.
+ """
+ new_package = False
+ new_class = False
pname = classKey.getPackageName()
pkey = classKey.getPackageKey()
try:
self.lock.acquire()
if pname not in self.packages:
self.packages[pname] = {}
+ new_package = True
packageMap = self.packages[pname]
- if pkey in packageMap:
- return None
- packageMap[pkey] = classDef
+ if pkey not in packageMap:
+ new_class = True
+ # hack: if no classDef given, store the class type code until we get
+ # the full schema:
+ if classDef is None:
+ packageMap[pkey] = classKey.getType()
+ else:
+ packageMap[pkey] = classDef
finally:
self.lock.release()
- return True
+ return (new_package, new_class)
#===================================================================================================
@@ -1494,12 +1521,25 @@ class SchemaCache(object):
#===================================================================================================
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
+
+ TYPE_DATA = "_data"
+ TYPE_EVENT = "_event"
+
def __init__(self, constructor):
if constructor.__class__ == str:
# construct from __repr__ string
try:
- self.pname, cls = constructor.split(":")
- self.cname, hsh = cls.split("(")
+ # supports two formats:
+ # type present = P:C:T(H)
+ # no type present = P:C(H)
+ tmp = constructor.split(":")
+ if len(tmp) == 3:
+ self.pname, self.cname, rem = tmp
+ self.type, hsh = rem.split("(")
+ else:
+ self.pname, rem = tmp
+ self.cname, hsh = rem.split("(")
+ self.type = None
hsh = hsh.strip(")")
hexValues = hsh.split("-")
h0 = int(hexValues[0], 16)
@@ -1517,6 +1557,7 @@ class ClassKey:
self.pname = constructor['_package_name']
self.cname = constructor['_class_name']
self.hash = constructor['_hash']
+ self.type = constructor['_type']
except:
raise Exception("Invalid ClassKey map format")
else:
@@ -1525,14 +1566,20 @@ class ClassKey:
self.pname = str(codec.read_str8())
self.cname = str(codec.read_str8())
self.hash = UUID(codec.read_bin128())
+ # old V1 codec did not include "type"
+ self.type = None
def encode(self, codec):
+ # old V1 codec did not include "type"
codec.write_str8(self.pname)
codec.write_str8(self.cname)
codec.write_bin128(self.hash.bytes)
def asMap(self):
- return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
+ return {'_package_name': self.pname,
+ '_class_name': self.cname,
+ '_hash': self.hash,
+ '_type': self.type}
def getPackageName(self):
return self.pname
@@ -1543,6 +1590,9 @@ class ClassKey:
def getHash(self):
return self.hash
+ def getType(self):
+ return self.type
+
def getHashString(self):
return str(self.hash)
@@ -1550,7 +1600,15 @@ class ClassKey:
return (self.cname, self.hash)
def __repr__(self):
- return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+ if self.type is None:
+ return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+ return self.pname + ":" + self.cname + ":" + self.type + "(" + self.getHashString() + ")"
+
+ def _setType(self, _type):
+ if _type == 2 or _type == ClassKey.TYPE_EVENT:
+ self.type = ClassKey.TYPE_EVENT
+ else:
+ self.type = ClassKey.TYPE_DATA
#===================================================================================================
@@ -2640,6 +2698,7 @@ class Agent:
self.lastSeenTime = time()
self.closed = None
self.epoch = 0
+ self.schema_timestamp = None
def _checkClosed(self):
@@ -2660,7 +2719,21 @@ class Agent:
elif 'qmf_event' in kwargs:
if self.session.console:
self.session.console.event(self.broker, kwargs['qmf_event'])
-
+ elif 'qmf_schema_id' in kwargs:
+ ckey = kwargs['qmf_schema_id']
+ new_pkg, new_cls = self.session.schemaCache.declareClass(ckey)
+ if self.session.console:
+ if new_pkg:
+ self.session.console.newPackage(ckey.getPackageName())
+ if new_cls:
+ # translate V2's string based type value to legacy
+ # integer value for backward compatibility
+ cls_type = ckey.getType()
+ if str(cls_type) == ckey.TYPE_DATA:
+ cls_type = 1
+ elif str(cls_type) == ckey.TYPE_EVENT:
+ cls_type = 2
+ self.session.console.newClass(cls_type, ckey)
def touch(self):
if self.heartbeatInterval:
@@ -2670,6 +2743,27 @@ class Agent:
def setEpoch(self, epoch):
self.epoch = epoch
+ def update_schema_timestamp(self, timestamp):
+ """ Check the latest schema timestamp from the agent V2 heartbeat. Issue a
+ query for all packages & classes should the timestamp change.
+ """
+ self.lock.acquire()
+ try:
+ if self.schema_timestamp == timestamp:
+ return
+ self.schema_timestamp = timestamp
+
+ context = RequestContext(self, self)
+ sequence = self.seqMgr._reserve(context)
+
+ self.contextMap[sequence] = context
+ context.setSequence(sequence)
+
+ finally:
+ self.lock.release()
+
+ self._v2SendSchemaIdQuery(sequence, {})
+
def epochMismatch(self, epoch):
if epoch == 0 or self.epoch == 0:
@@ -2914,7 +3008,8 @@ class Agent:
def _v2HandleDataInd(self, mp, ah, content):
"""
- Handle a QMFv2 data indication from the agent
+ Handle a QMFv2 data indication from the agent. Note: called from context
+ of the Broker thread.
"""
if mp.correlation_id:
try:
@@ -2938,6 +3033,19 @@ class Agent:
context.addV2QueryResult(omap)
context.processV2Data()
+ elif kind == "_schema_id":
+ for sid in content:
+ try:
+ ckey = ClassKey(sid)
+ except:
+ # @todo: log error
+ ckey = None
+ if ckey is not None:
+ # @todo: for now, the application cannot directly send a query for
+ # _schema_id. This request _must_ have been initiated by the framework
+ # in order to update the schema cache.
+ context.notifiable(qmf_schema_id=ckey)
+
if 'partial' not in ah:
context.signal()
@@ -3053,6 +3161,25 @@ class Agent:
self.broker._send(smsg)
+ def _v2SendQuery(self, query, sequence):
+ """
+ Given a query map, construct and send a V2 Query message.
+ """
+ dp = self.broker.amqpSession.delivery_properties()
+ dp.routing_key = self.getV2RoutingKey()
+ mp = self.broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.broker.authUser
+ mp.correlation_id = str(sequence)
+ mp.app_id = "qmf2"
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
+ mp.application_headers = {'qmf.opcode':'_query_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(query)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self.broker._send(msg, "qmf.default.direct")
+
+
def _v2SendGetQuery(self, sequence, kwargs):
"""
Send a get query to a QMFv2 agent.
@@ -3071,22 +3198,20 @@ class Agent:
elif '_objectId' in kwargs:
query['_object_id'] = kwargs['_objectId'].asMap()
+ self._v2SendQuery(query, sequence)
+
+
+ def _v2SendSchemaIdQuery(self, sequence, kwargs):
+ """
+ Send a query for all schema ids to a QMFv2 agent.
+ """
#
- # Construct and transmit the message
+ # Build the query map
#
- dp = self.broker.amqpSession.delivery_properties()
- dp.routing_key = self.getV2RoutingKey()
- mp = self.broker.amqpSession.message_properties()
- mp.content_type = "amqp/map"
- mp.user_id = self.broker.authUser
- mp.correlation_id = str(sequence)
- mp.app_id = "qmf2"
- mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
- mp.application_headers = {'qmf.opcode':'_query_request'}
- sendCodec = Codec()
- sendCodec.write_map(query)
- msg = Message(dp, mp, sendCodec.encoded)
- self.broker._send(msg, "qmf.default.direct")
+ query = {'_what': 'SCHEMA_ID'}
+ # @todo - predicate support. For now, return all known schema ids.
+
+ self._v2SendQuery(query, sequence)
def _v2SendSchemaRequest(self, schemaId):
@@ -3106,7 +3231,8 @@ class Agent:
def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
"""
- Process QMFv1 messages arriving from an agent.
+ Process QMFv1 messages arriving from an agent. Note well: this method is
+ called from the context of the Broker thread.
"""
if opcode == 'm': self._v1HandleMethodResp(codec, seq)
elif opcode == 'e': self._v1HandleEventInd(codec, seq)
@@ -3117,7 +3243,8 @@ class Agent:
def _handleQmfV2Message(self, opcode, mp, ah, content):
"""
- Process QMFv2 messages arriving from an agent.
+ Process QMFv2 messages arriving from an agent. Note well: this method is
+ called from the context of the Broker thread.
"""
if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
@@ -3335,6 +3462,7 @@ class Event:
self.session = agent.session
self.broker = agent.broker
self.classKey = ClassKey(codec)
+ self.classKey._setType(ClassKey.TYPE_EVENT)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
self.arguments = {}
@@ -3377,9 +3505,6 @@ class Event:
def getTimestamp(self):
return self.timestamp
- def getName(self):
- return self.name
-
def getSchema(self):
return self.schema
@@ -3391,7 +3516,7 @@ class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
def __init__(self):
self.lock = Lock()
- self.sequence = 0
+ self.sequence = long(time()) # pseudo-randomize the start
self.pending = {}
def _reserve(self, data):