diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 23:01:21 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-02-12 23:01:21 +0000 |
commit | 68008d8656a1dc3e96bedc40a93c9c2389c10f2c (patch) | |
tree | 274bdcdb223e0304217bb75b7d56c66a9ca7a99c /python | |
parent | 68fb9a03641e50fbb6c045ac2f39091bf0bf9d08 (diff) | |
download | qpid-python-68008d8656a1dc3e96bedc40a93c9c2389c10f2c.tar.gz |
QPID-2261: add async method call workitems
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909648 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r-- | python/qmf2/common.py | 1 | ||||
-rw-r--r-- | python/qmf2/console.py | 167 | ||||
-rw-r--r-- | python/qmf2/tests/__init__.py | 1 | ||||
-rw-r--r-- | python/qmf2/tests/async_method.py | 362 | ||||
-rw-r--r-- | python/qmf2/tests/async_query.py | 14 |
5 files changed, 471 insertions, 74 deletions
diff --git a/python/qmf2/common.py b/python/qmf2/common.py index 10ea994a16..8107b86666 100644 --- a/python/qmf2/common.py +++ b/python/qmf2/common.py @@ -132,6 +132,7 @@ class WorkItem(object): EVENT_RECEIVED=7 AGENT_HEARTBEAT=8 QUERY_COMPLETE=9 + METHOD_RESPONSE=10 # Enumeration of the types of WorkItems produced on the Agent METHOD_CALL=1000 QUERY=1001 diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 7b4bfe9fb8..c13cf70755 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -127,14 +127,11 @@ class _AsyncMailbox(_Mailbox): A Mailbox for asynchronous delivery, with a timeout value. """ def __init__(self, console, - agent_name, _timeout=None): """ Invoked by application thread. """ super(_AsyncMailbox, self).__init__(console) - - self.agent_name = agent_name self.console = console if _timeout is None: @@ -186,8 +183,8 @@ class _QueryMailbox(_AsyncMailbox): Invoked by application thread. """ super(_QueryMailbox, self).__init__(console, - agent_name, _timeout) + self.agent_name = agent_name self.target = target self.msgkey = msgkey self.context = context @@ -267,19 +264,15 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): Handles responses to schema fetches made by the console. """ def __init__(self, console, - agent_name, schema_id, _timeout=None): """ Invoked by application thread. """ super(_SchemaPrefetchMailbox, self).__init__(console, - agent_name, _timeout) - self.schema_id = schema_id - def deliver(self, reply): """ Process schema response messages. @@ -306,6 +299,62 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): +class _MethodMailbox(_AsyncMailbox): + """ + A mailbox used for asynchronous method requests. + """ + def __init__(self, console, + context, + _timeout=None): + """ + Invoked by application thread. + """ + super(_MethodMailbox, self).__init__(console, + _timeout) + self.context = context + + def deliver(self, reply): + """ + Process method response messages delivered to this mailbox. + Invoked by Console Management thread only. + """ + + _map = reply.content.get(MsgKey.method) + if not _map: + logging.error("Invalid method call reply message") + result = None + else: + error=_map.get(SchemaMethod.KEY_ERROR) + if error: + error = QmfData.from_map(error) + result = MethodResult(_error=error) + else: + result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) + + # create workitem + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + def expire(self): + """ + The mailbox expired without receiving a reply. + Invoked by the Console Management thread only. + """ + logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % + datetime.datetime.utcnow()) + # send along an empty response + wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) + self.console._work_q.put(wi) + self.console._work_q_put = True + + self.destroy() + + + ##============================================================================== ## DATA MODEL ##============================================================================== @@ -374,7 +423,7 @@ class QmfConsoleData(QmfData): query = QmfQuery.create_id_object(self.get_object_id(), self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, - timeout=_timeout) + _timeout=_timeout) if obj_list is None or len(obj_list) != 1: return None @@ -385,7 +434,7 @@ class QmfConsoleData(QmfData): def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ - invoke the named method. + Invoke the named method on this object. """ assert self._agent assert self._agent._console @@ -397,7 +446,11 @@ class QmfConsoleData(QmfData): if _timeout is None: _timeout = self._agent._console._reply_timeout - mbox = _SyncMailbox(self._agent._console) + if _reply_handle is not None: + mbox = _MethodMailbox(self._agent._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._agent._console) cid = mbox.get_address() _map = {self.KEY_OBJECT_ID:str(oid), @@ -417,9 +470,8 @@ class QmfConsoleData(QmfData): mbox.destroy() return None - # @todo async method calls!!! if _reply_handle is not None: - print("ASYNC TBD") + return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) @@ -561,21 +613,23 @@ class Agent(object): def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ + Invoke the named method on this agent. """ assert self._console if _timeout is None: _timeout = self._console._reply_timeout - if _in_args: - _in_args = _in_args.copy() - - mbox = _SyncMailbox(self._console) + if _reply_handle is not None: + mbox = _MethodMailbox(self._console, + _reply_handle) + else: + mbox = _SyncMailbox(self._console) cid = mbox.get_address() _map = {SchemaMethod.KEY_NAME:name} if _in_args: - _map[SchemaMethod.KEY_ARGUMENTS] = _in_args + _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() logging.debug("Sending method req to Agent (%s)" % time.time()) try: @@ -585,9 +639,8 @@ class Agent(object): mbox.destroy() return None - # @todo async method calls!!! if _reply_handle is not None: - print("ASYNC TBD") + return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) @@ -939,7 +992,7 @@ class Console(Thread): return agent - def do_query(self, agent, query, timeout=None ): + def do_query(self, agent, query, _reply_handle=None, _timeout=None ): """ """ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, @@ -954,7 +1007,15 @@ class Console(Thread): if not msgkey: raise Exception("Invalid target for query: %s" % str(query)) - mbox = _SyncMailbox(self) + if _reply_handle is not None: + mbox = _QueryMailbox(self, + agent.get_name(), + _reply_handle, + target, msgkey, + _timeout) + else: + mbox = _SyncMailbox(self) + cid = mbox.get_address() try: @@ -965,17 +1026,21 @@ class Console(Thread): mbox.destroy() return None - if not timeout: - timeout = self._reply_timeout + # return now if async reply expected + if _reply_handle is not None: + return True + + if not _timeout: + _timeout = self._reply_timeout - logging.debug("Waiting for response to Query (%s)" % timeout) + logging.debug("Waiting for response to Query (%s)" % _timeout) now = datetime.datetime.utcnow() - expire = now + datetime.timedelta(seconds=timeout) + expire = now + datetime.timedelta(seconds=_timeout) response = [] while (expire > now): - timeout = timedelta_to_secs(expire - now) - reply = mbox.fetch(timeout) + _timeout = timedelta_to_secs(expire - now) + reply = mbox.fetch(_timeout) if not reply: logging.debug("Query wait timed-out.") break @@ -1021,39 +1086,6 @@ class Console(Thread): mbox.destroy() return response - - def do_async_query(self, agent, query, app_handle, _timeout=None ): - """ - """ - query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, - QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, - QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, - QmfQuery.TARGET_SCHEMA: MsgKey.schema, - QmfQuery.TARGET_OBJECT: MsgKey.data_obj, - QmfQuery.TARGET_AGENT: MsgKey.agent_info} - - target = query.get_target() - msgkey = query_keymap.get(target) - if not msgkey: - raise Exception("Invalid target for query: %s" % str(query)) - - mbox = _QueryMailbox(self, - agent.get_name(), - app_handle, - target, msgkey, - _timeout) - cid = mbox.get_address() - - try: - logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._send_query(query, cid) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return False - return True - - def _wake_thread(self): """ Make the console management thread loop wakeup from its next_receiver @@ -1189,7 +1221,7 @@ class Console(Thread): query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, timeout) + reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply else: @@ -1209,7 +1241,7 @@ class Console(Thread): [QmfQuery.QUOTE, _pname]] query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) timeout = timedelta_to_secs(expired - now) - sid_list = self.do_query(agent, query, timeout) + sid_list = self.do_query(agent, query, _timeout=timeout) if sid_list: for sid in sid_list: now = datetime.datetime.utcnow() @@ -1221,7 +1253,7 @@ class Console(Thread): t_params = {QmfData.KEY_SCHEMA_ID: sid} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, timeout) + reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply if obj_list: @@ -1591,8 +1623,7 @@ class Console(Thread): self._lock.release() if need_fetch: - mbox = _SchemaPrefetchMailbox(self, agent.get_name(), - schema_id) + mbox = _SchemaPrefetchMailbox(self, schema_id) query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) logging.debug("Sending Schema Query to Agent (%s)" % time.time()) try: @@ -1628,8 +1659,8 @@ class Console(Thread): # note: do_query will add the new schema to the cache automatically. slist = self.do_query(_agent, - QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), - _timeout) + QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), + _timeout=_timeout) if slist: return slist[0] else: diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py index 9fabdb9ef5..186f09349e 100644 --- a/python/qmf2/tests/__init__.py +++ b/python/qmf2/tests/__init__.py @@ -26,3 +26,4 @@ import obj_gets import events import multi_response import async_query +import async_method diff --git a/python/qmf2/tests/async_method.py b/python/qmf2/tests/async_method.py new file mode 100644 index 0000000000..556b62756f --- /dev/null +++ b/python/qmf2/tests/async_method.py @@ -0,0 +1,362 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent, MethodCallParams) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.broker_url = broker_url + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + # the input value of cookie is returned in the response + _meth.add_argument( "cookie", SchemaProperty(qmfTypes.TYPE_LSTR, + kwargs={"dir":"IO"})) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema, + _values={"index1":100, "index2":"a name"}) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + + # Agent application main processing loop + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + if wi.get_type() == WorkItem.METHOD_CALL: + mc = wi.get_params() + if not isinstance(mc, MethodCallParams): + raise Exception("Unexpected method call parameters") + + if mc.get_name() == "set_meth": + obj = self.agent.get_object(mc.get_object_id(), + mc.get_schema_id()) + if obj is None: + error_info = QmfData.create({"code": -2, + "description": + "Bad Object Id."}, + _object_id="_error") + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + obj.inc_value("method_call_count") + out_args = {"code" : 0} + if "cookie" in mc.get_args(): + out_args["cookie"] = mc.get_args()["cookie"] + if "arg_int" in mc.get_args(): + obj.set_value("set_int", mc.get_args()["arg_int"]) + if "arg_str" in mc.get_args(): + obj.set_value("set_string", mc.get_args()["arg_str"]) + self.agent.method_response(wi.get_handle(), + out_args) + elif mc.get_name() == "a_method": + obj = self.agent.get_object(mc.get_object_id(), + mc.get_schema_id()) + if obj is None: + error_info = QmfData.create({"code": -3, + "description": + "Unknown object id."}, + _object_id="_error") + self.agent.method_response(wi.get_handle(), + _error=error_info) + elif obj.get_object_id() != "01545": + error_info = QmfData.create( {"code": -4, + "description": + "Unexpected id."}, + _object_id="_error") + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + args = mc.get_args() + if ("arg1" in args and args["arg1"] == 1 and + "arg2" in args and args["arg2"] == "Now set!" + and "arg3" in args and args["arg3"] == 1966): + out_args = {"code" : 0} + if "cookie" in mc.get_args(): + out_args["cookie"] = mc.get_args()["cookie"] + self.agent.method_response(wi.get_handle(), + out_args) + else: + error_info = QmfData.create( + {"code": -5, + "description": + "Bad Args."}, + _object_id="_error") + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + error_info = QmfData.create( {"code": -1, + "description": + "Unknown method call."}, + _object_id="_error") + self.agent.method_response(wi.get_handle(), _error=error_info) + + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent heartbeat interval + self.agent_heartbeat = 1 + self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_described_obj(self): + # create console + # find agents + # synchronous query for all objects in schema + # method call on each object + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + i_count = 0 + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) + + sid_list = self.console.do_query(agent, query) + self.assertTrue(sid_list and len(sid_list) == 1) + for sid in sid_list: + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + obj_list = self.console.do_query(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + cookie = "cookie-" + str(i_count) + i_count += 1 + mr = obj.invoke_method( "set_meth", + {"arg_int": -99, + "arg_str": "Now set!", + "cookie": cookie}, + _reply_handle=cookie, + _timeout=3) + self.assertTrue(mr) + + # done, now wait for async responses + + r_count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE) + reply = wi.get_params() + self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + self.assertTrue(reply.succeeded()) + self.assertTrue(reply.get_argument("cookie") == wi.get_handle()) + + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(r_count == i_count) + + self.console.destroy(10) + + + def test_managed_obj(self): + # create console + # find agents + # synchronous query for a managed object + # method call on each object + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + i_count = 0 + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") + obj_list = self.console.do_query(agent, query) + + self.assertTrue(isinstance(obj_list, type([]))) + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + + cookie = "cookie-" + str(i_count) + i_count += 1 + mr = obj.invoke_method("a_method", + {"arg1": 1, + "arg2": "Now set!", + "arg3": 1966, + "cookie": cookie}, + _reply_handle=cookie, + _timeout=3) + self.assertTrue(mr) + + # done, now wait for async responses + + r_count = 0 + while self.notifier.wait_for_work(3): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE) + reply = wi.get_params() + self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + self.assertTrue(reply.succeeded()) + self.assertTrue(reply.get_argument("cookie") == wi.get_handle()) + + wi = self.console.get_next_workitem(timeout=0) + + self.assertTrue(r_count == i_count) + + self.console.destroy(10) diff --git a/python/qmf2/tests/async_query.py b/python/qmf2/tests/async_query.py index f598939d46..3a9a767bf0 100644 --- a/python/qmf2/tests/async_query.py +++ b/python/qmf2/tests/async_query.py @@ -227,7 +227,8 @@ class BaseTest(unittest.TestCase): # send queries query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - rc = self.console.do_async_query(agent, query, aname) + rc = self.console.do_query(agent, query, + _reply_handle=aname) self.assertTrue(rc) # done. Now wait for async responses @@ -273,7 +274,7 @@ class BaseTest(unittest.TestCase): # send queries query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT) - rc = self.console.do_async_query(agent, query, aname) + rc = self.console.do_query(agent, query, _reply_handle=aname) self.assertTrue(rc) # done. Now wait for async responses @@ -320,7 +321,7 @@ class BaseTest(unittest.TestCase): t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) # - rc = self.console.do_async_query(agent, query, aname) + rc = self.console.do_query(agent, query, _reply_handle=aname) self.assertTrue(rc) # done. Now wait for async responses @@ -371,7 +372,7 @@ class BaseTest(unittest.TestCase): # send queries query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) - rc = self.console.do_async_query(agent, query, aname) + rc = self.console.do_query(agent, query, _reply_handle=aname) self.assertTrue(rc) # done. Now wait for async responses @@ -436,8 +437,9 @@ class BaseTest(unittest.TestCase): # now send queries to agents that no longer exist for agent in agents: query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) - rc = self.console.do_async_query(agent, query, agent.get_name(), - _timeout=2) + rc = self.console.do_query(agent, query, + _reply_handle=agent.get_name(), + _timeout=2) self.assertTrue(rc) # done. Now wait for async responses due to timeouts |