diff options
Diffstat (limited to 'qpid/python/qmf2/agent.py')
| -rw-r--r-- | qpid/python/qmf2/agent.py | 174 |
1 files changed, 142 insertions, 32 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py index fd63807dc3..790cec283c 100644 --- a/qpid/python/qmf2/agent.py +++ b/qpid/python/qmf2/agent.py @@ -21,7 +21,7 @@ import logging import datetime import time import Queue -from threading import Thread, Lock, currentThread +from threading import Thread, Lock, currentThread, Event from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from common import (make_subject, parse_subject, OpCode, QmfQuery, @@ -88,6 +88,7 @@ class Agent(Thread): _max_msg_size=0, _capacity=10): Thread.__init__(self) self._running = False + self._ready = Event() self.name = str(name) self._domain = _domain @@ -179,6 +180,9 @@ class Agent(Thread): self._running = True self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Agent managment thread failed to start.") def remove_connection(self, timeout=None): # tell connection thread to shutdown @@ -222,11 +226,15 @@ class Agent(Thread): if not isinstance(schema, SchemaClass): raise TypeError("SchemaClass instance expected") + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() + hstr = classId.get_hash_string() + if not hstr: + raise Exception("Schema hash is not set.") + self._lock.acquire() try: - classId = schema.get_class_id() - pname = classId.get_package_name() - cname = classId.get_class_name() if pname not in self._packages: self._packages[pname] = [cname] else: @@ -355,6 +363,9 @@ class Agent(Thread): global _callback_thread next_heartbeat = datetime.datetime.utcnow() batch_limit = 10 # a guess + + self._ready.set() + while self._running: now = datetime.datetime.utcnow() @@ -496,7 +507,9 @@ class Agent(Thread): query = cmap.get(MsgKey.query) if query is not None: # fake a QmfData containing my identifier for the query compare - tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) + tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: + self.get_name()}, + _object_id="my-name") reply = QmfQuery.from_map(query).evaluate(tmpData) if reply: @@ -555,7 +568,35 @@ class Agent(Thread): msg.reply_to, mname, oid, schema_id) - param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id) + param = MethodCallParams( mname, oid, schema_id, in_args, + msg.user_id) + + # @todo: validate the method against the schema: + # if self._schema: + # # validate + # _in_args = _in_args.copy() + # ms = self._schema.get_method(name) + # if ms is None: + # raise ValueError("Method '%s' is undefined." % name) + + # for aname,prop in ms.get_arguments().iteritems(): + # if aname not in _in_args: + # if prop.get_default(): + # _in_args[aname] = prop.get_default() + # elif not prop.is_optional(): + # raise ValueError("Method '%s' requires argument '%s'" + # % (name, aname)) + # for aname in _in_args.iterkeys(): + # prop = ms.get_argument(aname) + # if prop is None: + # raise ValueError("Method '%s' does not define argument" + # " '%s'" % (name, aname)) + # if "I" not in prop.get_direction(): + # raise ValueError("Method '%s' argument '%s' is not an" + # " input." % (name, aname)) + + # # @todo check if value is correct (type, range, etc) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True @@ -567,7 +608,9 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): + qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name}, + _object_id="_package") + if query.evaluate(qmfData): pnames.append(name) finally: self._lock.release() @@ -631,41 +674,64 @@ class Agent(Thread): t_params = query.get_target_param() if t_params: sid = t_params.get(QmfData.KEY_SCHEMA_ID) - # if querying for a specific object, do a direct lookup if query.get_selector() == QmfQuery.ID: oid = query.get_id() found = None self._lock.acquire() try: - if sid: - found = self._described_data.get(sid) - if found: - found = found.get(oid) + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + found = db.get(oid) + if found: + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(found.map_encode()) else: - found = self._undescribed_data.get(oid) + if sid: + db = self._described_data.get(sid) + if db: + found = db.get(oid) + else: + found = self._undescribed_data.get(oid) + if found: + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(found.map_encode()) finally: self._lock.release() - if found: - if _idOnly: - data_objs.append(query.get_id()) - else: - data_objs.append(found.map_encode()) else: # otherwise, evaluate all data self._lock.acquire() try: - if sid: - db = self._described_data.get(sid) + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + for oid,data in db.iteritems(): + if query.evaluate(data): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(data.map_encode()) else: - db = self._undescribed_data - - if db: - for oid,val in db.iteritems(): - if query.evaluate(val): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(val.map_encode()) + if sid: + db = self._described_data.get(sid) + else: + db = self._undescribed_data + + if db: + for oid,data in db.iteritems(): + if query.evaluate(data): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(data.map_encode()) finally: self._lock.release() @@ -693,15 +759,40 @@ class QmfAgentData(QmfData): A managed data object that is owned by an agent. """ - def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, - _schema=None): + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, + _object_id=None, _schema=None): + schema_id = None + if _schema: + schema_id = _schema.get_class_id() + + if _object_id is None: + if not isinstance(_schema, SchemaObjectClass): + raise Exception("An object_id must be provided if the object" + "doesn't have an associated schema.") + ids = _schema.get_id_names() + if not ids: + raise Exception("Object must have an Id or a schema that" + " provides an Id") + _object_id = u"" + for key in ids: + value = _values.get(key) + if value is None: + raise Exception("Object must have a value for key" + " attribute '%s'" % str(key)) + try: + _object_id += unicode(value) + except: + raise Exception("Cannot create object_id from key" + " value '%s'" % str(value)) + # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, _tag=_tag, _ctime=ctime, _utime=ctime, _object_id=_object_id, - _schema=_schema, _const=False) + _schema_id=schema_id, _const=False) self._agent = agent + self._validated = False def destroy(self): self._dtime = long(time.time() * 1000) @@ -729,6 +820,25 @@ class QmfAgentData(QmfData): # @todo: need to take write-lock logging.error(" TBD!!!") + def validate(self): + """ + Compares this object's data against the associated schema. Throws an + exception if the data does not conform to the schema. + """ + props = self._schema.get_properties() + for name,val in props.iteritems(): + # @todo validate: type compatible with amqp_type? + # @todo validate: primary keys have values + if name not in self._values: + if val._isOptional: + # ok not to be present, put in dummy value + # to simplify access + self._values[name] = None + else: + raise Exception("Required property '%s' not present." % name) + self._validated = True + + ################################################################################ ################################################################################ |
