diff options
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qmf2/agent.py | 174 | ||||
| -rw-r--r-- | qpid/python/qmf2/common.py | 117 | ||||
| -rw-r--r-- | qpid/python/qmf2/console.py | 77 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/basic_method.py | 50 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/basic_query.py | 5 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/events.py | 2 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/obj_gets.py | 102 |
7 files changed, 327 insertions, 200 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py index fd63807dc3..790cec283c 100644 --- a/qpid/python/qmf2/agent.py +++ b/qpid/python/qmf2/agent.py @@ -21,7 +21,7 @@ import logging import datetime import time import Queue -from threading import Thread, Lock, currentThread +from threading import Thread, Lock, currentThread, Event from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 from common import (make_subject, parse_subject, OpCode, QmfQuery, @@ -88,6 +88,7 @@ class Agent(Thread): _max_msg_size=0, _capacity=10): Thread.__init__(self) self._running = False + self._ready = Event() self.name = str(name) self._domain = _domain @@ -179,6 +180,9 @@ class Agent(Thread): self._running = True self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Agent managment thread failed to start.") def remove_connection(self, timeout=None): # tell connection thread to shutdown @@ -222,11 +226,15 @@ class Agent(Thread): if not isinstance(schema, SchemaClass): raise TypeError("SchemaClass instance expected") + classId = schema.get_class_id() + pname = classId.get_package_name() + cname = classId.get_class_name() + hstr = classId.get_hash_string() + if not hstr: + raise Exception("Schema hash is not set.") + self._lock.acquire() try: - classId = schema.get_class_id() - pname = classId.get_package_name() - cname = classId.get_class_name() if pname not in self._packages: self._packages[pname] = [cname] else: @@ -355,6 +363,9 @@ class Agent(Thread): global _callback_thread next_heartbeat = datetime.datetime.utcnow() batch_limit = 10 # a guess + + self._ready.set() + while self._running: now = datetime.datetime.utcnow() @@ -496,7 +507,9 @@ class Agent(Thread): query = cmap.get(MsgKey.query) if query is not None: # fake a QmfData containing my identifier for the query compare - tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) + tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: + self.get_name()}, + _object_id="my-name") reply = QmfQuery.from_map(query).evaluate(tmpData) if reply: @@ -555,7 +568,35 @@ class Agent(Thread): msg.reply_to, mname, oid, schema_id) - param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id) + param = MethodCallParams( mname, oid, schema_id, in_args, + msg.user_id) + + # @todo: validate the method against the schema: + # if self._schema: + # # validate + # _in_args = _in_args.copy() + # ms = self._schema.get_method(name) + # if ms is None: + # raise ValueError("Method '%s' is undefined." % name) + + # for aname,prop in ms.get_arguments().iteritems(): + # if aname not in _in_args: + # if prop.get_default(): + # _in_args[aname] = prop.get_default() + # elif not prop.is_optional(): + # raise ValueError("Method '%s' requires argument '%s'" + # % (name, aname)) + # for aname in _in_args.iterkeys(): + # prop = ms.get_argument(aname) + # if prop is None: + # raise ValueError("Method '%s' does not define argument" + # " '%s'" % (name, aname)) + # if "I" not in prop.get_direction(): + # raise ValueError("Method '%s' argument '%s' is not an" + # " input." % (name, aname)) + + # # @todo check if value is correct (type, range, etc) + self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True @@ -567,7 +608,9 @@ class Agent(Thread): self._lock.acquire() try: for name in self._packages.iterkeys(): - if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})): + qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name}, + _object_id="_package") + if query.evaluate(qmfData): pnames.append(name) finally: self._lock.release() @@ -631,41 +674,64 @@ class Agent(Thread): t_params = query.get_target_param() if t_params: sid = t_params.get(QmfData.KEY_SCHEMA_ID) - # if querying for a specific object, do a direct lookup if query.get_selector() == QmfQuery.ID: oid = query.get_id() found = None self._lock.acquire() try: - if sid: - found = self._described_data.get(sid) - if found: - found = found.get(oid) + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + found = db.get(oid) + if found: + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(found.map_encode()) else: - found = self._undescribed_data.get(oid) + if sid: + db = self._described_data.get(sid) + if db: + found = db.get(oid) + else: + found = self._undescribed_data.get(oid) + if found: + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(found.map_encode()) finally: self._lock.release() - if found: - if _idOnly: - data_objs.append(query.get_id()) - else: - data_objs.append(found.map_encode()) else: # otherwise, evaluate all data self._lock.acquire() try: - if sid: - db = self._described_data.get(sid) + if sid and not sid.get_hash_string(): + # wildcard schema_id match, check each schema + for name,db in self._described_data.iteritems(): + if (name.get_class_name() == sid.get_class_name() + and name.get_package_name() == sid.get_package_name()): + for oid,data in db.iteritems(): + if query.evaluate(data): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(data.map_encode()) else: - db = self._undescribed_data - - if db: - for oid,val in db.iteritems(): - if query.evaluate(val): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(val.map_encode()) + if sid: + db = self._described_data.get(sid) + else: + db = self._undescribed_data + + if db: + for oid,data in db.iteritems(): + if query.evaluate(data): + if _idOnly: + data_objs.append(oid) + else: + data_objs.append(data.map_encode()) finally: self._lock.release() @@ -693,15 +759,40 @@ class QmfAgentData(QmfData): A managed data object that is owned by an agent. """ - def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None, - _schema=None): + def __init__(self, agent, _values={}, _subtypes={}, _tag=None, + _object_id=None, _schema=None): + schema_id = None + if _schema: + schema_id = _schema.get_class_id() + + if _object_id is None: + if not isinstance(_schema, SchemaObjectClass): + raise Exception("An object_id must be provided if the object" + "doesn't have an associated schema.") + ids = _schema.get_id_names() + if not ids: + raise Exception("Object must have an Id or a schema that" + " provides an Id") + _object_id = u"" + for key in ids: + value = _values.get(key) + if value is None: + raise Exception("Object must have a value for key" + " attribute '%s'" % str(key)) + try: + _object_id += unicode(value) + except: + raise Exception("Cannot create object_id from key" + " value '%s'" % str(value)) + # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes, _tag=_tag, _ctime=ctime, _utime=ctime, _object_id=_object_id, - _schema=_schema, _const=False) + _schema_id=schema_id, _const=False) self._agent = agent + self._validated = False def destroy(self): self._dtime = long(time.time() * 1000) @@ -729,6 +820,25 @@ class QmfAgentData(QmfData): # @todo: need to take write-lock logging.error(" TBD!!!") + def validate(self): + """ + Compares this object's data against the associated schema. Throws an + exception if the data does not conform to the schema. + """ + props = self._schema.get_properties() + for name,val in props.iteritems(): + # @todo validate: type compatible with amqp_type? + # @todo validate: primary keys have values + if name not in self._values: + if val._isOptional: + # ok not to be present, put in dummy value + # to simplify access + self._values[name] = None + else: + raise Exception("Required property '%s' not present." % name) + self._validated = True + + ################################################################################ ################################################################################ diff --git a/qpid/python/qmf2/common.py b/qpid/python/qmf2/common.py index fabe24b30b..280cee3576 100644 --- a/qpid/python/qmf2/common.py +++ b/qpid/python/qmf2/common.py @@ -338,8 +338,7 @@ class _mapEncoder(object): class QmfData(_mapEncoder): """ - Base data class representing arbitrarily structure data. No schema or - managing agent is associated with data of this class. + Base class representing management data. Map format: map["_values"] = map of unordered "name"=<value> pairs (optional) @@ -356,10 +355,10 @@ class QmfData(_mapEncoder): KEY_DELETE_TS = "_delete_ts" def __init__(self, - _values={}, _subtypes={}, _tag=None, _object_id=None, + _values={}, _subtypes={}, _tag=None, + _object_id=None, _schema_id=None, _ctime = 0, _utime = 0, _dtime = 0, - _map=None, - _schema=None, _const=False): + _map=None, _const=False): """ @type _values: dict @param _values: dictionary of initial name=value pairs for object's @@ -372,7 +371,6 @@ class QmfData(_mapEncoder): @type _const: boolean @param _const: if true, this object cannot be modified """ - self._schema_id = None if _map is not None: # construct from map _tag = _map.get(self.KEY_TAG, _tag) @@ -381,11 +379,14 @@ class QmfData(_mapEncoder): _object_id = _map.get(self.KEY_OBJECT_ID, _object_id) sid = _map.get(self.KEY_SCHEMA_ID) if sid: - self._schema_id = SchemaClassId(_map=sid) + _schema_id = SchemaClassId.from_map(sid) _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime)) _utime = long(_map.get(self.KEY_UPDATE_TS, _utime)) _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime)) + if _object_id is None: + raise Exception("An object_id must be provided.") + self._values = _values.copy() self._subtypes = _subtypes.copy() self._tag = _tag @@ -393,30 +394,21 @@ class QmfData(_mapEncoder): self._utime = _utime self._dtime = _dtime self._const = _const + self._schema_id = _schema_id + self._object_id = str(_object_id) - if _object_id is not None: - self._object_id = str(_object_id) - else: - self._object_id = None - - if _schema is not None: - self._set_schema(_schema) - else: - # careful: map constructor may have already set self._schema_id, do - # not override it! - self._schema = None def __create(cls, values, _subtypes={}, _tag=None, _object_id=None, - _schema=None, _const=False): + _schema_id=None, _const=False): # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) return cls(_values=values, _subtypes=_subtypes, _tag=_tag, _ctime=ctime, _utime=ctime, - _object_id=_object_id, _schema=_schema, _const=_const) + _object_id=_object_id, _schema_id=_schema_id, _const=_const) create = classmethod(__create) - def __from_map(cls, map_, _schema=None, _const=False): - return cls(_map=map_, _schema=_schema, _const=_const) + def __from_map(cls, map_, _const=False): + return cls(_map=map_, _const=_const) from_map = classmethod(__from_map) def is_managed(self): @@ -507,30 +499,7 @@ class QmfData(_mapEncoder): @rtype: str @returns: the identification string, or None if not assigned and id. """ - if self._object_id: - return self._object_id - - # if object id not assigned, see if schema defines a set of field - # values to use as an id - if not self._schema: - return None - - ids = self._schema.get_id_names() - if not ids: - return None - - if not self._validated: - self._validate() - - result = u"" - for key in ids: - try: - result += unicode(self._values[key]) - except: - log.error("get_object_id(): cannot convert value '%s'." % key) - return None - self._object_id = result - return result + return self._object_id def map_encode(self): _map = {} @@ -555,34 +524,6 @@ class QmfData(_mapEncoder): _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode() return _map - def _set_schema(self, schema): - self._validated = False - self._schema = schema - if schema: - self._schema_id = schema.get_class_id() - if self._const: - self._validate() - else: - self._schema_id = None - - def _validate(self): - """ - Compares this object's data against the associated schema. Throws an - exception if the data does not conform to the schema. - """ - props = self._schema.get_properties() - for name,val in props.iteritems(): - # @todo validate: type compatible with amqp_type? - # @todo validate: primary keys have values - if name not in self._values: - if val._isOptional: - # ok not to be present, put in dummy value - # to simplify access - self._values[name] = None - else: - raise Exception("Required property '%s' not present." % name) - self._validated = True - def __repr__(self): return "QmfData=<<" + str(self.map_encode()) + ">>" @@ -629,7 +570,7 @@ class QmfEvent(QmfData): def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={}, _subtypes={}, _tag=None, _map=None, - _schema=None, _const=True): + _schema_id=None, _const=True): """ @type _map: dict @param _map: if not None, construct instance from map representation. @@ -646,14 +587,16 @@ class QmfEvent(QmfData): if _map is not None: # construct from map - super(QmfEvent, self).__init__(_map=_map, _schema=_schema, - _const=_const) + super(QmfEvent, self).__init__(_map=_map, _const=_const, + _object_id="_event") _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp) _sev = _map.get(self.KEY_SEVERITY, _sev) else: - super(QmfEvent, self).__init__(_values=_values, + super(QmfEvent, self).__init__(_object_id="_event", + _values=_values, _subtypes=_subtypes, _tag=_tag, - _schema=_schema, _const=_const) + _schema_id=_schema_id, + _const=_const) if _timestamp is None: raise TypeError("QmfEvent: a valid timestamp is required.") @@ -665,13 +608,13 @@ class QmfEvent(QmfData): self._severity = _sev def _create(cls, timestamp, severity, values, - _subtypes={}, _tag=None, _schema=None, _const=False): + _subtypes={}, _tag=None, _schema_id=None, _const=False): return cls(_timestamp=timestamp, _sev=severity, _values=values, - _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const) + _subtypes=_subtypes, _tag=_tag, _schema_id=_schema_id, _const=_const) create = classmethod(_create) - def _from_map(cls, map_, _schema=None, _const=False): - return cls(_map=map_, _schema=_schema, _const=_const) + def _from_map(cls, map_, _const=False): + return cls(_map=map_, _const=_const) from_map = classmethod(_from_map) def get_timestamp(self): @@ -1761,7 +1704,9 @@ class SchemaClass(QmfData): self._object_id_names = _map.get(self.KEY_PRIMARY_KEY_NAMES,[]) _desc = _map.get(self.KEY_DESC) else: - super(SchemaClass, self).__init__() + if _classId is None: + raise Exception("A class identifier must be supplied.") + super(SchemaClass, self).__init__(_object_id=str(_classId)) self._object_id_names = [] self._classId = _classId @@ -1876,8 +1821,8 @@ class SchemaObjectClass(SchemaClass): Map format: map(SchemaClass) """ - def __init__(self, _classId=None, _desc=None, - _props={}, _methods={}, _object_id_names=None, + def __init__(self, _classId=None, _desc=None, + _props={}, _methods={}, _object_id_names=[], _map=None): """ @type pname: str diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py index e08abc007c..df0f93f1da 100644 --- a/qpid/python/qmf2/console.py +++ b/qpid/python/qmf2/console.py @@ -23,7 +23,7 @@ import platform import time import datetime import Queue -from threading import Thread +from threading import Thread, Event from threading import Lock from threading import currentThread from threading import Condition @@ -193,9 +193,8 @@ class QmfConsoleData(QmfData): """ Console's representation of an managed QmfData instance. """ - def __init__(self, map_, agent, _schema=None): + def __init__(self, map_, agent): super(QmfConsoleData, self).__init__(_map=map_, - _schema=_schema, _const=True) self._agent = agent @@ -276,31 +275,6 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - if self._schema: - # validate - _in_args = _in_args.copy() - ms = self._schema.get_method(name) - if ms is None: - raise ValueError("Method '%s' is undefined." % name) - - for aname,prop in ms.get_arguments().iteritems(): - if aname not in _in_args: - if prop.get_default(): - _in_args[aname] = prop.get_default() - elif not prop.is_optional(): - raise ValueError("Method '%s' requires argument '%s'" - % (name, aname)) - for aname in _in_args.iterkeys(): - prop = ms.get_argument(aname) - if prop is None: - raise ValueError("Method '%s' does not define argument" - " '%s'" % (name, aname)) - if "I" not in prop.get_direction(): - raise ValueError("Method '%s' argument '%s' is not an" - " input." % (name, aname)) - - # @todo check if value is correct (type, range, etc) - handle = self._agent._console._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") @@ -349,7 +323,7 @@ class QmfConsoleData(QmfData): _tag=newer._tag, _object_id=newer._object_id, _ctime=newer._ctime, _utime=newer._utime, _dtime=newer._dtime, - _schema=newer._schema, _const=True) + _schema_id=newer._schema_id, _const=True) class QmfLocalData(QmfData): """ @@ -396,7 +370,7 @@ class Agent(object): def is_active(self): return self._announce_timestamp != None - + def _send_msg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. @@ -598,6 +572,9 @@ class Console(Thread): @param kwargs: ??? Unused """ Thread.__init__(self) + self._operational = False + self._ready = Event() + if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) else: @@ -615,7 +592,6 @@ class Console(Thread): self._locate_sender = None self._schema_cache = {} self._req_correlation = SequencedWaiter() - self._operational = False self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout @@ -706,6 +682,9 @@ class Console(Thread): # self._operational = True self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Console managment thread failed to start.") @@ -906,21 +885,21 @@ class Console(Thread): logging.debug("Response to Object Query received") obj_list = [] for obj_map in reply.content.get(MsgKey.data_obj): - # if the object references a schema, fetch it - sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - schema = self._fetch_schema(sid, _agent=agent, - _timeout=timeout) - if not schema: - logging.warning("Unknown schema, id=%s" % sid) - continue - obj = QmfConsoleData(map_=obj_map, agent=agent, - _schema=schema) - else: - # no schema needed - obj = QmfConsoleData(map_=obj_map, agent=agent) + obj = QmfConsoleData(map_=obj_map, agent=agent) obj_list.append(obj) + # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + # if sid_map: + # sid = SchemaClassId.from_map(sid_map) + # # if the object references a schema, fetch it + # # schema = self._fetch_schema(sid, _agent=agent, + # # _timeout=timeout) + # # if not schema: + # # logging.warning("Unknown schema, id=%s" % sid) + # # continue + # obj = QmfConsoleData(map_=obj_map, agent=agent, + # _schema=schema) + # else: + # # no schema needed return obj_list else: logging.warning("Unexpected Target for a Query: '%s'" % target) @@ -928,9 +907,9 @@ class Console(Thread): def run(self): global _callback_thread - # - # @todo KAG Rewrite when api supports waiting on multiple receivers - # + + self._ready.set() + while self._operational: # qLen = self._work_q.qsize() @@ -1152,7 +1131,7 @@ class Console(Thread): # need to create and add a new agent? matched = False if self._agent_discovery_filter: - tmp = QmfData.create(values=ai_map) + tmp = QmfData.create(values=ai_map, _object_id="agent-filter") matched = self._agent_discovery_filter.evaluate(tmp) if (correlated or matched): diff --git a/qpid/python/qmf2/tests/basic_method.py b/qpid/python/qmf2/tests/basic_method.py index 745fa5d83c..be2bdff9ab 100644 --- a/qpid/python/qmf2/tests/basic_method.py +++ b/qpid/python/qmf2/tests/basic_method.py @@ -84,9 +84,8 @@ class _agentApp(Thread): # instantiate managed data objects matching the schema - _obj1 = QmfAgentData( self.agent, _schema=_schema ) - _obj1.set_value("index1", 100) - _obj1.set_value("index2", "a name" ) + _obj1 = QmfAgentData( self.agent, _schema=_schema, + _values={"index1":100, "index2":"a name"}) _obj1.set_value("set_string", "UNSET") _obj1.set_value("set_int", 0) _obj1.set_value("query_count", 0) @@ -153,7 +152,8 @@ class _agentApp(Thread): if obj is None: error_info = QmfData.create({"code": -2, "description": - "Bad Object Id."}) + "Bad Object Id."}, + _object_id="_error") self.agent.method_response(wi.get_handle(), _error=error_info) else: @@ -170,13 +170,15 @@ class _agentApp(Thread): if obj is None: error_info = QmfData.create({"code": -3, "description": - "Unknown object id."}) + "Unknown object id."}, + _object_id="_error") self.agent.method_response(wi.get_handle(), _error=error_info) elif obj.get_object_id() != "01545": - error_info = QmfData.create({"code": -4, - "description": - "Unexpected id."}) + error_info = QmfData.create( {"code": -4, + "description": + "Unexpected id."}, + _object_id="_error") self.agent.method_response(wi.get_handle(), _error=error_info) else: @@ -187,15 +189,18 @@ class _agentApp(Thread): self.agent.method_response(wi.get_handle(), {"code" : 0}) else: - error_info = QmfData.create({"code": -5, - "description": - "Bad Args."}) + error_info = QmfData.create( + {"code": -5, + "description": + "Bad Args."}, + _object_id="_error") self.agent.method_response(wi.get_handle(), _error=error_info) else: - error_info = QmfData.create({"code": -1, + error_info = QmfData.create( {"code": -1, "description": - "Unknown method call."}) + "Unknown method call."}, + _object_id="_error") self.agent.method_response(wi.get_handle(), _error=error_info) self.agent.release_workitem(wi) @@ -284,7 +289,7 @@ class BaseTest(unittest.TestCase): # find agents # synchronous query for all objects with schema # invalid method call on each object - # - should throw a ValueError + # - should throw a ValueError - NOT YET. self.notifier = _testNotifier() self.console = qmf2.console.Console(notifier=self.notifier, agent_timeout=3) @@ -313,11 +318,18 @@ class BaseTest(unittest.TestCase): obj_list = self.console.do_query(agent, query) self.assertTrue(len(obj_list) == 2) for obj in obj_list: - self.failUnlessRaises(ValueError, - obj.invoke_method, - "unknown_meth", - {"arg1": -99, "arg2": "Now set!"}, - _timeout=3) + mr = obj.invoke_method("unknown_method", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) + # self.failUnlessRaises(ValueError, + # obj.invoke_method, + # "unknown_meth", + # {"arg1": -99, "arg2": "Now set!"}, + # _timeout=3) + self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) + self.assertFalse(mr.succeeded()) + self.assertTrue(isinstance(mr.get_exception(), QmfData)) + self.console.destroy(10) def test_bad_method_no_schema(self): diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py index 0f45348d9f..dd321cb4bb 100644 --- a/qpid/python/qmf2/tests/basic_query.py +++ b/qpid/python/qmf2/tests/basic_query.py @@ -84,9 +84,8 @@ class _agentApp(Thread): # instantiate managed data objects matching the schema - _obj1 = QmfAgentData( self.agent, _schema=_schema ) - _obj1.set_value("index1", 100) - _obj1.set_value("index2", "a name" ) + _obj1 = QmfAgentData( self.agent, _schema=_schema, + _values={"index1":100, "index2":"a name"}) _obj1.set_value("set_string", "UNSET") _obj1.set_value("set_int", 0) _obj1.set_value("query_count", 0) diff --git a/qpid/python/qmf2/tests/events.py b/qpid/python/qmf2/tests/events.py index 9a96fbd9a4..e55dc8572e 100644 --- a/qpid/python/qmf2/tests/events.py +++ b/qpid/python/qmf2/tests/events.py @@ -114,7 +114,7 @@ class _agentApp(Thread): QmfEvent.SEV_WARNING, {"prop-1": counter, "prop-2": str(datetime.datetime.utcnow())}, - _schema=self.schema) + _schema_id=self.schema.get_class_id()) counter += 1 self.agent.raise_event(event) wi = self.agent.get_next_workitem(timeout=0) diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py index 43f2da5da2..5b1446bb3a 100644 --- a/qpid/python/qmf2/tests/obj_gets.py +++ b/qpid/python/qmf2/tests/obj_gets.py @@ -74,14 +74,16 @@ class _agentApp(Thread): self.agent.register_object_class(_schema) - _obj = QmfAgentData( self.agent, _schema=_schema ) - _obj.set_value("key", "p1c1_key1") + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key1"}, + _schema=_schema) _obj.set_value("count1", 0) _obj.set_value("count2", 0) self.agent.add_object( _obj ) - _obj = QmfAgentData( self.agent, _schema=_schema ) - _obj.set_value("key", "p1c1_key2") + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key2"}, + _schema=_schema ) _obj.set_value("count1", 9) _obj.set_value("count2", 10) self.agent.add_object( _obj ) @@ -97,8 +99,9 @@ class _agentApp(Thread): self.agent.register_object_class(_schema) - _obj = QmfAgentData( self.agent, _schema=_schema ) - _obj.set_value("name", "p1c2_name1") + _obj = QmfAgentData( self.agent, + _values={"name":"p1c2_name1"}, + _schema=_schema ) _obj.set_value("string1", "a data string") self.agent.add_object( _obj ) @@ -114,13 +117,15 @@ class _agentApp(Thread): self.agent.register_object_class(_schema) - _obj = QmfAgentData( self.agent, _schema=_schema ) - _obj.set_value("key", "p2c1_key1") + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key1"}, + _schema=_schema ) _obj.set_value("counter", 0) self.agent.add_object( _obj ) - _obj = QmfAgentData( self.agent, _schema=_schema ) - _obj.set_value("key", "p2c1_key2") + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key2"}, + _schema=_schema ) _obj.set_value("counter", 2112) self.agent.add_object( _obj ) @@ -515,3 +520,80 @@ class BaseTest(unittest.TestCase): self.console.destroy(10) + + def test_wildcard_schema_id(self): + # create console + # find all agents + # synchronous query for all described objects by: + # oid & wildcard schema_id + # wildcard schema_id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + wild_schema_id = SchemaClassId("package1", "class1") + objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + + wild_schema_id = SchemaClassId("package1", "class2") + objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + self.assertTrue(obj.get_object_id() == "p1c2_name1") + + wild_schema_id = SchemaClassId("package2", "class1") + objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + + wild_schema_id = SchemaClassId("package1", "class1") + objs = self.console.get_objects(_schema_id=wild_schema_id, + _object_id="p1c1_key2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + self.assertTrue(obj.get_object_id() == "p1c1_key2") + + # should fail + objs = self.console.get_objects(_schema_id=wild_schema_id, + _object_id="does not exist", + _timeout=5) + self.assertTrue(objs == None) + + wild_schema_id = SchemaClassId("package2", "class1") + objs = self.console.get_objects(_schema_id=wild_schema_id, + _object_id="p2c1_key2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") + self.assertTrue(obj.get_object_id() == "p2c1_key2") + + # should fail + wild_schema_id = SchemaClassId("package1", "bad-class") + objs = self.console.get_objects(_schema_id=wild_schema_id, + _object_id="p1c1_key2", _timeout=5) + self.assertTrue(objs == None) + + self.console.destroy(10) + |
