diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-14 14:59:24 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-14 14:59:24 +0000 |
| commit | 074811c4bf1531f04b11db25f348e6c520bc4799 (patch) | |
| tree | dd46e4aa9bdaca64974bbddc810f3212d935edd5 /python/qmf2/tests | |
| parent | a025819835829ea7658e4886ddb6e5e488f916eb (diff) | |
| download | qpid-python-074811c4bf1531f04b11db25f348e6c520bc4799.tar.gz | |
moved qpid-* tools out of qpid/python into qpid/tools; moved qmf library into extras/qmf
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/tests')
| -rw-r--r-- | python/qmf2/tests/__init__.py | 29 | ||||
| -rw-r--r-- | python/qmf2/tests/agent_discovery.py | 484 | ||||
| -rw-r--r-- | python/qmf2/tests/agent_test.py | 167 | ||||
| -rw-r--r-- | python/qmf2/tests/async_method.py | 362 | ||||
| -rw-r--r-- | python/qmf2/tests/async_query.py | 462 | ||||
| -rw-r--r-- | python/qmf2/tests/basic_method.py | 406 | ||||
| -rw-r--r-- | python/qmf2/tests/basic_query.py | 516 | ||||
| -rw-r--r-- | python/qmf2/tests/console_test.py | 175 | ||||
| -rw-r--r-- | python/qmf2/tests/events.py | 208 | ||||
| -rw-r--r-- | python/qmf2/tests/multi_response.py | 295 | ||||
| -rw-r--r-- | python/qmf2/tests/obj_gets.py | 599 |
11 files changed, 0 insertions, 3703 deletions
diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py deleted file mode 100644 index 186f09349e..0000000000 --- a/python/qmf2/tests/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import agent_discovery -import basic_query -import basic_method -import obj_gets -import events -import multi_response -import async_query -import async_method diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py deleted file mode 100644 index 59b65221e0..0000000000 --- a/python/qmf2/tests/agent_discovery.py +++ /dev/null @@ -1,484 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -import time -from threading import Thread, Event - -import qpid.messaging -import qmf2.common -import qmf2.console -import qmf2.agent - - -class _testNotifier(qmf2.common.Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.timeout = 3 - self.broker_url = broker_url - self.notifier = _testNotifier() - self.agent = qmf2.agent.Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - # No database needed for this test - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # Connect the agent to the broker, - # broker_url = "user/passwd@hostname:port" - conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - conn.connect() - self.agent.set_connection(conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - # done, cleanup agent - self.agent.remove_connection(self.timeout) - self.agent.destroy(self.timeout) - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_discover_all(self): - """ - create console - enable agent discovery - wait - expect agent add for agent1 and agent2 - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - self.console.enable_agent_discovery() - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=3) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=3) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - self.console.destroy(10) - - - def test_discover_one(self): - """ - create console - enable agent discovery, filter for agent1 only - wait until timeout - expect agent add for agent1 only - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - query = qmf2.common.QmfQuery.create_predicate( - qmf2.common.QmfQuery.TARGET_AGENT, - [qmf2.common.QmfQuery.EQ, qmf2.common.QmfQuery.KEY_AGENT_NAME, - [qmf2.common.QmfQuery.QUOTE, "agent1"]]) - self.console.enable_agent_discovery(query) - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=3) - while wi: - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - - wi = self.console.get_next_workitem(timeout=2) - - self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") - - self.console.destroy(10) - - - def test_heartbeat(self): - """ - create console with 2 sec agent timeout - enable agent discovery, find all agents - stop agent1, expect timeout notification - stop agent2, expect timeout notification - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=2) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - self.console.enable_agent_discovery() - - agent1_found = agent2_found = False - wi = self.console.get_next_workitem(timeout=4) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - # now kill agent1 and wait for expiration - - agent1 = self.agent1 - self.agent1 = None - agent1.stop_app() - - wi = self.console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = False - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - if not agent1_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertFalse(agent1_found, "agent1 did not delete!") - - # now kill agent2 and wait for expiration - - agent2 = self.agent2 - self.agent2 = None - agent2.stop_app() - - wi = self.console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent2": - agent2_found = False - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - if not agent2_found: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertFalse(agent2_found, "agent2 did not delete!") - - self.console.destroy(10) - - - def test_find_agent(self): - """ - create console - do not enable agent discovery - find agent1, expect success - find agent-none, expect failure - find agent2, expect success - """ - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - agent1 = self.console.find_agent("agent1", timeout=3) - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - no_agent = self.console.find_agent("agent-none", timeout=3) - self.assertTrue(no_agent == None) - - agent2 = self.console.find_agent("agent2", timeout=3) - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - self.console.remove_connection(self.conn, 10) - self.console.destroy(10) - - - def test_heartbeat_x2(self): - """ - create 2 consoles with 2 sec agent timeout - enable agent discovery, find all agents - stop agent1, expect timeout notification on both consoles - stop agent2, expect timeout notification on both consoles - """ - console_count = 2 - self.consoles = [] - for i in range(console_count): - console = qmf2.console.Console("test-console-" + str(i), - notifier=_testNotifier(), - agent_timeout=2) - conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - conn.connect() - console.add_connection(conn) - console.enable_agent_discovery() - self.consoles.append(console) - - # now wait for all consoles to discover all agents, - # agents send a heartbeat once a second - for console in self.consoles: - agent1_found = agent2_found = False - wi = console.get_next_workitem(timeout=2) - while wi and not (agent1_found and agent2_found): - if wi.get_type() == wi.AGENT_ADDED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = True - elif agent.get_name() == "agent2": - agent2_found = True - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_found and agent2_found: - break; - wi = console.get_next_workitem(timeout=2) - - self.assertTrue(agent1_found and agent2_found, "All agents not discovered") - - # now kill agent1 and wait for expiration - - agent1 = self.agent1 - self.agent1 = None - agent1.stop_app() - - for console in self.consoles: - agent1_found = True - wi = console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_found = False - break - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - - wi = console.get_next_workitem(timeout=4) - - self.assertFalse(agent1_found, "agent1 did not delete!") - - # now kill agent2 and wait for expiration - - agent2 = self.agent2 - self.agent2 = None - agent2.stop_app() - - for console in self.consoles: - agent2_found = True - wi = console.get_next_workitem(timeout=4) - while wi is not None: - if wi.get_type() == wi.AGENT_DELETED: - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent2": - agent2_found = False - break - else: - self.fail("Unexpected agent_deleted received: %s" % - agent.get_name()) - - wi = console.get_next_workitem(timeout=4) - - self.assertFalse(agent2_found, "agent2 did not delete!") - - - for console in self.consoles: - console.destroy(10) - - - def test_find_agent_x2(self): - """ - create 2 consoles, do not enable agent discovery - console-1: find agent1, expect success - console-2: find agent2, expect success - Verify console-1 does -not- know agent2 - Verify console-2 does -not- know agent1 - """ - console_count = 2 - self.consoles = [] - for i in range(console_count): - console = qmf2.console.Console("test-console-" + str(i), - notifier=_testNotifier(), - agent_timeout=2) - conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - conn.connect() - console.add_connection(conn) - self.consoles.append(console) - - agent1 = self.consoles[0].find_agent("agent1", timeout=3) - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - agent2 = self.consoles[1].find_agent("agent2", timeout=3) - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - # wait long enough for agent heartbeats to be sent... - - time.sleep(self.agent_heartbeat * 2) - - agents = self.consoles[0].get_agents() - self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1") - agent1 = self.consoles[0].get_agent("agent1") - self.assertTrue(agent1 and agent1.get_name() == "agent1") - - - agents = self.consoles[1].get_agents() - self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2") - agent2 = self.consoles[1].get_agent("agent2") - self.assertTrue(agent2 and agent2.get_name() == "agent2") - - # verify no new agents were learned - - for console in self.consoles: - console.destroy(10) - diff --git a/python/qmf2/tests/agent_test.py b/python/qmf2/tests/agent_test.py deleted file mode 100644 index 14d8ada197..0000000000 --- a/python/qmf2/tests/agent_test.py +++ /dev/null @@ -1,167 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import logging -import time -import unittest -from threading import Semaphore - - -from qpid.messaging import * -from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, - QmfEvent, SchemaMethod, Notifier, SchemaClassId, - WorkItem) -from qmf2.agent import (Agent, QmfAgentData) - - - -class ExampleNotifier(Notifier): - def __init__(self): - self._sema4 = Semaphore(0) # locked - - def indication(self): - self._sema4.release() - - def waitForWork(self): - print("Waiting for event...") - self._sema4.acquire() - print("...event present") - - - - -class QmfTest(unittest.TestCase): - def test_begin(self): - print("!!! being test") - - def test_end(self): - print("!!! end test") - - -# -# An example agent application -# - - -if __name__ == '__main__': - _notifier = ExampleNotifier() - _agent = Agent( "qmf.testAgent", _notifier=_notifier ) - - # Dynamically construct a class schema - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - _agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( _agent, _schema=_schema ) - _obj1.set_value("index1", 100) - _obj1.set_value("index2", "a name" ) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - _agent.add_object( _obj1 ) - - _agent.add_object( QmfAgentData( _agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(_agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _agent.add_object(_obj2) - - - ## Now connect to the broker - - _c = Connection("localhost") - _c.connect() - _agent.setConnection(_c) - - _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) - - _done = False - while not _done: - # try: - _notifier.waitForWork() - - _wi = _agent.get_next_workitem(timeout=0) - while _wi: - - if _wi.get_type() == WorkItem.METHOD_CALL: - mc = _wi.get_params() - - if mc.get_name() == "set_meth": - print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) - print("!!! args='%s'" % str(mc.get_args())) - print("!!! userid=%s" % str(mc.get_user_id())) - print("!!! handle=%s" % _wi.get_handle()) - _agent.method_response(_wi.get_handle(), - {"rc1": 100, "rc2": "Success"}) - else: - print("!!! Unknown Method name = %s" % mc.get_name()) - _agent.method_response(_wi.get_handle(), _error=_error_data) - else: - print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) - - _agent.release_workitem(_wi) - _wi = _agent.get_next_workitem(timeout=0) - # except: - # print( "shutting down...") - # _done = True - - print( "Removing connection... TBD!!!" ) - #_myConsole.remove_connection( _c, 10 ) - - print( "Destroying agent... TBD!!!" ) - #_myConsole.destroy( 10 ) - - print( "******** agent test done ********" ) - - - diff --git a/python/qmf2/tests/async_method.py b/python/qmf2/tests/async_method.py deleted file mode 100644 index 556b62756f..0000000000 --- a/python/qmf2/tests/async_method.py +++ /dev/null @@ -1,362 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData, WorkItem) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent, MethodCallParams) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - # the input value of cookie is returned in the response - _meth.add_argument( "cookie", SchemaProperty(qmfTypes.TYPE_LSTR, - kwargs={"dir":"IO"})) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( self.agent, _schema=_schema, - _values={"index1":100, "index2":"a name"}) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - self.agent.add_object( _obj1 ) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(self.agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - self.agent.add_object(_obj2) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - # Agent application main processing loop - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - if wi.get_type() == WorkItem.METHOD_CALL: - mc = wi.get_params() - if not isinstance(mc, MethodCallParams): - raise Exception("Unexpected method call parameters") - - if mc.get_name() == "set_meth": - obj = self.agent.get_object(mc.get_object_id(), - mc.get_schema_id()) - if obj is None: - error_info = QmfData.create({"code": -2, - "description": - "Bad Object Id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - obj.inc_value("method_call_count") - out_args = {"code" : 0} - if "cookie" in mc.get_args(): - out_args["cookie"] = mc.get_args()["cookie"] - if "arg_int" in mc.get_args(): - obj.set_value("set_int", mc.get_args()["arg_int"]) - if "arg_str" in mc.get_args(): - obj.set_value("set_string", mc.get_args()["arg_str"]) - self.agent.method_response(wi.get_handle(), - out_args) - elif mc.get_name() == "a_method": - obj = self.agent.get_object(mc.get_object_id(), - mc.get_schema_id()) - if obj is None: - error_info = QmfData.create({"code": -3, - "description": - "Unknown object id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - elif obj.get_object_id() != "01545": - error_info = QmfData.create( {"code": -4, - "description": - "Unexpected id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - args = mc.get_args() - if ("arg1" in args and args["arg1"] == 1 and - "arg2" in args and args["arg2"] == "Now set!" - and "arg3" in args and args["arg3"] == 1966): - out_args = {"code" : 0} - if "cookie" in mc.get_args(): - out_args["cookie"] = mc.get_args()["cookie"] - self.agent.method_response(wi.get_handle(), - out_args) - else: - error_info = QmfData.create( - {"code": -5, - "description": - "Bad Args."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - error_info = QmfData.create( {"code": -1, - "description": - "Unknown method call."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), _error=error_info) - - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent heartbeat interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_described_obj(self): - # create console - # find agents - # synchronous query for all objects in schema - # method call on each object - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - i_count = 0 - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - for sid in sid_list: - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, - _target_params=t_params) - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - cookie = "cookie-" + str(i_count) - i_count += 1 - mr = obj.invoke_method( "set_meth", - {"arg_int": -99, - "arg_str": "Now set!", - "cookie": cookie}, - _reply_handle=cookie, - _timeout=3) - self.assertTrue(mr) - - # done, now wait for async responses - - r_count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - r_count += 1 - self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE) - reply = wi.get_params() - self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) - self.assertTrue(reply.succeeded()) - self.assertTrue(reply.get_argument("cookie") == wi.get_handle()) - - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(r_count == i_count) - - self.console.destroy(10) - - - def test_managed_obj(self): - # create console - # find agents - # synchronous query for a managed object - # method call on each object - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - i_count = 0 - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([]))) - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - - cookie = "cookie-" + str(i_count) - i_count += 1 - mr = obj.invoke_method("a_method", - {"arg1": 1, - "arg2": "Now set!", - "arg3": 1966, - "cookie": cookie}, - _reply_handle=cookie, - _timeout=3) - self.assertTrue(mr) - - # done, now wait for async responses - - r_count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - r_count += 1 - self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE) - reply = wi.get_params() - self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) - self.assertTrue(reply.succeeded()) - self.assertTrue(reply.get_argument("cookie") == wi.get_handle()) - - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(r_count == i_count) - - self.console.destroy(10) diff --git a/python/qmf2/tests/async_query.py b/python/qmf2/tests/async_query.py deleted file mode 100644 index 3a9a767bf0..0000000000 --- a/python/qmf2/tests/async_query.py +++ /dev/null @@ -1,462 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData, WorkItem) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( self.agent, _schema=_schema, - _values={"index1":100, "index2":"a name"}) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - self.agent.add_object( _obj1 ) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":50, - "index2": "my name", - "set_string": "SET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(self.agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 50) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01546") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 3) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 51) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01544") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 49) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01543") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 48) - self.agent.add_object(_obj2) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_all_schema_ids(self): - # create console - # find agents - # asynchronous query for all schema ids - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # send queries - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - rc = self.console.do_query(agent, query, - _reply_handle=aname) - self.assertTrue(rc) - - # done. Now wait for async responses - - count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - count += 1 - self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) - self.assertTrue(wi.get_handle() == "agent1" or - wi.get_handle() == "agent2") - reply = wi.get_params() - self.assertTrue(len(reply) == 1) - self.assertTrue(isinstance(reply[0], SchemaClassId)) - self.assertTrue(reply[0].get_package_name() == "MyPackage") - self.assertTrue(reply[0].get_class_name() == "MyClass") - self.console.release_workitem(wi) - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(count == 2) - self.console.destroy(10) - - - - def test_undescribed_objs(self): - # create console - # find agents - # asynchronous query for all non-schema objects - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # send queries - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT) - rc = self.console.do_query(agent, query, _reply_handle=aname) - self.assertTrue(rc) - - # done. Now wait for async responses - - count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - count += 1 - self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) - self.assertTrue(wi.get_handle() == "agent1" or - wi.get_handle() == "agent2") - reply = wi.get_params() - self.assertTrue(len(reply) == 4) - self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData)) - self.assertFalse(reply[0].is_described()) # no schema - self.console.release_workitem(wi) - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(count == 2) - self.console.destroy(10) - - - - def test_described_objs(self): - # create console - # find agents - # asynchronous query for all schema-based objects - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # - t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) - # - rc = self.console.do_query(agent, query, _reply_handle=aname) - self.assertTrue(rc) - - # done. Now wait for async responses - - count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - count += 1 - self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) - self.assertTrue(wi.get_handle() == "agent1" or - wi.get_handle() == "agent2") - reply = wi.get_params() - self.assertTrue(len(reply) == 3) - self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData)) - self.assertTrue(reply[0].is_described()) # has schema - self.console.release_workitem(wi) - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(count == 2) - # @todo test if the console has learned the corresponding schemas.... - self.console.destroy(10) - - - - def test_all_schemas(self): - # create console - # find agents - # asynchronous query for all schemas - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - # test internal state using non-api calls: - # no schemas present yet - self.assertTrue(len(self.console._schema_cache) == 0) - # end test - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # send queries - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) - rc = self.console.do_query(agent, query, _reply_handle=aname) - self.assertTrue(rc) - - # done. Now wait for async responses - - count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - count += 1 - self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) - self.assertTrue(wi.get_handle() == "agent1" or - wi.get_handle() == "agent2") - reply = wi.get_params() - self.assertTrue(len(reply) == 1) - self.assertTrue(isinstance(reply[0], qmf2.common.SchemaObjectClass)) - self.assertTrue(reply[0].get_class_id().get_package_name() == "MyPackage") - self.assertTrue(reply[0].get_class_id().get_class_name() == "MyClass") - self.console.release_workitem(wi) - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(count == 2) - - # test internal state using non-api calls: - # schema has been learned - self.assertTrue(len(self.console._schema_cache) == 1) - # end test - - self.console.destroy(10) - - - - def test_query_expiration(self): - # create console - # find agents - # kill the agents - # send async query - # wait for & verify expiration - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=30) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - # find the agents - agents = [] - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - agents.append(agent) - - # now nuke the agents from orbit. It's the only way to be sure. - - self.agent1.stop_app() - self.agent1 = None - self.agent2.stop_app() - self.agent2 = None - - # now send queries to agents that no longer exist - for agent in agents: - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) - rc = self.console.do_query(agent, query, - _reply_handle=agent.get_name(), - _timeout=2) - self.assertTrue(rc) - - # done. Now wait for async responses due to timeouts - - count = 0 - while self.notifier.wait_for_work(3): - wi = self.console.get_next_workitem(timeout=0) - while wi is not None: - count += 1 - self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE) - self.assertTrue(wi.get_handle() == "agent1" or - wi.get_handle() == "agent2") - reply = wi.get_params() - self.assertTrue(len(reply) == 0) # empty - - self.console.release_workitem(wi) - wi = self.console.get_next_workitem(timeout=0) - - self.assertTrue(count == 2) - self.console.destroy(10) diff --git a/python/qmf2/tests/basic_method.py b/python/qmf2/tests/basic_method.py deleted file mode 100644 index be2bdff9ab..0000000000 --- a/python/qmf2/tests/basic_method.py +++ /dev/null @@ -1,406 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData, WorkItem) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent, MethodCallParams) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( self.agent, _schema=_schema, - _values={"index1":100, "index2":"a name"}) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - self.agent.add_object( _obj1 ) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(self.agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - self.agent.add_object(_obj2) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - # Agent application main processing loop - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - if wi.get_type() == WorkItem.METHOD_CALL: - mc = wi.get_params() - if not isinstance(mc, MethodCallParams): - raise Exception("Unexpected method call parameters") - - if mc.get_name() == "set_meth": - obj = self.agent.get_object(mc.get_object_id(), - mc.get_schema_id()) - if obj is None: - error_info = QmfData.create({"code": -2, - "description": - "Bad Object Id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - obj.inc_value("method_call_count") - if "arg_int" in mc.get_args(): - obj.set_value("set_int", mc.get_args()["arg_int"]) - if "arg_str" in mc.get_args(): - obj.set_value("set_string", mc.get_args()["arg_str"]) - self.agent.method_response(wi.get_handle(), - {"code" : 0}) - elif mc.get_name() == "a_method": - obj = self.agent.get_object(mc.get_object_id(), - mc.get_schema_id()) - if obj is None: - error_info = QmfData.create({"code": -3, - "description": - "Unknown object id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - elif obj.get_object_id() != "01545": - error_info = QmfData.create( {"code": -4, - "description": - "Unexpected id."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - args = mc.get_args() - if ("arg1" in args and args["arg1"] == 1 and - "arg2" in args and args["arg2"] == "Now set!" - and "arg3" in args and args["arg3"] == 1966): - self.agent.method_response(wi.get_handle(), - {"code" : 0}) - else: - error_info = QmfData.create( - {"code": -5, - "description": - "Bad Args."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), - _error=error_info) - else: - error_info = QmfData.create( {"code": -1, - "description": - "Unknown method call."}, - _object_id="_error") - self.agent.method_response(wi.get_handle(), _error=error_info) - - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent heartbeat interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_described_obj(self): - # create console - # find agents - # synchronous query for all objects in schema - # method call on each object - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - for sid in sid_list: - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, - _target_params=t_params) - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - mr = obj.invoke_method( "set_meth", {"arg_int": -99, - "arg_str": "Now set!"}, - _timeout=3) - self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) - self.assertTrue(mr.succeeded()) - self.assertTrue(mr.get_argument("code") == 0) - - self.assertTrue(obj.get_value("method_call_count") == 0) - self.assertTrue(obj.get_value("set_string") == "UNSET") - self.assertTrue(obj.get_value("set_int") == 0) - - obj.refresh() - - self.assertTrue(obj.get_value("method_call_count") == 1) - self.assertTrue(obj.get_value("set_string") == "Now set!") - self.assertTrue(obj.get_value("set_int") == -99) - - self.console.destroy(10) - - - def test_bad_method_schema(self): - # create console - # find agents - # synchronous query for all objects with schema - # invalid method call on each object - # - should throw a ValueError - NOT YET. - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - for sid in sid_list: - - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.TRUE], - _target_params=t_params) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - mr = obj.invoke_method("unknown_method", - {"arg1": -99, "arg2": "Now set!"}, - _timeout=3) - # self.failUnlessRaises(ValueError, - # obj.invoke_method, - # "unknown_meth", - # {"arg1": -99, "arg2": "Now set!"}, - # _timeout=3) - self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) - self.assertFalse(mr.succeeded()) - self.assertTrue(isinstance(mr.get_exception(), QmfData)) - - self.console.destroy(10) - - def test_bad_method_no_schema(self): - # create console - # find agents - # synchronous query for all objects with no schema - # invalid method call on each object - # - should throw a ValueError - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 1) - for obj in obj_list: - self.assertTrue(obj.get_schema_class_id() == None) - mr = obj.invoke_method("unknown_meth", - {"arg1": -99, "arg2": "Now set!"}, - _timeout=3) - self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) - self.assertFalse(mr.succeeded()) - self.assertTrue(isinstance(mr.get_exception(), QmfData)) - - self.console.destroy(10) - - def test_managed_obj(self): - # create console - # find agents - # synchronous query for a managed object - # method call on each object - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([]))) - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - - mr = obj.invoke_method("a_method", - {"arg1": 1, - "arg2": "Now set!", - "arg3": 1966}, - _timeout=3) - self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) - self.assertTrue(mr.succeeded()) - self.assertTrue(mr.get_argument("code") == 0) - # @todo refresh and verify changes - - self.console.destroy(10) diff --git a/python/qmf2/tests/basic_query.py b/python/qmf2/tests/basic_query.py deleted file mode 100644 index dd321cb4bb..0000000000 --- a/python/qmf2/tests/basic_query.py +++ /dev/null @@ -1,516 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( self.agent, _schema=_schema, - _values={"index1":100, "index2":"a name"}) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - self.agent.add_object( _obj1 ) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":50, - "index2": "my name", - "set_string": "SET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(self.agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 50) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01546") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 3) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 51) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01544") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 49) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01543") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 48) - self.agent.add_object(_obj2) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_all_oids(self): - # create console - # find agents - # synchronous query for all schemas - # synchronous query for all objects per schema - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # first, find objects per schema - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - for sid in sid_list: - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID, - _target_params=t_params) - - oid_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(oid_list, type([])), - "Unexpected return type") - self.assertTrue(len(oid_list) == 3, "Wrong count") - self.assertTrue('100a name' in oid_list) - self.assertTrue('99another name' in oid_list) - self.assertTrue('50my name' in oid_list) - self.assertTrue('01545' not in oid_list) - - - # now, find all unmanaged objects (no schema) - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) - oid_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(oid_list, type([])), - "Unexpected return type") - self.assertTrue(len(oid_list) == 4, "Wrong count") - self.assertTrue('100a name' not in oid_list) - self.assertTrue('99another name' not in oid_list) - self.assertTrue('01545' in oid_list) - self.assertTrue('01544' in oid_list) - self.assertTrue('01543' in oid_list) - self.assertTrue('01546' in oid_list) - - self.console.destroy(10) - - - def test_direct_oids(self): - # create console - # find agents - # synchronous query for each objects - # verify objects and schemas are correct - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # first, find objects per schema - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - - for oid in ['100a name', '99another name']: - query = QmfQuery.create_id_object(oid, sid_list[0]) - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([])), - "Unexpected return type") - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - self.assertTrue(isinstance(obj, QmfData)) - self.assertTrue(obj.get_object_id() == oid) - self.assertTrue(obj.get_schema_class_id() == sid_list[0]) - schema_id = obj.get_schema_class_id() - self.assertTrue(isinstance(schema_id, SchemaClassId)) - self.assertTrue(obj.is_described()) - - # now find schema-less objects - for oid in ['01545']: - query = QmfQuery.create_id_object(oid) - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([])), - "Unexpected return type") - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - self.assertTrue(isinstance(obj, QmfData)) - self.assertTrue(obj.get_object_id() == oid) - self.assertFalse(obj.is_described()) - - self.console.destroy(10) - - - - def test_packages(self): - # create console - # find agents - # synchronous query all package names - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) - package_list = self.console.do_query(agent, query) - self.assertTrue(len(package_list) == 1) - self.assertTrue('MyPackage' in package_list) - - - self.console.destroy(10) - - - - def test_predicate_schema_id(self): - # create console - # find agents - # synchronous query for all schema by package name - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, - [QmfQuery.EQ, - SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, "MyPackage"]]) - - schema_list = self.console.do_query(agent, query) - self.assertTrue(len(schema_list)) - for schema in schema_list: - self.assertTrue(schema.get_class_id().get_package_name() == - "MyPackage") - - - self.console.destroy(10) - - - - def test_predicate_no_match(self): - # create console - # find agents - # synchronous query for all schema by package name - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, - [QmfQuery.EQ, - [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE], - [QmfQuery.QUOTE, "No-Such-Package"]]) - - schema_list = self.console.do_query(agent, query) - self.assertTrue(len(schema_list) == 0) - - self.console.destroy(10) - - - def test_predicate_match_string(self): - # create console - # find agents - # synchronous query for all objects with a value named - # set_string which is < or equal to "UNSET" - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # get the schema id for MyPackage:MyClass schema - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, - [QmfQuery.AND, - [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, "MyPackage"]], - [QmfQuery.EQ, SchemaClassId.KEY_CLASS, - [QmfQuery.QUOTE, "MyClass"]]]) - sid_list = self.console.do_query(agent, query) - self.assertTrue(len(sid_list) == 1) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]], - [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]], - _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]}) - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - self.assertTrue(obj.has_value("set_string")) - self.assertTrue(obj.get_value("set_string") == "UNSET") - - self.console.destroy(10) - - - - def test_predicate_match_integer(self): - # create console - # find agents - # synchronous query for all objects with a value named - # "index1" which is < or equal to various values - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # Query the unmanaged (no schema) objects - - # == 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.EQ, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 1) - self.assertTrue(obj_list[0].has_value("index1")) - self.assertTrue(obj_list[0].get_value("index1") == 50) - - # <= 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.LE, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 3) - for obj in obj_list: - self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") <= 50) - - - # > 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.GT, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 1) - for obj in obj_list: - self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") > 50) - - self.console.destroy(10) - diff --git a/python/qmf2/tests/console_test.py b/python/qmf2/tests/console_test.py deleted file mode 100644 index ac0e064f20..0000000000 --- a/python/qmf2/tests/console_test.py +++ /dev/null @@ -1,175 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import logging -import time -from threading import Semaphore - - -from qpid.messaging import * -from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, - SchemaClassId, SchemaClass, QmfData) -from qmf2.console import Console - - -class ExampleNotifier(Notifier): - def __init__(self): - self._sema4 = Semaphore(0) # locked - - def indication(self): - self._sema4.release() - - def waitForWork(self): - print("Waiting for event...") - self._sema4.acquire() - print("...event present") - - -logging.getLogger().setLevel(logging.INFO) - -print( "Starting Connection" ) -_c = Connection("localhost") -_c.connect() - -print( "Starting Console" ) - -_notifier = ExampleNotifier() -_myConsole = Console(notifier=_notifier) -_myConsole.addConnection( _c ) - -# Allow discovery only for the agent named "qmf.testAgent" -# @todo: replace "manual" query construction with -# a formal class-based Query API -_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, - QmfQueryPredicate({QmfQuery.CMP_EQ: - [QmfQuery.KEY_AGENT_NAME, - "qmf.testAgent"]})) -_myConsole.enable_agent_discovery(_query) - -_done = False -while not _done: -# try: - _notifier.waitForWork() - - _wi = _myConsole.get_next_workitem(timeout=0) - while _wi: - print("!!! work item received %d:%s" % (_wi.get_type(), - str(_wi.get_params()))) - - - if _wi.get_type() == _wi.AGENT_ADDED: - _agent = _wi.get_params().get("agent") - if not _agent: - print("!!!! AGENT IN REPLY IS NULL !!! ") - - _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) - oid_list = _myConsole.doQuery(_agent, _query) - - print("!!!************************** REPLY=%s" % oid_list) - - for oid in oid_list: - _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, - oid) - obj_list = _myConsole.doQuery(_agent, _query) - - print("!!!************************** REPLY=%s" % obj_list) - - if obj_list is None: - obj_list={} - - for obj in obj_list: - resp = obj.invoke_method( "set_meth", - {"arg_int": -11, - "arg_str": "are we not goons?"}, - None, - 3) - if resp is None: - print("!!!*** NO RESPONSE FROM METHOD????") - else: - print("!!! method succeeded()=%s" % resp.succeeded()) - print("!!! method exception()=%s" % resp.get_exception()) - print("!!! method get args() = %s" % resp.get_arguments()) - - if not obj.is_described(): - resp = obj.invoke_method( "bad method", - {"arg_int": -11, - "arg_str": "are we not goons?"}, - None, - 3) - if resp is None: - print("!!!*** NO RESPONSE FROM METHOD????") - else: - print("!!! method succeeded()=%s" % resp.succeeded()) - print("!!! method exception()=%s" % resp.get_exception()) - print("!!! method get args() = %s" % resp.get_arguments()) - - - #--------------------------------- - #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name") - - #obj_list = _myConsole.doQuery(_agent, _query) - - #--------------------------------- - - # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) - - # package_list = _myConsole.doQuery(_agent, _query) - - # for pname in package_list: - # print("!!! Querying for schema from package: %s" % pname) - # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, - # QmfQueryPredicate( - # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]})) - - # schema_id_list = _myConsole.doQuery(_agent, _query) - # for sid in schema_id_list: - # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, - # QmfQueryPredicate( - # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID, - # sid.map_encode()]})) - - # schema_list = _myConsole.doQuery(_agent, _query) - # for schema in schema_list: - # sid = schema.get_class_id() - # _query = QmfQuery.create_predicate( - # QmfQuery.TARGET_OBJECT_ID, - # QmfQueryPredicate({QmfQuery.CMP_EQ: - # [QmfData.KEY_SCHEMA_ID, - # sid.map_encode()]})) - - # oid_list = _myConsole.doQuery(_agent, _query) - # for oid in oid_list: - # _query = QmfQuery.create_id( - # QmfQuery.TARGET_OBJECT, oid) - # _reply = _myConsole.doQuery(_agent, _query) - - # print("!!!************************** REPLY=%s" % _reply) - - - _myConsole.release_workitem(_wi) - _wi = _myConsole.get_next_workitem(timeout=0) -# except: -# logging.info( "shutting down..." ) -# _done = True - -print( "Removing connection" ) -_myConsole.removeConnection( _c, 10 ) - -print( "Destroying console:" ) -_myConsole.destroy( 10 ) - -print( "******** console test done ********" ) diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py deleted file mode 100644 index e55dc8572e..0000000000 --- a/python/qmf2/tests/events.py +++ /dev/null @@ -1,208 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import time -import datetime -import logging -from threading import Thread, Event - -import qpid.messaging -from qpid.harness import Skipped -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData, SchemaEventClass, - QmfEvent) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.timeout = 3 - self.broker_url = broker_url - self.notifier = _testNotifier() - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage", - "MyClass", - stype=SchemaClassId.TYPE_EVENT), - _desc="A test event schema") - # add properties - _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # Add schema to Agent - self.schema = _schema - self.agent.register_object_class(_schema) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - # time.sleep(1) - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(self.timeout) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - try: - conn.connect() - except qpid.messaging.ConnectError, e: - raise Skipped(e) - - self.agent.set_connection(conn) - self.ready.set() - - counter = 1 - while self.running: - # post an event every second - event = QmfEvent.create(long(time.time() * 1000), - QmfEvent.SEV_WARNING, - {"prop-1": counter, - "prop-2": str(datetime.datetime.utcnow())}, - _schema_id=self.schema.get_class_id()) - counter += 1 - self.agent.raise_event(event) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - self.notifier.wait_for_work(1) - - self.agent.remove_connection(self.timeout) - self.agent.destroy(self.timeout) - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent1 = _agentApp("agent1", self.broker, 1) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, 1) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_get_events(self): - # create console - # find agents - - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - try: - self.conn.connect() - except qpid.messaging.ConnectError, e: - raise Skipped(e) - - self.console.add_connection(self.conn) - - # find the agents - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # now wait for events - agent1_events = agent2_events = 0 - wi = self.console.get_next_workitem(timeout=4) - while wi: - if wi.get_type() == wi.EVENT_RECEIVED: - event = wi.get_params().get("event") - self.assertTrue(isinstance(event, QmfEvent)) - self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING) - self.assertTrue(event.get_value("prop-1") > 0) - - agent = wi.get_params().get("agent") - if not agent or not isinstance(agent, qmf2.console.Agent): - self.fail("Unexpected workitem from agent") - else: - if agent.get_name() == "agent1": - agent1_events += 1 - elif agent.get_name() == "agent2": - agent2_events += 1 - else: - self.fail("Unexpected agent name received: %s" % - agent.get_name()) - if agent1_events and agent2_events: - break; - - wi = self.console.get_next_workitem(timeout=4) - - self.assertTrue(agent1_events > 0 and agent2_events > 0) - - self.console.destroy(10) - - - - diff --git a/python/qmf2/tests/multi_response.py b/python/qmf2/tests/multi_response.py deleted file mode 100644 index d3d00a70c5..0000000000 --- a/python/qmf2/tests/multi_response.py +++ /dev/null @@ -1,295 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - -# note: objects, schema per agent must each be > max objs -_SCHEMAS_PER_AGENT=7 -_OBJS_PER_AGENT=19 -_MAX_OBJS_PER_MSG=3 - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.schema_count = _SCHEMAS_PER_AGENT - self.obj_count = _OBJS_PER_AGENT - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat, - _max_msg_size=_MAX_OBJS_PER_MSG) - - # Dynamically construct a management database - for i in range(self.schema_count): - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", - "MyClass-" + str(i)), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - for j in range(self.obj_count): - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":j, - "index2": "name-" + str(j), - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.agent_count = 2 - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agents = [] - for a in range(self.agent_count): - agent = _agentApp("agent-" + str(a), - self.broker, - self.agent_heartbeat) - agent.start_app() - self.agents.append(agent) - - def tearDown(self): - for agent in self.agents: - if agent is not None: - agent.stop_app() - - def test_all_schema_id(self): - # create console - # find agents - # synchronous query for all schemas_ids - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) - self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) - - # get a list of all schema_ids - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) - for sid in sid_list: - self.assertTrue(isinstance(sid, SchemaClassId)) - self.assertTrue(sid.get_package_name() == "MyPackage") - self.assertTrue(sid.get_class_name().split('-')[0] == "MyClass") - - self.console.destroy(10) - - - def test_all_schema(self): - # create console - # find agents - # synchronous query for all schemas - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) - self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) - - # get a list of all schema_ids - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA) - schema_list = self.console.do_query(agent, query) - self.assertTrue(schema_list and - len(schema_list) == _SCHEMAS_PER_AGENT) - for schema in schema_list: - self.assertTrue(isinstance(schema, SchemaObjectClass)) - - self.console.destroy(10) - - - def test_all_object_id(self): - # create console - # find agents - # synchronous query for all object_ids by schema_id - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) - self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) - - # get a list of all schema_ids - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) - for sid in sid_list: - query = QmfQuery.create_wildcard_object_id(sid) - oid_list = self.console.do_query(agent, query) - self.assertTrue(oid_list and - len(oid_list) == _OBJS_PER_AGENT) - for oid in oid_list: - self.assertTrue(isinstance(oid, basestring)) - - self.console.destroy(10) - - - def test_all_objects(self): - # create console - # find agents - # synchronous query for all objects by schema_id - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3) - self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name()) - - # get a list of all schema_ids - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT) - for sid in sid_list: - query = QmfQuery.create_wildcard_object(sid) - obj_list = self.console.do_query(agent, query) - self.assertTrue(obj_list and - len(obj_list) == _OBJS_PER_AGENT) - for obj in obj_list: - self.assertTrue(isinstance(obj, - qmf2.console.QmfConsoleData)) - - self.console.destroy(10) diff --git a/python/qmf2/tests/obj_gets.py b/python/qmf2/tests/obj_gets.py deleted file mode 100644 index 5b1446bb3a..0000000000 --- a/python/qmf2/tests/obj_gets.py +++ /dev/null @@ -1,599 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -import datetime -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Management Database - # - two different schema packages, - # - two classes within one schema package - # - multiple objects per schema package+class - # - two "undescribed" objects - - # "package1/class1" - - _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"), - _desc="A test data schema - one", - _object_id_names=["key"] ) - - _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) - - self.agent.register_object_class(_schema) - - _obj = QmfAgentData( self.agent, - _values={"key":"p1c1_key1"}, - _schema=_schema) - _obj.set_value("count1", 0) - _obj.set_value("count2", 0) - self.agent.add_object( _obj ) - - _obj = QmfAgentData( self.agent, - _values={"key":"p1c1_key2"}, - _schema=_schema ) - _obj.set_value("count1", 9) - _obj.set_value("count2", 10) - self.agent.add_object( _obj ) - - # "package1/class2" - - _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"), - _desc="A test data schema - two", - _object_id_names=["name"] ) - # add properties - _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR)) - - self.agent.register_object_class(_schema) - - _obj = QmfAgentData( self.agent, - _values={"name":"p1c2_name1"}, - _schema=_schema ) - _obj.set_value("string1", "a data string") - self.agent.add_object( _obj ) - - - # "package2/class1" - - _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"), - _desc="A test data schema - second package", - _object_id_names=["key"] ) - - _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32)) - - self.agent.register_object_class(_schema) - - _obj = QmfAgentData( self.agent, - _values={"key":"p2c1_key1"}, - _schema=_schema ) - _obj.set_value("counter", 0) - self.agent.add_object( _obj ) - - _obj = QmfAgentData( self.agent, - _values={"key":"p2c1_key2"}, - _schema=_schema ) - _obj.set_value("counter", 2112) - self.agent.add_object( _obj ) - - - # add two "unstructured" objects to the Agent - - _obj = QmfAgentData(self.agent, _object_id="undesc-1") - _obj.set_value("field1", "a value") - _obj.set_value("field2", 2) - _obj.set_value("field3", {"a":1, "map":2, "value":3}) - _obj.set_value("field4", ["a", "list", "value"]) - self.agent.add_object(_obj) - - - _obj = QmfAgentData(self.agent, _object_id="undesc-2") - _obj.set_value("key-1", "a value") - _obj.set_value("key-2", 2) - self.agent.add_object(_obj) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - -class BaseTest(unittest.TestCase): - agent_count = 5 - - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - self.agents = [] - for i in range(self.agent_count): - agent = _agentApp("agent-" + str(i), self.broker, 1) - agent.start_app() - self.agents.append(agent) - #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow()) - - def tearDown(self): - #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow()) - for agent in self.agents: - if agent is not None: - agent.stop_app() - - - def test_all_agents(self): - # create console - # find all agents - # synchronous query for all objects by id - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - aname = agent_app.agent.get_name() - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # console has discovered all agents, now query all undesc-2 objects - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_object_id="undesc-2", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_object_id() == "undesc-2") - - # now query all objects from schema "package1" - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package1", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == (self.agent_count * 3)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - - # now query all objects from schema "package2" - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package2", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == (self.agent_count * 2)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - - # now query all objects from schema "package1/class2" - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package1", _cname="class2", - _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") - - # given the schema identifier from the last query, repeat using the - # specific schema id - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - schema_id = objs[0].get_schema_class_id() - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_schema_id=schema_id, _timeout=5) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id() == schema_id) - - - self.console.destroy(10) - - - - def test_agent_subset(self): - # create console - # find all agents - # synchronous query for all objects by id - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - agent_list = [] - for agent_app in self.agents: - aname = agent_app.agent.get_name() - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - agent_list.append(agent) - - # Only use a subset of the agents: - agent_list = agent_list[:len(agent_list)/2] - - # console has discovered all agents, now query all undesc-2 objects - objs = self.console.get_objects(_object_id="undesc-2", - _agents=agent_list, _timeout=5) - self.assertTrue(len(objs) == len(agent_list)) - for obj in objs: - self.assertTrue(obj.get_object_id() == "undesc-2") - - # now query all objects from schema "package1" - objs = self.console.get_objects(_pname="package1", - _agents=agent_list, - _timeout=5) - self.assertTrue(len(objs) == (len(agent_list) * 3)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - - # now query all objects from schema "package2" - objs = self.console.get_objects(_pname="package2", - _agents=agent_list, - _timeout=5) - self.assertTrue(len(objs) == (len(agent_list) * 2)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - - # now query all objects from schema "package1/class2" - objs = self.console.get_objects(_pname="package1", _cname="class2", - _agents=agent_list, - _timeout=5) - self.assertTrue(len(objs) == len(agent_list)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") - - # given the schema identifier from the last query, repeat using the - # specific schema id - schema_id = objs[0].get_schema_class_id() - objs = self.console.get_objects(_schema_id=schema_id, - _agents=agent_list, - _timeout=5) - self.assertTrue(len(objs) == len(agent_list)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id() == schema_id) - - - self.console.destroy(10) - - - - def test_single_agent(self): - # create console - # find all agents - # synchronous query for all objects by id - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - agent_list = [] - for agent_app in self.agents: - aname = agent_app.agent.get_name() - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - agent_list.append(agent) - - # Only use one agent - agent = agent_list[0] - - # console has discovered all agents, now query all undesc-2 objects - objs = self.console.get_objects(_object_id="undesc-2", - _agents=agent, _timeout=5) - self.assertTrue(len(objs) == 1) - for obj in objs: - self.assertTrue(obj.get_object_id() == "undesc-2") - - # now query all objects from schema "package1" - objs = self.console.get_objects(_pname="package1", - _agents=agent, - _timeout=5) - self.assertTrue(len(objs) == 3) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - - # now query all objects from schema "package2" - objs = self.console.get_objects(_pname="package2", - _agents=agent, - _timeout=5) - self.assertTrue(len(objs) == 2) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - - # now query all objects from schema "package1/class2" - objs = self.console.get_objects(_pname="package1", _cname="class2", - _agents=agent, - _timeout=5) - self.assertTrue(len(objs) == 1) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") - - # given the schema identifier from the last query, repeat using the - # specific schema id - schema_id = objs[0].get_schema_class_id() - objs = self.console.get_objects(_schema_id=schema_id, - _agents=agent, - _timeout=5) - self.assertTrue(len(objs) == 1) - for obj in objs: - self.assertTrue(obj.get_schema_class_id() == schema_id) - - - self.console.destroy(10) - - - - def test_all_objs_by_oid(self): - # create console - # find all agents - # synchronous query for all described objects by: - # oid & schema_id - # oid & package name - # oid & package and class name - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - aname = agent_app.agent.get_name() - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # now query all objects from schema "package1" - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package1", - _object_id="p1c1_key1", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - self.assertTrue(obj.get_object_id() == "p1c1_key1") - # mooch the schema for a later test - schema_id_p1c1 = objs[0].get_schema_class_id() - - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package1", - _object_id="p1c2_name1", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") - self.assertTrue(obj.get_object_id() == "p1c2_name1") - - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package2", _cname="class1", - _object_id="p2c1_key1", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - self.assertTrue(obj.get_object_id() == "p2c1_key1") - - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_schema_id=schema_id_p1c1, - _object_id="p1c1_key2", _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - self.assertTrue(obj.get_object_id() == "p1c1_key2") - - # this should return all "undescribed" objects - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(len(objs) == (self.agent_count * 2)) - for obj in objs: - self.assertTrue(obj.get_object_id() == "undesc-1" or - obj.get_object_id() == "undesc-2") - - # these should fail - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_schema_id=schema_id_p1c1, - _object_id="does not exist", - _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(objs == None) - - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package2", - _object_id="does not exist", - _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(objs == None) - - #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow()) - objs = self.console.get_objects(_pname="package3", - _object_id="does not exist", - _timeout=5) - #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow()) - self.assertTrue(objs == None) - - self.console.destroy(10) - - - def test_wildcard_schema_id(self): - # create console - # find all agents - # synchronous query for all described objects by: - # oid & wildcard schema_id - # wildcard schema_id - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for agent_app in self.agents: - aname = agent_app.agent.get_name() - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - wild_schema_id = SchemaClassId("package1", "class1") - objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) - self.assertTrue(len(objs) == (self.agent_count * 2)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - - wild_schema_id = SchemaClassId("package1", "class2") - objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") - self.assertTrue(obj.get_object_id() == "p1c2_name1") - - wild_schema_id = SchemaClassId("package2", "class1") - objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5) - self.assertTrue(len(objs) == (self.agent_count * 2)) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - - wild_schema_id = SchemaClassId("package1", "class1") - objs = self.console.get_objects(_schema_id=wild_schema_id, - _object_id="p1c1_key2", _timeout=5) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - self.assertTrue(obj.get_object_id() == "p1c1_key2") - - # should fail - objs = self.console.get_objects(_schema_id=wild_schema_id, - _object_id="does not exist", - _timeout=5) - self.assertTrue(objs == None) - - wild_schema_id = SchemaClassId("package2", "class1") - objs = self.console.get_objects(_schema_id=wild_schema_id, - _object_id="p2c1_key2", _timeout=5) - self.assertTrue(len(objs) == self.agent_count) - for obj in objs: - self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") - self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1") - self.assertTrue(obj.get_object_id() == "p2c1_key2") - - # should fail - wild_schema_id = SchemaClassId("package1", "bad-class") - objs = self.console.get_objects(_schema_id=wild_schema_id, - _object_id="p1c1_key2", _timeout=5) - self.assertTrue(objs == None) - - self.console.destroy(10) - |
