summaryrefslogtreecommitdiff
path: root/qpid/python/qmf2/agent.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qmf2/agent.py')
-rw-r--r--qpid/python/qmf2/agent.py174
1 files changed, 142 insertions, 32 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py
index fd63807dc3..790cec283c 100644
--- a/qpid/python/qmf2/agent.py
+++ b/qpid/python/qmf2/agent.py
@@ -21,7 +21,7 @@ import logging
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread
+from threading import Thread, Lock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from common import (make_subject, parse_subject, OpCode, QmfQuery,
@@ -88,6 +88,7 @@ class Agent(Thread):
_max_msg_size=0, _capacity=10):
Thread.__init__(self)
self._running = False
+ self._ready = Event()
self.name = str(name)
self._domain = _domain
@@ -179,6 +180,9 @@ class Agent(Thread):
self._running = True
self.start()
+ self._ready.wait(10)
+ if not self._ready.isSet():
+ raise Exception("Agent managment thread failed to start.")
def remove_connection(self, timeout=None):
# tell connection thread to shutdown
@@ -222,11 +226,15 @@ class Agent(Thread):
if not isinstance(schema, SchemaClass):
raise TypeError("SchemaClass instance expected")
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
+ hstr = classId.get_hash_string()
+ if not hstr:
+ raise Exception("Schema hash is not set.")
+
self._lock.acquire()
try:
- classId = schema.get_class_id()
- pname = classId.get_package_name()
- cname = classId.get_class_name()
if pname not in self._packages:
self._packages[pname] = [cname]
else:
@@ -355,6 +363,9 @@ class Agent(Thread):
global _callback_thread
next_heartbeat = datetime.datetime.utcnow()
batch_limit = 10 # a guess
+
+ self._ready.set()
+
while self._running:
now = datetime.datetime.utcnow()
@@ -496,7 +507,9 @@ class Agent(Thread):
query = cmap.get(MsgKey.query)
if query is not None:
# fake a QmfData containing my identifier for the query compare
- tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
reply = QmfQuery.from_map(query).evaluate(tmpData)
if reply:
@@ -555,7 +568,35 @@ class Agent(Thread):
msg.reply_to,
mname,
oid, schema_id)
- param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id)
+ param = MethodCallParams( mname, oid, schema_id, in_args,
+ msg.user_id)
+
+ # @todo: validate the method against the schema:
+ # if self._schema:
+ # # validate
+ # _in_args = _in_args.copy()
+ # ms = self._schema.get_method(name)
+ # if ms is None:
+ # raise ValueError("Method '%s' is undefined." % name)
+
+ # for aname,prop in ms.get_arguments().iteritems():
+ # if aname not in _in_args:
+ # if prop.get_default():
+ # _in_args[aname] = prop.get_default()
+ # elif not prop.is_optional():
+ # raise ValueError("Method '%s' requires argument '%s'"
+ # % (name, aname))
+ # for aname in _in_args.iterkeys():
+ # prop = ms.get_argument(aname)
+ # if prop is None:
+ # raise ValueError("Method '%s' does not define argument"
+ # " '%s'" % (name, aname))
+ # if "I" not in prop.get_direction():
+ # raise ValueError("Method '%s' argument '%s' is not an"
+ # " input." % (name, aname))
+
+ # # @todo check if value is correct (type, range, etc)
+
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
@@ -567,7 +608,9 @@ class Agent(Thread):
self._lock.acquire()
try:
for name in self._packages.iterkeys():
- if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
+ qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name},
+ _object_id="_package")
+ if query.evaluate(qmfData):
pnames.append(name)
finally:
self._lock.release()
@@ -631,41 +674,64 @@ class Agent(Thread):
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
-
# if querying for a specific object, do a direct lookup
if query.get_selector() == QmfQuery.ID:
oid = query.get_id()
found = None
self._lock.acquire()
try:
- if sid:
- found = self._described_data.get(sid)
- if found:
- found = found.get(oid)
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ found = db.get(oid)
+ if found:
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(found.map_encode())
else:
- found = self._undescribed_data.get(oid)
+ if sid:
+ db = self._described_data.get(sid)
+ if db:
+ found = db.get(oid)
+ else:
+ found = self._undescribed_data.get(oid)
+ if found:
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(found.map_encode())
finally:
self._lock.release()
- if found:
- if _idOnly:
- data_objs.append(query.get_id())
- else:
- data_objs.append(found.map_encode())
else: # otherwise, evaluate all data
self._lock.acquire()
try:
- if sid:
- db = self._described_data.get(sid)
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(data.map_encode())
else:
- db = self._undescribed_data
-
- if db:
- for oid,val in db.iteritems():
- if query.evaluate(val):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(val.map_encode())
+ if sid:
+ db = self._described_data.get(sid)
+ else:
+ db = self._undescribed_data
+
+ if db:
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(data.map_encode())
finally:
self._lock.release()
@@ -693,15 +759,40 @@ class QmfAgentData(QmfData):
A managed data object that is owned by an agent.
"""
- def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
- _schema=None):
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None,
+ _object_id=None, _schema=None):
+ schema_id = None
+ if _schema:
+ schema_id = _schema.get_class_id()
+
+ if _object_id is None:
+ if not isinstance(_schema, SchemaObjectClass):
+ raise Exception("An object_id must be provided if the object"
+ "doesn't have an associated schema.")
+ ids = _schema.get_id_names()
+ if not ids:
+ raise Exception("Object must have an Id or a schema that"
+ " provides an Id")
+ _object_id = u""
+ for key in ids:
+ value = _values.get(key)
+ if value is None:
+ raise Exception("Object must have a value for key"
+ " attribute '%s'" % str(key))
+ try:
+ _object_id += unicode(value)
+ except:
+ raise Exception("Cannot create object_id from key"
+ " value '%s'" % str(value))
+
# timestamp in millisec since epoch UTC
ctime = long(time.time() * 1000)
super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
_tag=_tag, _ctime=ctime,
_utime=ctime, _object_id=_object_id,
- _schema=_schema, _const=False)
+ _schema_id=schema_id, _const=False)
self._agent = agent
+ self._validated = False
def destroy(self):
self._dtime = long(time.time() * 1000)
@@ -729,6 +820,25 @@ class QmfAgentData(QmfData):
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ def validate(self):
+ """
+ Compares this object's data against the associated schema. Throws an
+ exception if the data does not conform to the schema.
+ """
+ props = self._schema.get_properties()
+ for name,val in props.iteritems():
+ # @todo validate: type compatible with amqp_type?
+ # @todo validate: primary keys have values
+ if name not in self._values:
+ if val._isOptional:
+ # ok not to be present, put in dummy value
+ # to simplify access
+ self._values[name] = None
+ else:
+ raise Exception("Required property '%s' not present." % name)
+ self._validated = True
+
+
################################################################################
################################################################################