summaryrefslogtreecommitdiff
path: root/qpid/python/qmf2/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qmf2/console.py')
-rw-r--r--qpid/python/qmf2/console.py77
1 files changed, 28 insertions, 49 deletions
diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py
index e08abc007c..df0f93f1da 100644
--- a/qpid/python/qmf2/console.py
+++ b/qpid/python/qmf2/console.py
@@ -23,7 +23,7 @@ import platform
import time
import datetime
import Queue
-from threading import Thread
+from threading import Thread, Event
from threading import Lock
from threading import currentThread
from threading import Condition
@@ -193,9 +193,8 @@ class QmfConsoleData(QmfData):
"""
Console's representation of an managed QmfData instance.
"""
- def __init__(self, map_, agent, _schema=None):
+ def __init__(self, map_, agent):
super(QmfConsoleData, self).__init__(_map=map_,
- _schema=_schema,
_const=True)
self._agent = agent
@@ -276,31 +275,6 @@ class QmfConsoleData(QmfData):
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- if self._schema:
- # validate
- _in_args = _in_args.copy()
- ms = self._schema.get_method(name)
- if ms is None:
- raise ValueError("Method '%s' is undefined." % name)
-
- for aname,prop in ms.get_arguments().iteritems():
- if aname not in _in_args:
- if prop.get_default():
- _in_args[aname] = prop.get_default()
- elif not prop.is_optional():
- raise ValueError("Method '%s' requires argument '%s'"
- % (name, aname))
- for aname in _in_args.iterkeys():
- prop = ms.get_argument(aname)
- if prop is None:
- raise ValueError("Method '%s' does not define argument"
- " '%s'" % (name, aname))
- if "I" not in prop.get_direction():
- raise ValueError("Method '%s' argument '%s' is not an"
- " input." % (name, aname))
-
- # @todo check if value is correct (type, range, etc)
-
handle = self._agent._console._req_correlation.allocate()
if handle == 0:
raise Exception("Can not allocate a correlation id!")
@@ -349,7 +323,7 @@ class QmfConsoleData(QmfData):
_tag=newer._tag, _object_id=newer._object_id,
_ctime=newer._ctime, _utime=newer._utime,
_dtime=newer._dtime,
- _schema=newer._schema, _const=True)
+ _schema_id=newer._schema_id, _const=True)
class QmfLocalData(QmfData):
"""
@@ -396,7 +370,7 @@ class Agent(object):
def is_active(self):
return self._announce_timestamp != None
-
+
def _send_msg(self, msg, correlation_id=None):
"""
Low-level routine to asynchronously send a message to this agent.
@@ -598,6 +572,9 @@ class Console(Thread):
@param kwargs: ??? Unused
"""
Thread.__init__(self)
+ self._operational = False
+ self._ready = Event()
+
if not name:
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
else:
@@ -615,7 +592,6 @@ class Console(Thread):
self._locate_sender = None
self._schema_cache = {}
self._req_correlation = SequencedWaiter()
- self._operational = False
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
@@ -706,6 +682,9 @@ class Console(Thread):
#
self._operational = True
self.start()
+ self._ready.wait(10)
+ if not self._ready.isSet():
+ raise Exception("Console managment thread failed to start.")
@@ -906,21 +885,21 @@ class Console(Thread):
logging.debug("Response to Object Query received")
obj_list = []
for obj_map in reply.content.get(MsgKey.data_obj):
- # if the object references a schema, fetch it
- sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- schema = self._fetch_schema(sid, _agent=agent,
- _timeout=timeout)
- if not schema:
- logging.warning("Unknown schema, id=%s" % sid)
- continue
- obj = QmfConsoleData(map_=obj_map, agent=agent,
- _schema=schema)
- else:
- # no schema needed
- obj = QmfConsoleData(map_=obj_map, agent=agent)
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
obj_list.append(obj)
+ # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ # if sid_map:
+ # sid = SchemaClassId.from_map(sid_map)
+ # # if the object references a schema, fetch it
+ # # schema = self._fetch_schema(sid, _agent=agent,
+ # # _timeout=timeout)
+ # # if not schema:
+ # # logging.warning("Unknown schema, id=%s" % sid)
+ # # continue
+ # obj = QmfConsoleData(map_=obj_map, agent=agent,
+ # _schema=schema)
+ # else:
+ # # no schema needed
return obj_list
else:
logging.warning("Unexpected Target for a Query: '%s'" % target)
@@ -928,9 +907,9 @@ class Console(Thread):
def run(self):
global _callback_thread
- #
- # @todo KAG Rewrite when api supports waiting on multiple receivers
- #
+
+ self._ready.set()
+
while self._operational:
# qLen = self._work_q.qsize()
@@ -1152,7 +1131,7 @@ class Console(Thread):
# need to create and add a new agent?
matched = False
if self._agent_discovery_filter:
- tmp = QmfData.create(values=ai_map)
+ tmp = QmfData.create(values=ai_map, _object_id="agent-filter")
matched = self._agent_discovery_filter.evaluate(tmp)
if (correlated or matched):