diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-03 15:44:26 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-03 15:44:26 +0000 |
| commit | d3323694fe07bdb2d3ed67cceef3fd456eeb3c8d (patch) | |
| tree | e9aad63c9e6cebb17179b8b8c40eb7d4cef27c1a /qpid/python/qmf2/console.py | |
| parent | 2e3e4827a8ad0b322f2b6b87fb67b44c7c945a48 (diff) | |
| download | qpid-python-d3323694fe07bdb2d3ed67cceef3fd456eeb3c8d.tar.gz | |
QPID-2261: 1) remove direct reference to schema in QmfData (use schema id instead). 2) schema_id wildcarding query. 3) Prevent set connection calls from returning until after the management threads start.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/console.py')
| -rw-r--r-- | qpid/python/qmf2/console.py | 77 |
1 files changed, 28 insertions, 49 deletions
diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py index e08abc007c..df0f93f1da 100644 --- a/qpid/python/qmf2/console.py +++ b/qpid/python/qmf2/console.py @@ -23,7 +23,7 @@ import platform import time import datetime import Queue -from threading import Thread +from threading import Thread, Event from threading import Lock from threading import currentThread from threading import Condition @@ -193,9 +193,8 @@ class QmfConsoleData(QmfData): """ Console's representation of an managed QmfData instance. """ - def __init__(self, map_, agent, _schema=None): + def __init__(self, map_, agent): super(QmfConsoleData, self).__init__(_map=map_, - _schema=_schema, _const=True) self._agent = agent @@ -276,31 +275,6 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - if self._schema: - # validate - _in_args = _in_args.copy() - ms = self._schema.get_method(name) - if ms is None: - raise ValueError("Method '%s' is undefined." % name) - - for aname,prop in ms.get_arguments().iteritems(): - if aname not in _in_args: - if prop.get_default(): - _in_args[aname] = prop.get_default() - elif not prop.is_optional(): - raise ValueError("Method '%s' requires argument '%s'" - % (name, aname)) - for aname in _in_args.iterkeys(): - prop = ms.get_argument(aname) - if prop is None: - raise ValueError("Method '%s' does not define argument" - " '%s'" % (name, aname)) - if "I" not in prop.get_direction(): - raise ValueError("Method '%s' argument '%s' is not an" - " input." % (name, aname)) - - # @todo check if value is correct (type, range, etc) - handle = self._agent._console._req_correlation.allocate() if handle == 0: raise Exception("Can not allocate a correlation id!") @@ -349,7 +323,7 @@ class QmfConsoleData(QmfData): _tag=newer._tag, _object_id=newer._object_id, _ctime=newer._ctime, _utime=newer._utime, _dtime=newer._dtime, - _schema=newer._schema, _const=True) + _schema_id=newer._schema_id, _const=True) class QmfLocalData(QmfData): """ @@ -396,7 +370,7 @@ class Agent(object): def is_active(self): return self._announce_timestamp != None - + def _send_msg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. @@ -598,6 +572,9 @@ class Console(Thread): @param kwargs: ??? Unused """ Thread.__init__(self) + self._operational = False + self._ready = Event() + if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) else: @@ -615,7 +592,6 @@ class Console(Thread): self._locate_sender = None self._schema_cache = {} self._req_correlation = SequencedWaiter() - self._operational = False self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout @@ -706,6 +682,9 @@ class Console(Thread): # self._operational = True self.start() + self._ready.wait(10) + if not self._ready.isSet(): + raise Exception("Console managment thread failed to start.") @@ -906,21 +885,21 @@ class Console(Thread): logging.debug("Response to Object Query received") obj_list = [] for obj_map in reply.content.get(MsgKey.data_obj): - # if the object references a schema, fetch it - sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - schema = self._fetch_schema(sid, _agent=agent, - _timeout=timeout) - if not schema: - logging.warning("Unknown schema, id=%s" % sid) - continue - obj = QmfConsoleData(map_=obj_map, agent=agent, - _schema=schema) - else: - # no schema needed - obj = QmfConsoleData(map_=obj_map, agent=agent) + obj = QmfConsoleData(map_=obj_map, agent=agent) obj_list.append(obj) + # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID) + # if sid_map: + # sid = SchemaClassId.from_map(sid_map) + # # if the object references a schema, fetch it + # # schema = self._fetch_schema(sid, _agent=agent, + # # _timeout=timeout) + # # if not schema: + # # logging.warning("Unknown schema, id=%s" % sid) + # # continue + # obj = QmfConsoleData(map_=obj_map, agent=agent, + # _schema=schema) + # else: + # # no schema needed return obj_list else: logging.warning("Unexpected Target for a Query: '%s'" % target) @@ -928,9 +907,9 @@ class Console(Thread): def run(self): global _callback_thread - # - # @todo KAG Rewrite when api supports waiting on multiple receivers - # + + self._ready.set() + while self._operational: # qLen = self._work_q.qsize() @@ -1152,7 +1131,7 @@ class Console(Thread): # need to create and add a new agent? matched = False if self._agent_discovery_filter: - tmp = QmfData.create(values=ai_map) + tmp = QmfData.create(values=ai_map, _object_id="agent-filter") matched = self._agent_discovery_filter.evaluate(tmp) if (correlated or matched): |
