diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-25 18:17:10 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-25 18:17:10 +0000 |
| commit | 8c1e8ae6633847a734bd8f3de8bf2943a7ced668 (patch) | |
| tree | 7faf8ad4c805d7cf7438579a43a92cbedf6e1e48 /qpid/python/qmf2/tests | |
| parent | 6224ba152a429548755ddc8153633c3797dd9647 (diff) | |
| download | qpid-python-8c1e8ae6633847a734bd8f3de8bf2943a7ced668.tar.gz | |
QPID-2261: merge qmfv2 latest into trunk (revs 902858 & 902894)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@902906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/tests')
| -rw-r--r-- | qpid/python/qmf2/tests/__init__.py | 22 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/agent_discovery.py | 320 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/agent_test.py | 167 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/basic_method.py | 348 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/basic_query.py | 336 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/console_test.py | 175 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/events.py | 193 | ||||
| -rw-r--r-- | qpid/python/qmf2/tests/obj_gets.py | 399 |
8 files changed, 1960 insertions, 0 deletions
diff --git a/qpid/python/qmf2/tests/__init__.py b/qpid/python/qmf2/tests/__init__.py new file mode 100644 index 0000000000..2e742b79be --- /dev/null +++ b/qpid/python/qmf2/tests/__init__.py @@ -0,0 +1,22 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import agent_discovery, basic_query, basic_method, obj_gets, events diff --git a/qpid/python/qmf2/tests/agent_discovery.py b/qpid/python/qmf2/tests/agent_discovery.py new file mode 100644 index 0000000000..3c530cc060 --- /dev/null +++ b/qpid/python/qmf2/tests/agent_discovery.py @@ -0,0 +1,320 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +import qmf2.common +import qmf2.console +import qmf2.agent + + +class _testNotifier(qmf2.common.Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = qmf2.agent.Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + # No database needed for this test + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_discover_all(self): + # create console + # enable agent discovery + # wait + # expect agent add for agent1 and agent2 + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=3) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + self.console.destroy(10) + + + def test_discover_one(self): + # create console + # enable agent discovery, filter for agent1 only + # wait until timeout + # expect agent add for agent1 only + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + query = qmf2.common.QmfQuery.create_predicate( + qmf2.common.QmfQuery.TARGET_AGENT, + qmf2.common.QmfQueryPredicate({qmf2.common.QmfQuery.CMP_EQ: + [qmf2.common.QmfQuery.KEY_AGENT_NAME, "agent1"]})) + self.console.enable_agent_discovery(query) + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=3) + while wi: + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + + wi = self.console.get_next_workitem(timeout=2) + + self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered") + + self.console.destroy(10) + + + def test_heartbeat(self): + # create console with 2 sec agent timeout + # enable agent discovery, find all agents + # stop agent1, expect timeout notification + # stop agent2, expect timeout notification + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=2) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + self.console.enable_agent_discovery() + + agent1_found = agent2_found = False + wi = self.console.get_next_workitem(timeout=4) + while wi and not (agent1_found and agent2_found): + if wi.get_type() == wi.AGENT_ADDED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = True + elif agent.get_name() == "agent2": + agent2_found = True + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_found and agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_found and agent2_found, "All agents not discovered") + + # now kill agent1 and wait for expiration + + agent1 = self.agent1 + self.agent1 = None + agent1.shutdown_agent(10) + agent1.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent1_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent1_found, "agent1 did not delete!") + + # now kill agent2 and wait for expiration + + agent2 = self.agent2 + self.agent2 = None + agent2.shutdown_agent(10) + agent2.stop() + + wi = self.console.get_next_workitem(timeout=4) + while wi is not None: + if wi.get_type() == wi.AGENT_DELETED: + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent2": + agent2_found = False + else: + self.fail("Unexpected agent_deleted received: %s" % + agent.get_name()) + if not agent2_found: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertFalse(agent2_found, "agent2 did not delete!") + + self.console.destroy(10) + + + def test_find_agent(self): + # create console + # do not enable agent discovery + # find agent1, expect success + # find agent-none, expect failure + # find agent2, expect success + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent1 = self.console.find_agent("agent1", timeout=3) + self.assertTrue(agent1 and agent1.get_name() == "agent1") + + no_agent = self.console.find_agent("agent-none", timeout=3) + self.assertTrue(no_agent == None) + + agent2 = self.console.find_agent("agent2", timeout=3) + self.assertTrue(agent2 and agent2.get_name() == "agent2") + + self.console.removeConnection(self.conn, 10) + self.console.destroy(10) + + diff --git a/qpid/python/qmf2/tests/agent_test.py b/qpid/python/qmf2/tests/agent_test.py new file mode 100644 index 0000000000..14d8ada197 --- /dev/null +++ b/qpid/python/qmf2/tests/agent_test.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import time +import unittest +from threading import Semaphore + + +from qpid.messaging import * +from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, + QmfEvent, SchemaMethod, Notifier, SchemaClassId, + WorkItem) +from qmf2.agent import (Agent, QmfAgentData) + + + +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + print("Waiting for event...") + self._sema4.acquire() + print("...event present") + + + + +class QmfTest(unittest.TestCase): + def test_begin(self): + print("!!! being test") + + def test_end(self): + print("!!! end test") + + +# +# An example agent application +# + + +if __name__ == '__main__': + _notifier = ExampleNotifier() + _agent = Agent( "qmf.testAgent", _notifier=_notifier ) + + # Dynamically construct a class schema + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + _agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( _agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + _agent.add_object( _obj1 ) + + _agent.add_object( QmfAgentData( _agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(_agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + _agent.add_object(_obj2) + + + ## Now connect to the broker + + _c = Connection("localhost") + _c.connect() + _agent.setConnection(_c) + + _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."}) + + _done = False + while not _done: + # try: + _notifier.waitForWork() + + _wi = _agent.get_next_workitem(timeout=0) + while _wi: + + if _wi.get_type() == WorkItem.METHOD_CALL: + mc = _wi.get_params() + + if mc.get_name() == "set_meth": + print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id()) + print("!!! args='%s'" % str(mc.get_args())) + print("!!! userid=%s" % str(mc.get_user_id())) + print("!!! handle=%s" % _wi.get_handle()) + _agent.method_response(_wi.get_handle(), + {"rc1": 100, "rc2": "Success"}) + else: + print("!!! Unknown Method name = %s" % mc.get_name()) + _agent.method_response(_wi.get_handle(), _error=_error_data) + else: + print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params()))) + + _agent.release_workitem(_wi) + _wi = _agent.get_next_workitem(timeout=0) + # except: + # print( "shutting down...") + # _done = True + + print( "Removing connection... TBD!!!" ) + #_myConsole.remove_connection( _c, 10 ) + + print( "Destroying agent... TBD!!!" ) + #_myConsole.destroy( 10 ) + + print( "******** agent test done ********" ) + + + diff --git a/qpid/python/qmf2/tests/basic_method.py b/qpid/python/qmf2/tests/basic_method.py new file mode 100644 index 0000000000..a054a769d0 --- /dev/null +++ b/qpid/python/qmf2/tests/basic_method.py @@ -0,0 +1,348 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent, MethodCallParams) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # Agent application main processing loop + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + if wi.get_type() == WorkItem.METHOD_CALL: + mc = wi.get_params() + if not isinstance(mc, MethodCallParams): + raise Exception("Unexpected method call parameters") + + if mc.get_name() == "set_meth": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -2, + "description": + "Bad Object Id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + obj.inc_value("method_call_count") + if "arg_int" in mc.get_args(): + obj.set_value("set_int", mc.get_args()["arg_int"]) + if "arg_str" in mc.get_args(): + obj.set_value("set_string", mc.get_args()["arg_str"]) + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + elif mc.get_name() == "a_method": + obj = self.agent.get_object(mc.get_object_id()) + if obj is None: + error_info = QmfData.create({"code": -3, + "description": + "Unknown object id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + elif obj.get_object_id() != "01545": + error_info = QmfData.create({"code": -4, + "description": + "Unexpected id."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + args = mc.get_args() + if ("arg1" in args and args["arg1"] == 1 and + "arg2" in args and args["arg2"] == "Now set!" + and "arg3" in args and args["arg3"] == 1966): + self.agent.method_response(wi.get_handle(), + {"code" : 0}) + else: + error_info = QmfData.create({"code": -5, + "description": + "Bad Args."}) + self.agent.method_response(wi.get_handle(), + _error=error_info) + else: + error_info = QmfData.create({"code": -1, + "description": + "Unknown method call."}) + self.agent.method_response(wi.get_handle(), _error=error_info) + + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_described_obj(self): + # create console + # find agents + # synchronous query for all objects in schema + # method call on each object + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + mr = obj.invoke_method( "set_meth", {"arg_int": -99, + "arg_str": "Now set!"}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + + self.assertTrue(obj.get_value("method_call_count") == 0) + self.assertTrue(obj.get_value("set_string") == "UNSET") + self.assertTrue(obj.get_value("set_int") == 0) + + obj.refresh() + + self.assertTrue(obj.get_value("method_call_count") == 1) + self.assertTrue(obj.get_value("set_string") == "Now set!") + self.assertTrue(obj.get_value("set_int") == -99) + + self.console.destroy(10) + + + def test_bad_method(self): + # create console + # find agents + # synchronous query for all objects in schema + # invalid method call on each object + # - should throw a ValueError + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, + QmfQueryPredicate( + {QmfQuery.LOGIC_AND: + [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]}, + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]}]})) + + obj_list = self.console.doQuery(agent, query) + self.assertTrue(len(obj_list) == 2) + for obj in obj_list: + self.failUnlessRaises(ValueError, + obj.invoke_method, + "unknown_meth", + {"arg1": -99, "arg2": "Now set!"}, + _timeout=3) + self.console.destroy(10) + + + def test_managed_obj(self): + # create console + # find agents + # synchronous query for a managed object + # method call on each object + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545") + obj_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(obj_list, type([]))) + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + + mr = obj.invoke_method("a_method", + {"arg1": 1, + "arg2": "Now set!", + "arg3": 1966}, + _timeout=3) + self.assertTrue(isinstance(mr, qmf2.console.MethodResult)) + self.assertTrue(mr.succeeded()) + self.assertTrue(mr.get_argument("code") == 0) + # @todo refresh and verify changes + + self.console.destroy(10) diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py new file mode 100644 index 0000000000..2009f5c594 --- /dev/null +++ b/qpid/python/qmf2/tests/basic_query.py @@ -0,0 +1,336 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) + # add properties + _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # these two properties are statistics + _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # These two properties can be set via the method call + _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) + + # add method + _meth = SchemaMethod( _desc="Method to set string and int in object." ) + _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _schema.add_method( "set_meth", _meth ) + + # Add schema to Agent + + self.agent.register_object_class(_schema) + + # instantiate managed data objects matching the schema + + _obj1 = QmfAgentData( self.agent, _schema=_schema ) + _obj1.set_value("index1", 100) + _obj1.set_value("index2", "a name" ) + _obj1.set_value("set_string", "UNSET") + _obj1.set_value("set_int", 0) + _obj1.set_value("query_count", 0) + _obj1.set_value("method_call_count", 0) + self.agent.add_object( _obj1 ) + + self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) + + # add an "unstructured" object to the Agent + _obj2 = QmfAgentData(self.agent, _object_id="01545") + _obj2.set_value("field1", "a value") + _obj2.set_value("field2", 2) + _obj2.set_value("field3", {"a":1, "map":2, "value":3}) + _obj2.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj2) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", 1) + self.agent1.connect_agent(self.broker) + self.agent2 = _agentApp("agent2", 1) + self.agent2.connect_agent(self.broker) + + def tearDown(self): + if self.agent1: + self.agent1.shutdown_agent(10) + self.agent1.stop() + self.agent1 = None + if self.agent2: + self.agent2.shutdown_agent(10) + self.agent2.stop() + self.agent2 = None + + def test_all_oids(self): + # create console + # find agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) + oid_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(oid_list, type([])), + "Unexpected return type") + self.assertTrue(len(oid_list) == 3, "Wrong count") + self.assertTrue('100a name' in oid_list) + self.assertTrue('99another name' in oid_list) + self.assertTrue('01545' in oid_list) + + self.console.destroy(10) + + + def test_direct_oids(self): + # create console + # find agents + # synchronous query for each objects + # verify objects and schemas are correct + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + for oid in ['100a name', '99another name', '01545']: + query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid) + obj_list = self.console.doQuery(agent, query) + + self.assertTrue(isinstance(obj_list, type([])), + "Unexpected return type") + self.assertTrue(len(obj_list) == 1) + obj = obj_list[0] + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == oid) + + if obj.is_described(): + self.assertTrue(oid in ['100a name', '99another name']) + schema_id = obj.get_schema_class_id() + self.assertTrue(isinstance(schema_id, SchemaClassId)) + else: + self.assertTrue(oid == "01545") + + + + self.console.destroy(10) + + + + def test_packages(self): + # create console + # find agents + # synchronous query all package names + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) + package_list = self.console.doQuery(agent, query) + self.assertTrue(len(package_list) == 1) + self.assertTrue('MyPackage' in package_list) + + + self.console.destroy(10) + + + + def test_predicate_schema_id(self): + # create console + # find agents + # synchronous query for all schema by package name + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + QmfQueryPredicate( + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "MyPackage"]})) + + schema_list = self.console.doQuery(agent, query) + self.assertTrue(len(schema_list)) + for schema in schema_list: + self.assertTrue(schema.get_class_id().get_package_name() == + "MyPackage") + + + self.console.destroy(10) + + + + def test_predicate_no_match(self): + # create console + # find agents + # synchronous query for all schema by package name + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + QmfQueryPredicate( + {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, + "No-Such-Package"]})) + + schema_list = self.console.doQuery(agent, query) + self.assertTrue(len(schema_list) == 0) + + self.console.destroy(10) + + diff --git a/qpid/python/qmf2/tests/console_test.py b/qpid/python/qmf2/tests/console_test.py new file mode 100644 index 0000000000..ac0e064f20 --- /dev/null +++ b/qpid/python/qmf2/tests/console_test.py @@ -0,0 +1,175 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import logging +import time +from threading import Semaphore + + +from qpid.messaging import * +from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey, + SchemaClassId, SchemaClass, QmfData) +from qmf2.console import Console + + +class ExampleNotifier(Notifier): + def __init__(self): + self._sema4 = Semaphore(0) # locked + + def indication(self): + self._sema4.release() + + def waitForWork(self): + print("Waiting for event...") + self._sema4.acquire() + print("...event present") + + +logging.getLogger().setLevel(logging.INFO) + +print( "Starting Connection" ) +_c = Connection("localhost") +_c.connect() + +print( "Starting Console" ) + +_notifier = ExampleNotifier() +_myConsole = Console(notifier=_notifier) +_myConsole.addConnection( _c ) + +# Allow discovery only for the agent named "qmf.testAgent" +# @todo: replace "manual" query construction with +# a formal class-based Query API +_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, + QmfQueryPredicate({QmfQuery.CMP_EQ: + [QmfQuery.KEY_AGENT_NAME, + "qmf.testAgent"]})) +_myConsole.enable_agent_discovery(_query) + +_done = False +while not _done: +# try: + _notifier.waitForWork() + + _wi = _myConsole.get_next_workitem(timeout=0) + while _wi: + print("!!! work item received %d:%s" % (_wi.get_type(), + str(_wi.get_params()))) + + + if _wi.get_type() == _wi.AGENT_ADDED: + _agent = _wi.get_params().get("agent") + if not _agent: + print("!!!! AGENT IN REPLY IS NULL !!! ") + + _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) + oid_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % oid_list) + + for oid in oid_list: + _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, + oid) + obj_list = _myConsole.doQuery(_agent, _query) + + print("!!!************************** REPLY=%s" % obj_list) + + if obj_list is None: + obj_list={} + + for obj in obj_list: + resp = obj.invoke_method( "set_meth", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + if not obj.is_described(): + resp = obj.invoke_method( "bad method", + {"arg_int": -11, + "arg_str": "are we not goons?"}, + None, + 3) + if resp is None: + print("!!!*** NO RESPONSE FROM METHOD????") + else: + print("!!! method succeeded()=%s" % resp.succeeded()) + print("!!! method exception()=%s" % resp.get_exception()) + print("!!! method get args() = %s" % resp.get_arguments()) + + + #--------------------------------- + #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name") + + #obj_list = _myConsole.doQuery(_agent, _query) + + #--------------------------------- + + # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) + + # package_list = _myConsole.doQuery(_agent, _query) + + # for pname in package_list: + # print("!!! Querying for schema from package: %s" % pname) + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]})) + + # schema_id_list = _myConsole.doQuery(_agent, _query) + # for sid in schema_id_list: + # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, + # QmfQueryPredicate( + # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # schema_list = _myConsole.doQuery(_agent, _query) + # for schema in schema_list: + # sid = schema.get_class_id() + # _query = QmfQuery.create_predicate( + # QmfQuery.TARGET_OBJECT_ID, + # QmfQueryPredicate({QmfQuery.CMP_EQ: + # [QmfData.KEY_SCHEMA_ID, + # sid.map_encode()]})) + + # oid_list = _myConsole.doQuery(_agent, _query) + # for oid in oid_list: + # _query = QmfQuery.create_id( + # QmfQuery.TARGET_OBJECT, oid) + # _reply = _myConsole.doQuery(_agent, _query) + + # print("!!!************************** REPLY=%s" % _reply) + + + _myConsole.release_workitem(_wi) + _wi = _myConsole.get_next_workitem(timeout=0) +# except: +# logging.info( "shutting down..." ) +# _done = True + +print( "Removing connection" ) +_myConsole.removeConnection( _c, 10 ) + +print( "Destroying console:" ) +_myConsole.destroy( 10 ) + +print( "******** console test done ********" ) diff --git a/qpid/python/qmf2/tests/events.py b/qpid/python/qmf2/tests/events.py new file mode 100644 index 0000000000..b2d934728d --- /dev/null +++ b/qpid/python/qmf2/tests/events.py @@ -0,0 +1,193 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import time +import datetime +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate, SchemaEventClass, + QmfEvent) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.timeout = 3 + self.broker_url = broker_url + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Dynamically construct a management database + + _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage", + "MyClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A test event schema") + # add properties + _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR)) + + # Add schema to Agent + self.schema = _schema + self.agent.register_object_class(_schema) + + self.running = False + + def start_app(self): + self.running = True + self.start() + + def stop_app(self): + self.running = False + # wake main thread + self.notifier.indication() # hmmm... collide with daemon??? + self.join(self.timeout) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + conn.connect() + self.agent.set_connection(conn) + + counter = 1 + while self.running: + # post an event every second + event = QmfEvent.create(long(time.time() * 1000), + QmfEvent.SEV_WARNING, + {"prop-1": counter, + "prop-2": str(datetime.datetime.utcnow())}, + _schema=self.schema) + counter += 1 + self.agent.raise_event(event) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + self.notifier.wait_for_work(1) + + self.agent.remove_connection(self.timeout) + self.agent.destroy(self.timeout) + + + +class BaseTest(unittest.TestCase): + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + # one second agent indication interval + self.agent1 = _agentApp("agent1", self.broker, 1) + self.agent1.start_app() + self.agent2 = _agentApp("agent2", self.broker, 1) + self.agent2.start_app() + + def tearDown(self): + if self.agent1: + self.agent1.stop_app() + self.agent1 = None + if self.agent2: + self.agent2.stop_app() + self.agent2 = None + + def test_get_events(self): + # create console + # find agents + + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + # find the agents + for aname in ["agent1", "agent2"]: + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now wait for events + agent1_events = agent2_events = 0 + wi = self.console.get_next_workitem(timeout=4) + while wi: + if wi.get_type() == wi.EVENT_RECEIVED: + event = wi.get_params().get("event") + self.assertTrue(isinstance(event, QmfEvent)) + self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING) + self.assertTrue(event.get_value("prop-1") > 0) + + agent = wi.get_params().get("agent") + if not agent or not isinstance(agent, qmf2.console.Agent): + self.fail("Unexpected workitem from agent") + else: + if agent.get_name() == "agent1": + agent1_events += 1 + elif agent.get_name() == "agent2": + agent2_events += 1 + else: + self.fail("Unexpected agent name received: %s" % + agent.get_name()) + if agent1_events and agent2_events: + break; + + wi = self.console.get_next_workitem(timeout=4) + + self.assertTrue(agent1_events > 0 and agent2_events > 0) + + self.console.destroy(10) + + + + diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py new file mode 100644 index 0000000000..9287fc7173 --- /dev/null +++ b/qpid/python/qmf2/tests/obj_gets.py @@ -0,0 +1,399 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, QmfQueryPredicate) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.agent = Agent(name, + _notifier=self.notifier, + _heartbeat_interval=heartbeat) + + # Management Database + # - two different schema packages, + # - two classes within one schema package + # - multiple objects per schema package+class + # - two "undescribed" objects + + # "package1/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"), + _desc="A test data schema - one", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p1c1_key1") + _obj.set_value("count1", 0) + _obj.set_value("count2", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p1c1_key2") + _obj.set_value("count1", 9) + _obj.set_value("count2", 10) + self.agent.add_object( _obj ) + + # "package1/class2" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"), + _desc="A test data schema - two", + _object_id_names=["name"] ) + # add properties + _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("name", "p1c2_name1") + _obj.set_value("string1", "a data string") + self.agent.add_object( _obj ) + + + # "package2/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"), + _desc="A test data schema - second package", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p2c1_key1") + _obj.set_value("counter", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, _schema=_schema ) + _obj.set_value("key", "p2c1_key2") + _obj.set_value("counter", 2112) + self.agent.add_object( _obj ) + + + # add two "unstructured" objects to the Agent + + _obj = QmfAgentData(self.agent, _object_id="undesc-1") + _obj.set_value("field1", "a value") + _obj.set_value("field2", 2) + _obj.set_value("field3", {"a":1, "map":2, "value":3}) + _obj.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj) + + + _obj = QmfAgentData(self.agent, _object_id="undesc-2") + _obj.set_value("key-1", "a value") + _obj.set_value("key-2", 2) + self.agent.add_object(_obj) + + self.running = True + self.start() + + def connect_agent(self, broker_url): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(broker_url.host, + broker_url.port, + broker_url.user, + broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + + def disconnect_agent(self, timeout): + if self.conn: + self.agent.remove_connection(timeout) + + def shutdown_agent(self, timeout): + self.agent.destroy(timeout) + + def stop(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + + +class BaseTest(unittest.TestCase): + agent_count = 5 + + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + self.agents = [] + for i in range(self.agent_count): + agent = _agentApp("agent-" + str(i), 1) + agent.connect_agent(self.broker) + self.agents.append(agent) + + def tearDown(self): + for agent in self.agents: + if agent is not None: + agent.shutdown_agent(10) + agent.stop() + + + def test_all_agents(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 3)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", _timeout=5) + self.assertTrue(len(objs) == (self.agent_count * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, _timeout=5) + self.assertTrue(len(objs) == self.agent_count) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + + + + def test_agent_subset(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent_list = [] + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + agent_list.append(agent) + + # Only use a subset of the agents: + agent_list = agent_list[:len(agent_list)/2] + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", + _agents=agent_list, _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == (len(agent_list) * 3)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == (len(agent_list) * 2)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, + _agents=agent_list, + _timeout=5) + self.assertTrue(len(objs) == len(agent_list)) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + + + + def test_single_agent(self): + # create console + # find all agents + # synchronous query for all objects by id + # verify known object ids are returned + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.addConnection(self.conn) + + agent_list = [] + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + agent_list.append(agent) + + # Only use one agetn + agent = agent_list[0] + + # console has discovered all agents, now query all undesc-2 objects + objs = self.console.get_objects(_object_id="undesc-2", + _agents=agent, _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_object_id() == "undesc-2") + + # now query all objects from schema "package1" + objs = self.console.get_objects(_pname="package1", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 3) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + + # now query all objects from schema "package2" + objs = self.console.get_objects(_pname="package2", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 2) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2") + + # now query all objects from schema "package1/class2" + objs = self.console.get_objects(_pname="package1", _cname="class2", + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1") + self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2") + + # given the schema identifier from the last query, repeat using the + # specific schema id + schema_id = objs[0].get_schema_class_id() + objs = self.console.get_objects(_schema_id=schema_id, + _agents=agent, + _timeout=5) + self.assertTrue(len(objs) == 1) + for obj in objs: + self.assertTrue(obj.get_schema_class_id() == schema_id) + + + self.console.destroy(10) + |
