summaryrefslogtreecommitdiff
path: root/python/qmf2/tests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-14 14:59:24 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-14 14:59:24 +0000
commit074811c4bf1531f04b11db25f348e6c520bc4799 (patch)
treedd46e4aa9bdaca64974bbddc810f3212d935edd5 /python/qmf2/tests
parenta025819835829ea7658e4886ddb6e5e488f916eb (diff)
downloadqpid-python-074811c4bf1531f04b11db25f348e6c520bc4799.tar.gz
moved qpid-* tools out of qpid/python into qpid/tools; moved qmf library into extras/qmf
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/tests')
-rw-r--r--python/qmf2/tests/__init__.py29
-rw-r--r--python/qmf2/tests/agent_discovery.py484
-rw-r--r--python/qmf2/tests/agent_test.py167
-rw-r--r--python/qmf2/tests/async_method.py362
-rw-r--r--python/qmf2/tests/async_query.py462
-rw-r--r--python/qmf2/tests/basic_method.py406
-rw-r--r--python/qmf2/tests/basic_query.py516
-rw-r--r--python/qmf2/tests/console_test.py175
-rw-r--r--python/qmf2/tests/events.py208
-rw-r--r--python/qmf2/tests/multi_response.py295
-rw-r--r--python/qmf2/tests/obj_gets.py599
11 files changed, 0 insertions, 3703 deletions
diff --git a/python/qmf2/tests/__init__.py b/python/qmf2/tests/__init__.py
deleted file mode 100644
index 186f09349e..0000000000
--- a/python/qmf2/tests/__init__.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import agent_discovery
-import basic_query
-import basic_method
-import obj_gets
-import events
-import multi_response
-import async_query
-import async_method
diff --git a/python/qmf2/tests/agent_discovery.py b/python/qmf2/tests/agent_discovery.py
deleted file mode 100644
index 59b65221e0..0000000000
--- a/python/qmf2/tests/agent_discovery.py
+++ /dev/null
@@ -1,484 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-import time
-from threading import Thread, Event
-
-import qpid.messaging
-import qmf2.common
-import qmf2.console
-import qmf2.agent
-
-
-class _testNotifier(qmf2.common.Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.timeout = 3
- self.broker_url = broker_url
- self.notifier = _testNotifier()
- self.agent = qmf2.agent.Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
- # No database needed for this test
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # Connect the agent to the broker,
- # broker_url = "user/passwd@hostname:port"
- conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- conn.connect()
- self.agent.set_connection(conn)
- self.ready.set()
-
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- # done, cleanup agent
- self.agent.remove_connection(self.timeout)
- self.agent.destroy(self.timeout)
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent indication interval
- self.agent_heartbeat = 1
- self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_discover_all(self):
- """
- create console
- enable agent discovery
- wait
- expect agent add for agent1 and agent2
- """
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
- self.console.enable_agent_discovery()
-
- agent1_found = agent2_found = False
- wi = self.console.get_next_workitem(timeout=3)
- while wi and not (agent1_found and agent2_found):
- if wi.get_type() == wi.AGENT_ADDED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = True
- elif agent.get_name() == "agent2":
- agent2_found = True
- else:
- self.fail("Unexpected agent name received: %s" %
- agent.get_name())
- if agent1_found and agent2_found:
- break;
-
- wi = self.console.get_next_workitem(timeout=3)
-
- self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
-
- self.console.destroy(10)
-
-
- def test_discover_one(self):
- """
- create console
- enable agent discovery, filter for agent1 only
- wait until timeout
- expect agent add for agent1 only
- """
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- query = qmf2.common.QmfQuery.create_predicate(
- qmf2.common.QmfQuery.TARGET_AGENT,
- [qmf2.common.QmfQuery.EQ, qmf2.common.QmfQuery.KEY_AGENT_NAME,
- [qmf2.common.QmfQuery.QUOTE, "agent1"]])
- self.console.enable_agent_discovery(query)
-
- agent1_found = agent2_found = False
- wi = self.console.get_next_workitem(timeout=3)
- while wi:
- if wi.get_type() == wi.AGENT_ADDED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = True
- elif agent.get_name() == "agent2":
- agent2_found = True
- else:
- self.fail("Unexpected agent name received: %s" %
- agent.get_name())
-
- wi = self.console.get_next_workitem(timeout=2)
-
- self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered")
-
- self.console.destroy(10)
-
-
- def test_heartbeat(self):
- """
- create console with 2 sec agent timeout
- enable agent discovery, find all agents
- stop agent1, expect timeout notification
- stop agent2, expect timeout notification
- """
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=2)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
- self.console.enable_agent_discovery()
-
- agent1_found = agent2_found = False
- wi = self.console.get_next_workitem(timeout=4)
- while wi and not (agent1_found and agent2_found):
- if wi.get_type() == wi.AGENT_ADDED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = True
- elif agent.get_name() == "agent2":
- agent2_found = True
- else:
- self.fail("Unexpected agent name received: %s" %
- agent.get_name())
- if agent1_found and agent2_found:
- break;
-
- wi = self.console.get_next_workitem(timeout=4)
-
- self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
-
- # now kill agent1 and wait for expiration
-
- agent1 = self.agent1
- self.agent1 = None
- agent1.stop_app()
-
- wi = self.console.get_next_workitem(timeout=4)
- while wi is not None:
- if wi.get_type() == wi.AGENT_DELETED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = False
- else:
- self.fail("Unexpected agent_deleted received: %s" %
- agent.get_name())
- if not agent1_found:
- break;
-
- wi = self.console.get_next_workitem(timeout=4)
-
- self.assertFalse(agent1_found, "agent1 did not delete!")
-
- # now kill agent2 and wait for expiration
-
- agent2 = self.agent2
- self.agent2 = None
- agent2.stop_app()
-
- wi = self.console.get_next_workitem(timeout=4)
- while wi is not None:
- if wi.get_type() == wi.AGENT_DELETED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent2":
- agent2_found = False
- else:
- self.fail("Unexpected agent_deleted received: %s" %
- agent.get_name())
- if not agent2_found:
- break;
-
- wi = self.console.get_next_workitem(timeout=4)
-
- self.assertFalse(agent2_found, "agent2 did not delete!")
-
- self.console.destroy(10)
-
-
- def test_find_agent(self):
- """
- create console
- do not enable agent discovery
- find agent1, expect success
- find agent-none, expect failure
- find agent2, expect success
- """
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- agent1 = self.console.find_agent("agent1", timeout=3)
- self.assertTrue(agent1 and agent1.get_name() == "agent1")
-
- no_agent = self.console.find_agent("agent-none", timeout=3)
- self.assertTrue(no_agent == None)
-
- agent2 = self.console.find_agent("agent2", timeout=3)
- self.assertTrue(agent2 and agent2.get_name() == "agent2")
-
- self.console.remove_connection(self.conn, 10)
- self.console.destroy(10)
-
-
- def test_heartbeat_x2(self):
- """
- create 2 consoles with 2 sec agent timeout
- enable agent discovery, find all agents
- stop agent1, expect timeout notification on both consoles
- stop agent2, expect timeout notification on both consoles
- """
- console_count = 2
- self.consoles = []
- for i in range(console_count):
- console = qmf2.console.Console("test-console-" + str(i),
- notifier=_testNotifier(),
- agent_timeout=2)
- conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- conn.connect()
- console.add_connection(conn)
- console.enable_agent_discovery()
- self.consoles.append(console)
-
- # now wait for all consoles to discover all agents,
- # agents send a heartbeat once a second
- for console in self.consoles:
- agent1_found = agent2_found = False
- wi = console.get_next_workitem(timeout=2)
- while wi and not (agent1_found and agent2_found):
- if wi.get_type() == wi.AGENT_ADDED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = True
- elif agent.get_name() == "agent2":
- agent2_found = True
- else:
- self.fail("Unexpected agent name received: %s" %
- agent.get_name())
- if agent1_found and agent2_found:
- break;
- wi = console.get_next_workitem(timeout=2)
-
- self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
-
- # now kill agent1 and wait for expiration
-
- agent1 = self.agent1
- self.agent1 = None
- agent1.stop_app()
-
- for console in self.consoles:
- agent1_found = True
- wi = console.get_next_workitem(timeout=4)
- while wi is not None:
- if wi.get_type() == wi.AGENT_DELETED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_found = False
- break
- else:
- self.fail("Unexpected agent_deleted received: %s" %
- agent.get_name())
-
- wi = console.get_next_workitem(timeout=4)
-
- self.assertFalse(agent1_found, "agent1 did not delete!")
-
- # now kill agent2 and wait for expiration
-
- agent2 = self.agent2
- self.agent2 = None
- agent2.stop_app()
-
- for console in self.consoles:
- agent2_found = True
- wi = console.get_next_workitem(timeout=4)
- while wi is not None:
- if wi.get_type() == wi.AGENT_DELETED:
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent2":
- agent2_found = False
- break
- else:
- self.fail("Unexpected agent_deleted received: %s" %
- agent.get_name())
-
- wi = console.get_next_workitem(timeout=4)
-
- self.assertFalse(agent2_found, "agent2 did not delete!")
-
-
- for console in self.consoles:
- console.destroy(10)
-
-
- def test_find_agent_x2(self):
- """
- create 2 consoles, do not enable agent discovery
- console-1: find agent1, expect success
- console-2: find agent2, expect success
- Verify console-1 does -not- know agent2
- Verify console-2 does -not- know agent1
- """
- console_count = 2
- self.consoles = []
- for i in range(console_count):
- console = qmf2.console.Console("test-console-" + str(i),
- notifier=_testNotifier(),
- agent_timeout=2)
- conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- conn.connect()
- console.add_connection(conn)
- self.consoles.append(console)
-
- agent1 = self.consoles[0].find_agent("agent1", timeout=3)
- self.assertTrue(agent1 and agent1.get_name() == "agent1")
-
- agent2 = self.consoles[1].find_agent("agent2", timeout=3)
- self.assertTrue(agent2 and agent2.get_name() == "agent2")
-
- # wait long enough for agent heartbeats to be sent...
-
- time.sleep(self.agent_heartbeat * 2)
-
- agents = self.consoles[0].get_agents()
- self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent1")
- agent1 = self.consoles[0].get_agent("agent1")
- self.assertTrue(agent1 and agent1.get_name() == "agent1")
-
-
- agents = self.consoles[1].get_agents()
- self.assertTrue(len(agents) == 1 and agents[0].get_name() == "agent2")
- agent2 = self.consoles[1].get_agent("agent2")
- self.assertTrue(agent2 and agent2.get_name() == "agent2")
-
- # verify no new agents were learned
-
- for console in self.consoles:
- console.destroy(10)
-
diff --git a/python/qmf2/tests/agent_test.py b/python/qmf2/tests/agent_test.py
deleted file mode 100644
index 14d8ada197..0000000000
--- a/python/qmf2/tests/agent_test.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import logging
-import time
-import unittest
-from threading import Semaphore
-
-
-from qpid.messaging import *
-from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
- QmfEvent, SchemaMethod, Notifier, SchemaClassId,
- WorkItem)
-from qmf2.agent import (Agent, QmfAgentData)
-
-
-
-class ExampleNotifier(Notifier):
- def __init__(self):
- self._sema4 = Semaphore(0) # locked
-
- def indication(self):
- self._sema4.release()
-
- def waitForWork(self):
- print("Waiting for event...")
- self._sema4.acquire()
- print("...event present")
-
-
-
-
-class QmfTest(unittest.TestCase):
- def test_begin(self):
- print("!!! being test")
-
- def test_end(self):
- print("!!! end test")
-
-
-#
-# An example agent application
-#
-
-
-if __name__ == '__main__':
- _notifier = ExampleNotifier()
- _agent = Agent( "qmf.testAgent", _notifier=_notifier )
-
- # Dynamically construct a class schema
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- _agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- _obj1 = QmfAgentData( _agent, _schema=_schema )
- _obj1.set_value("index1", 100)
- _obj1.set_value("index2", "a name" )
- _obj1.set_value("set_string", "UNSET")
- _obj1.set_value("set_int", 0)
- _obj1.set_value("query_count", 0)
- _obj1.set_value("method_call_count", 0)
- _agent.add_object( _obj1 )
-
- _agent.add_object( QmfAgentData( _agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- # add an "unstructured" object to the Agent
- _obj2 = QmfAgentData(_agent, _object_id="01545")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 2)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _agent.add_object(_obj2)
-
-
- ## Now connect to the broker
-
- _c = Connection("localhost")
- _c.connect()
- _agent.setConnection(_c)
-
- _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
-
- _done = False
- while not _done:
- # try:
- _notifier.waitForWork()
-
- _wi = _agent.get_next_workitem(timeout=0)
- while _wi:
-
- if _wi.get_type() == WorkItem.METHOD_CALL:
- mc = _wi.get_params()
-
- if mc.get_name() == "set_meth":
- print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
- print("!!! args='%s'" % str(mc.get_args()))
- print("!!! userid=%s" % str(mc.get_user_id()))
- print("!!! handle=%s" % _wi.get_handle())
- _agent.method_response(_wi.get_handle(),
- {"rc1": 100, "rc2": "Success"})
- else:
- print("!!! Unknown Method name = %s" % mc.get_name())
- _agent.method_response(_wi.get_handle(), _error=_error_data)
- else:
- print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
-
- _agent.release_workitem(_wi)
- _wi = _agent.get_next_workitem(timeout=0)
- # except:
- # print( "shutting down...")
- # _done = True
-
- print( "Removing connection... TBD!!!" )
- #_myConsole.remove_connection( _c, 10 )
-
- print( "Destroying agent... TBD!!!" )
- #_myConsole.destroy( 10 )
-
- print( "******** agent test done ********" )
-
-
-
diff --git a/python/qmf2/tests/async_method.py b/python/qmf2/tests/async_method.py
deleted file mode 100644
index 556b62756f..0000000000
--- a/python/qmf2/tests/async_method.py
+++ /dev/null
@@ -1,362 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData, WorkItem)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent, MethodCallParams)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Dynamically construct a management database
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- # the input value of cookie is returned in the response
- _meth.add_argument( "cookie", SchemaProperty(qmfTypes.TYPE_LSTR,
- kwargs={"dir":"IO"}))
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- self.agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- _obj1 = QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":100, "index2":"a name"})
- _obj1.set_value("set_string", "UNSET")
- _obj1.set_value("set_int", 0)
- _obj1.set_value("query_count", 0)
- _obj1.set_value("method_call_count", 0)
- self.agent.add_object( _obj1 )
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- # add an "unstructured" object to the Agent
- _obj2 = QmfAgentData(self.agent, _object_id="01545")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 2)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- self.agent.add_object(_obj2)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- # Agent application main processing loop
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- if wi.get_type() == WorkItem.METHOD_CALL:
- mc = wi.get_params()
- if not isinstance(mc, MethodCallParams):
- raise Exception("Unexpected method call parameters")
-
- if mc.get_name() == "set_meth":
- obj = self.agent.get_object(mc.get_object_id(),
- mc.get_schema_id())
- if obj is None:
- error_info = QmfData.create({"code": -2,
- "description":
- "Bad Object Id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- obj.inc_value("method_call_count")
- out_args = {"code" : 0}
- if "cookie" in mc.get_args():
- out_args["cookie"] = mc.get_args()["cookie"]
- if "arg_int" in mc.get_args():
- obj.set_value("set_int", mc.get_args()["arg_int"])
- if "arg_str" in mc.get_args():
- obj.set_value("set_string", mc.get_args()["arg_str"])
- self.agent.method_response(wi.get_handle(),
- out_args)
- elif mc.get_name() == "a_method":
- obj = self.agent.get_object(mc.get_object_id(),
- mc.get_schema_id())
- if obj is None:
- error_info = QmfData.create({"code": -3,
- "description":
- "Unknown object id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- elif obj.get_object_id() != "01545":
- error_info = QmfData.create( {"code": -4,
- "description":
- "Unexpected id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- args = mc.get_args()
- if ("arg1" in args and args["arg1"] == 1 and
- "arg2" in args and args["arg2"] == "Now set!"
- and "arg3" in args and args["arg3"] == 1966):
- out_args = {"code" : 0}
- if "cookie" in mc.get_args():
- out_args["cookie"] = mc.get_args()["cookie"]
- self.agent.method_response(wi.get_handle(),
- out_args)
- else:
- error_info = QmfData.create(
- {"code": -5,
- "description":
- "Bad Args."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- error_info = QmfData.create( {"code": -1,
- "description":
- "Unknown method call."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(), _error=error_info)
-
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent heartbeat interval
- self.agent_heartbeat = 1
- self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_described_obj(self):
- # create console
- # find agents
- # synchronous query for all objects in schema
- # method call on each object
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- i_count = 0
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
-
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == 1)
- for sid in sid_list:
- t_params = {QmfData.KEY_SCHEMA_ID: sid}
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
- _target_params=t_params)
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
- for obj in obj_list:
- cookie = "cookie-" + str(i_count)
- i_count += 1
- mr = obj.invoke_method( "set_meth",
- {"arg_int": -99,
- "arg_str": "Now set!",
- "cookie": cookie},
- _reply_handle=cookie,
- _timeout=3)
- self.assertTrue(mr)
-
- # done, now wait for async responses
-
- r_count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- r_count += 1
- self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE)
- reply = wi.get_params()
- self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- self.assertTrue(reply.succeeded())
- self.assertTrue(reply.get_argument("cookie") == wi.get_handle())
-
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(r_count == i_count)
-
- self.console.destroy(10)
-
-
- def test_managed_obj(self):
- # create console
- # find agents
- # synchronous query for a managed object
- # method call on each object
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- i_count = 0
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
- obj_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(obj_list, type([])))
- self.assertTrue(len(obj_list) == 1)
- obj = obj_list[0]
-
- cookie = "cookie-" + str(i_count)
- i_count += 1
- mr = obj.invoke_method("a_method",
- {"arg1": 1,
- "arg2": "Now set!",
- "arg3": 1966,
- "cookie": cookie},
- _reply_handle=cookie,
- _timeout=3)
- self.assertTrue(mr)
-
- # done, now wait for async responses
-
- r_count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- r_count += 1
- self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE)
- reply = wi.get_params()
- self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- self.assertTrue(reply.succeeded())
- self.assertTrue(reply.get_argument("cookie") == wi.get_handle())
-
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(r_count == i_count)
-
- self.console.destroy(10)
diff --git a/python/qmf2/tests/async_query.py b/python/qmf2/tests/async_query.py
deleted file mode 100644
index 3a9a767bf0..0000000000
--- a/python/qmf2/tests/async_query.py
+++ /dev/null
@@ -1,462 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData, WorkItem)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Dynamically construct a management database
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- self.agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- _obj1 = QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":100, "index2":"a name"})
- _obj1.set_value("set_string", "UNSET")
- _obj1.set_value("set_int", 0)
- _obj1.set_value("query_count", 0)
- _obj1.set_value("method_call_count", 0)
- self.agent.add_object( _obj1 )
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":50,
- "index2": "my name",
- "set_string": "SET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
-
- # add an "unstructured" object to the Agent
- _obj2 = QmfAgentData(self.agent, _object_id="01545")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 2)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 50)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01546")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 3)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 51)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01544")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 4)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 49)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01543")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 4)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 48)
- self.agent.add_object(_obj2)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent indication interval
- self.agent_heartbeat = 1
- self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_all_schema_ids(self):
- # create console
- # find agents
- # asynchronous query for all schema ids
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # send queries
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- rc = self.console.do_query(agent, query,
- _reply_handle=aname)
- self.assertTrue(rc)
-
- # done. Now wait for async responses
-
- count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- count += 1
- self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
- self.assertTrue(wi.get_handle() == "agent1" or
- wi.get_handle() == "agent2")
- reply = wi.get_params()
- self.assertTrue(len(reply) == 1)
- self.assertTrue(isinstance(reply[0], SchemaClassId))
- self.assertTrue(reply[0].get_package_name() == "MyPackage")
- self.assertTrue(reply[0].get_class_name() == "MyClass")
- self.console.release_workitem(wi)
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(count == 2)
- self.console.destroy(10)
-
-
-
- def test_undescribed_objs(self):
- # create console
- # find agents
- # asynchronous query for all non-schema objects
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # send queries
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
- rc = self.console.do_query(agent, query, _reply_handle=aname)
- self.assertTrue(rc)
-
- # done. Now wait for async responses
-
- count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- count += 1
- self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
- self.assertTrue(wi.get_handle() == "agent1" or
- wi.get_handle() == "agent2")
- reply = wi.get_params()
- self.assertTrue(len(reply) == 4)
- self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
- self.assertFalse(reply[0].is_described()) # no schema
- self.console.release_workitem(wi)
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(count == 2)
- self.console.destroy(10)
-
-
-
- def test_described_objs(self):
- # create console
- # find agents
- # asynchronous query for all schema-based objects
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- #
- t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")}
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
- #
- rc = self.console.do_query(agent, query, _reply_handle=aname)
- self.assertTrue(rc)
-
- # done. Now wait for async responses
-
- count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- count += 1
- self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
- self.assertTrue(wi.get_handle() == "agent1" or
- wi.get_handle() == "agent2")
- reply = wi.get_params()
- self.assertTrue(len(reply) == 3)
- self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
- self.assertTrue(reply[0].is_described()) # has schema
- self.console.release_workitem(wi)
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(count == 2)
- # @todo test if the console has learned the corresponding schemas....
- self.console.destroy(10)
-
-
-
- def test_all_schemas(self):
- # create console
- # find agents
- # asynchronous query for all schemas
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- # test internal state using non-api calls:
- # no schemas present yet
- self.assertTrue(len(self.console._schema_cache) == 0)
- # end test
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # send queries
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
- rc = self.console.do_query(agent, query, _reply_handle=aname)
- self.assertTrue(rc)
-
- # done. Now wait for async responses
-
- count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- count += 1
- self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
- self.assertTrue(wi.get_handle() == "agent1" or
- wi.get_handle() == "agent2")
- reply = wi.get_params()
- self.assertTrue(len(reply) == 1)
- self.assertTrue(isinstance(reply[0], qmf2.common.SchemaObjectClass))
- self.assertTrue(reply[0].get_class_id().get_package_name() == "MyPackage")
- self.assertTrue(reply[0].get_class_id().get_class_name() == "MyClass")
- self.console.release_workitem(wi)
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(count == 2)
-
- # test internal state using non-api calls:
- # schema has been learned
- self.assertTrue(len(self.console._schema_cache) == 1)
- # end test
-
- self.console.destroy(10)
-
-
-
- def test_query_expiration(self):
- # create console
- # find agents
- # kill the agents
- # send async query
- # wait for & verify expiration
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=30)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- # find the agents
- agents = []
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
- agents.append(agent)
-
- # now nuke the agents from orbit. It's the only way to be sure.
-
- self.agent1.stop_app()
- self.agent1 = None
- self.agent2.stop_app()
- self.agent2 = None
-
- # now send queries to agents that no longer exist
- for agent in agents:
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
- rc = self.console.do_query(agent, query,
- _reply_handle=agent.get_name(),
- _timeout=2)
- self.assertTrue(rc)
-
- # done. Now wait for async responses due to timeouts
-
- count = 0
- while self.notifier.wait_for_work(3):
- wi = self.console.get_next_workitem(timeout=0)
- while wi is not None:
- count += 1
- self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
- self.assertTrue(wi.get_handle() == "agent1" or
- wi.get_handle() == "agent2")
- reply = wi.get_params()
- self.assertTrue(len(reply) == 0) # empty
-
- self.console.release_workitem(wi)
- wi = self.console.get_next_workitem(timeout=0)
-
- self.assertTrue(count == 2)
- self.console.destroy(10)
diff --git a/python/qmf2/tests/basic_method.py b/python/qmf2/tests/basic_method.py
deleted file mode 100644
index be2bdff9ab..0000000000
--- a/python/qmf2/tests/basic_method.py
+++ /dev/null
@@ -1,406 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData, WorkItem)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent, MethodCallParams)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Dynamically construct a management database
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- self.agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- _obj1 = QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":100, "index2":"a name"})
- _obj1.set_value("set_string", "UNSET")
- _obj1.set_value("set_int", 0)
- _obj1.set_value("query_count", 0)
- _obj1.set_value("method_call_count", 0)
- self.agent.add_object( _obj1 )
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- # add an "unstructured" object to the Agent
- _obj2 = QmfAgentData(self.agent, _object_id="01545")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 2)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- self.agent.add_object(_obj2)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- # Agent application main processing loop
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- if wi.get_type() == WorkItem.METHOD_CALL:
- mc = wi.get_params()
- if not isinstance(mc, MethodCallParams):
- raise Exception("Unexpected method call parameters")
-
- if mc.get_name() == "set_meth":
- obj = self.agent.get_object(mc.get_object_id(),
- mc.get_schema_id())
- if obj is None:
- error_info = QmfData.create({"code": -2,
- "description":
- "Bad Object Id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- obj.inc_value("method_call_count")
- if "arg_int" in mc.get_args():
- obj.set_value("set_int", mc.get_args()["arg_int"])
- if "arg_str" in mc.get_args():
- obj.set_value("set_string", mc.get_args()["arg_str"])
- self.agent.method_response(wi.get_handle(),
- {"code" : 0})
- elif mc.get_name() == "a_method":
- obj = self.agent.get_object(mc.get_object_id(),
- mc.get_schema_id())
- if obj is None:
- error_info = QmfData.create({"code": -3,
- "description":
- "Unknown object id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- elif obj.get_object_id() != "01545":
- error_info = QmfData.create( {"code": -4,
- "description":
- "Unexpected id."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- args = mc.get_args()
- if ("arg1" in args and args["arg1"] == 1 and
- "arg2" in args and args["arg2"] == "Now set!"
- and "arg3" in args and args["arg3"] == 1966):
- self.agent.method_response(wi.get_handle(),
- {"code" : 0})
- else:
- error_info = QmfData.create(
- {"code": -5,
- "description":
- "Bad Args."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(),
- _error=error_info)
- else:
- error_info = QmfData.create( {"code": -1,
- "description":
- "Unknown method call."},
- _object_id="_error")
- self.agent.method_response(wi.get_handle(), _error=error_info)
-
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent heartbeat interval
- self.agent_heartbeat = 1
- self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_described_obj(self):
- # create console
- # find agents
- # synchronous query for all objects in schema
- # method call on each object
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
-
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == 1)
- for sid in sid_list:
- t_params = {QmfData.KEY_SCHEMA_ID: sid}
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
- _target_params=t_params)
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
- for obj in obj_list:
- mr = obj.invoke_method( "set_meth", {"arg_int": -99,
- "arg_str": "Now set!"},
- _timeout=3)
- self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
- self.assertTrue(mr.succeeded())
- self.assertTrue(mr.get_argument("code") == 0)
-
- self.assertTrue(obj.get_value("method_call_count") == 0)
- self.assertTrue(obj.get_value("set_string") == "UNSET")
- self.assertTrue(obj.get_value("set_int") == 0)
-
- obj.refresh()
-
- self.assertTrue(obj.get_value("method_call_count") == 1)
- self.assertTrue(obj.get_value("set_string") == "Now set!")
- self.assertTrue(obj.get_value("set_int") == -99)
-
- self.console.destroy(10)
-
-
- def test_bad_method_schema(self):
- # create console
- # find agents
- # synchronous query for all objects with schema
- # invalid method call on each object
- # - should throw a ValueError - NOT YET.
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
-
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == 1)
- for sid in sid_list:
-
- t_params = {QmfData.KEY_SCHEMA_ID: sid}
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.TRUE],
- _target_params=t_params)
-
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
- for obj in obj_list:
- mr = obj.invoke_method("unknown_method",
- {"arg1": -99, "arg2": "Now set!"},
- _timeout=3)
- # self.failUnlessRaises(ValueError,
- # obj.invoke_method,
- # "unknown_meth",
- # {"arg1": -99, "arg2": "Now set!"},
- # _timeout=3)
- self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
- self.assertFalse(mr.succeeded())
- self.assertTrue(isinstance(mr.get_exception(), QmfData))
-
- self.console.destroy(10)
-
- def test_bad_method_no_schema(self):
- # create console
- # find agents
- # synchronous query for all objects with no schema
- # invalid method call on each object
- # - should throw a ValueError
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
-
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 1)
- for obj in obj_list:
- self.assertTrue(obj.get_schema_class_id() == None)
- mr = obj.invoke_method("unknown_meth",
- {"arg1": -99, "arg2": "Now set!"},
- _timeout=3)
- self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
- self.assertFalse(mr.succeeded())
- self.assertTrue(isinstance(mr.get_exception(), QmfData))
-
- self.console.destroy(10)
-
- def test_managed_obj(self):
- # create console
- # find agents
- # synchronous query for a managed object
- # method call on each object
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
- obj_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(obj_list, type([])))
- self.assertTrue(len(obj_list) == 1)
- obj = obj_list[0]
-
- mr = obj.invoke_method("a_method",
- {"arg1": 1,
- "arg2": "Now set!",
- "arg3": 1966},
- _timeout=3)
- self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
- self.assertTrue(mr.succeeded())
- self.assertTrue(mr.get_argument("code") == 0)
- # @todo refresh and verify changes
-
- self.console.destroy(10)
diff --git a/python/qmf2/tests/basic_query.py b/python/qmf2/tests/basic_query.py
deleted file mode 100644
index dd321cb4bb..0000000000
--- a/python/qmf2/tests/basic_query.py
+++ /dev/null
@@ -1,516 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Dynamically construct a management database
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- self.agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- _obj1 = QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":100, "index2":"a name"})
- _obj1.set_value("set_string", "UNSET")
- _obj1.set_value("set_int", 0)
- _obj1.set_value("query_count", 0)
- _obj1.set_value("method_call_count", 0)
- self.agent.add_object( _obj1 )
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":50,
- "index2": "my name",
- "set_string": "SET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
-
- # add an "unstructured" object to the Agent
- _obj2 = QmfAgentData(self.agent, _object_id="01545")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 2)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 50)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01546")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 3)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 51)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01544")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 4)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 49)
- self.agent.add_object(_obj2)
-
- _obj2 = QmfAgentData(self.agent, _object_id="01543")
- _obj2.set_value("field1", "a value")
- _obj2.set_value("field2", 4)
- _obj2.set_value("field3", {"a":1, "map":2, "value":3})
- _obj2.set_value("field4", ["a", "list", "value"])
- _obj2.set_value("index1", 48)
- self.agent.add_object(_obj2)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent indication interval
- self.agent_heartbeat = 1
- self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_all_oids(self):
- # create console
- # find agents
- # synchronous query for all schemas
- # synchronous query for all objects per schema
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # first, find objects per schema
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == 1)
- for sid in sid_list:
- t_params = {QmfData.KEY_SCHEMA_ID: sid}
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID,
- _target_params=t_params)
-
- oid_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(oid_list, type([])),
- "Unexpected return type")
- self.assertTrue(len(oid_list) == 3, "Wrong count")
- self.assertTrue('100a name' in oid_list)
- self.assertTrue('99another name' in oid_list)
- self.assertTrue('50my name' in oid_list)
- self.assertTrue('01545' not in oid_list)
-
-
- # now, find all unmanaged objects (no schema)
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
- oid_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(oid_list, type([])),
- "Unexpected return type")
- self.assertTrue(len(oid_list) == 4, "Wrong count")
- self.assertTrue('100a name' not in oid_list)
- self.assertTrue('99another name' not in oid_list)
- self.assertTrue('01545' in oid_list)
- self.assertTrue('01544' in oid_list)
- self.assertTrue('01543' in oid_list)
- self.assertTrue('01546' in oid_list)
-
- self.console.destroy(10)
-
-
- def test_direct_oids(self):
- # create console
- # find agents
- # synchronous query for each objects
- # verify objects and schemas are correct
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # first, find objects per schema
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == 1)
-
- for oid in ['100a name', '99another name']:
- query = QmfQuery.create_id_object(oid, sid_list[0])
- obj_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(obj_list, type([])),
- "Unexpected return type")
- self.assertTrue(len(obj_list) == 1)
- obj = obj_list[0]
- self.assertTrue(isinstance(obj, QmfData))
- self.assertTrue(obj.get_object_id() == oid)
- self.assertTrue(obj.get_schema_class_id() == sid_list[0])
- schema_id = obj.get_schema_class_id()
- self.assertTrue(isinstance(schema_id, SchemaClassId))
- self.assertTrue(obj.is_described())
-
- # now find schema-less objects
- for oid in ['01545']:
- query = QmfQuery.create_id_object(oid)
- obj_list = self.console.do_query(agent, query)
-
- self.assertTrue(isinstance(obj_list, type([])),
- "Unexpected return type")
- self.assertTrue(len(obj_list) == 1)
- obj = obj_list[0]
- self.assertTrue(isinstance(obj, QmfData))
- self.assertTrue(obj.get_object_id() == oid)
- self.assertFalse(obj.is_described())
-
- self.console.destroy(10)
-
-
-
- def test_packages(self):
- # create console
- # find agents
- # synchronous query all package names
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
- package_list = self.console.do_query(agent, query)
- self.assertTrue(len(package_list) == 1)
- self.assertTrue('MyPackage' in package_list)
-
-
- self.console.destroy(10)
-
-
-
- def test_predicate_schema_id(self):
- # create console
- # find agents
- # synchronous query for all schema by package name
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
- [QmfQuery.EQ,
- SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, "MyPackage"]])
-
- schema_list = self.console.do_query(agent, query)
- self.assertTrue(len(schema_list))
- for schema in schema_list:
- self.assertTrue(schema.get_class_id().get_package_name() ==
- "MyPackage")
-
-
- self.console.destroy(10)
-
-
-
- def test_predicate_no_match(self):
- # create console
- # find agents
- # synchronous query for all schema by package name
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
- [QmfQuery.EQ,
- [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE],
- [QmfQuery.QUOTE, "No-Such-Package"]])
-
- schema_list = self.console.do_query(agent, query)
- self.assertTrue(len(schema_list) == 0)
-
- self.console.destroy(10)
-
-
- def test_predicate_match_string(self):
- # create console
- # find agents
- # synchronous query for all objects with a value named
- # set_string which is < or equal to "UNSET"
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # get the schema id for MyPackage:MyClass schema
- query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
- [QmfQuery.AND,
- [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, "MyPackage"]],
- [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
- [QmfQuery.QUOTE, "MyClass"]]])
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(len(sid_list) == 1)
-
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]],
- [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]],
- _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]})
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
- for obj in obj_list:
- self.assertTrue(obj.has_value("set_string"))
- self.assertTrue(obj.get_value("set_string") == "UNSET")
-
- self.console.destroy(10)
-
-
-
- def test_predicate_match_integer(self):
- # create console
- # find agents
- # synchronous query for all objects with a value named
- # "index1" which is < or equal to various values
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # Query the unmanaged (no schema) objects
-
- # == 50
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.EQ, "index1", 50]])
-
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 1)
- self.assertTrue(obj_list[0].has_value("index1"))
- self.assertTrue(obj_list[0].get_value("index1") == 50)
-
- # <= 50
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.LE, "index1", 50]])
-
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 3)
- for obj in obj_list:
- self.assertTrue(obj.has_value("index1"))
- self.assertTrue(obj.get_value("index1") <= 50)
-
-
- # > 50
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.GT, "index1", 50]])
-
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 1)
- for obj in obj_list:
- self.assertTrue(obj.has_value("index1"))
- self.assertTrue(obj.get_value("index1") > 50)
-
- self.console.destroy(10)
-
diff --git a/python/qmf2/tests/console_test.py b/python/qmf2/tests/console_test.py
deleted file mode 100644
index ac0e064f20..0000000000
--- a/python/qmf2/tests/console_test.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import logging
-import time
-from threading import Semaphore
-
-
-from qpid.messaging import *
-from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
- SchemaClassId, SchemaClass, QmfData)
-from qmf2.console import Console
-
-
-class ExampleNotifier(Notifier):
- def __init__(self):
- self._sema4 = Semaphore(0) # locked
-
- def indication(self):
- self._sema4.release()
-
- def waitForWork(self):
- print("Waiting for event...")
- self._sema4.acquire()
- print("...event present")
-
-
-logging.getLogger().setLevel(logging.INFO)
-
-print( "Starting Connection" )
-_c = Connection("localhost")
-_c.connect()
-
-print( "Starting Console" )
-
-_notifier = ExampleNotifier()
-_myConsole = Console(notifier=_notifier)
-_myConsole.addConnection( _c )
-
-# Allow discovery only for the agent named "qmf.testAgent"
-# @todo: replace "manual" query construction with
-# a formal class-based Query API
-_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
- QmfQueryPredicate({QmfQuery.CMP_EQ:
- [QmfQuery.KEY_AGENT_NAME,
- "qmf.testAgent"]}))
-_myConsole.enable_agent_discovery(_query)
-
-_done = False
-while not _done:
-# try:
- _notifier.waitForWork()
-
- _wi = _myConsole.get_next_workitem(timeout=0)
- while _wi:
- print("!!! work item received %d:%s" % (_wi.get_type(),
- str(_wi.get_params())))
-
-
- if _wi.get_type() == _wi.AGENT_ADDED:
- _agent = _wi.get_params().get("agent")
- if not _agent:
- print("!!!! AGENT IN REPLY IS NULL !!! ")
-
- _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
- oid_list = _myConsole.doQuery(_agent, _query)
-
- print("!!!************************** REPLY=%s" % oid_list)
-
- for oid in oid_list:
- _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
- oid)
- obj_list = _myConsole.doQuery(_agent, _query)
-
- print("!!!************************** REPLY=%s" % obj_list)
-
- if obj_list is None:
- obj_list={}
-
- for obj in obj_list:
- resp = obj.invoke_method( "set_meth",
- {"arg_int": -11,
- "arg_str": "are we not goons?"},
- None,
- 3)
- if resp is None:
- print("!!!*** NO RESPONSE FROM METHOD????")
- else:
- print("!!! method succeeded()=%s" % resp.succeeded())
- print("!!! method exception()=%s" % resp.get_exception())
- print("!!! method get args() = %s" % resp.get_arguments())
-
- if not obj.is_described():
- resp = obj.invoke_method( "bad method",
- {"arg_int": -11,
- "arg_str": "are we not goons?"},
- None,
- 3)
- if resp is None:
- print("!!!*** NO RESPONSE FROM METHOD????")
- else:
- print("!!! method succeeded()=%s" % resp.succeeded())
- print("!!! method exception()=%s" % resp.get_exception())
- print("!!! method get args() = %s" % resp.get_arguments())
-
-
- #---------------------------------
- #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name")
-
- #obj_list = _myConsole.doQuery(_agent, _query)
-
- #---------------------------------
-
- # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
-
- # package_list = _myConsole.doQuery(_agent, _query)
-
- # for pname in package_list:
- # print("!!! Querying for schema from package: %s" % pname)
- # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
- # QmfQueryPredicate(
- # {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]}))
-
- # schema_id_list = _myConsole.doQuery(_agent, _query)
- # for sid in schema_id_list:
- # _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
- # QmfQueryPredicate(
- # {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID,
- # sid.map_encode()]}))
-
- # schema_list = _myConsole.doQuery(_agent, _query)
- # for schema in schema_list:
- # sid = schema.get_class_id()
- # _query = QmfQuery.create_predicate(
- # QmfQuery.TARGET_OBJECT_ID,
- # QmfQueryPredicate({QmfQuery.CMP_EQ:
- # [QmfData.KEY_SCHEMA_ID,
- # sid.map_encode()]}))
-
- # oid_list = _myConsole.doQuery(_agent, _query)
- # for oid in oid_list:
- # _query = QmfQuery.create_id(
- # QmfQuery.TARGET_OBJECT, oid)
- # _reply = _myConsole.doQuery(_agent, _query)
-
- # print("!!!************************** REPLY=%s" % _reply)
-
-
- _myConsole.release_workitem(_wi)
- _wi = _myConsole.get_next_workitem(timeout=0)
-# except:
-# logging.info( "shutting down..." )
-# _done = True
-
-print( "Removing connection" )
-_myConsole.removeConnection( _c, 10 )
-
-print( "Destroying console:" )
-_myConsole.destroy( 10 )
-
-print( "******** console test done ********" )
diff --git a/python/qmf2/tests/events.py b/python/qmf2/tests/events.py
deleted file mode 100644
index e55dc8572e..0000000000
--- a/python/qmf2/tests/events.py
+++ /dev/null
@@ -1,208 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import time
-import datetime
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qpid.harness import Skipped
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData, SchemaEventClass,
- QmfEvent)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.timeout = 3
- self.broker_url = broker_url
- self.notifier = _testNotifier()
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Dynamically construct a management database
-
- _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
- "MyClass",
- stype=SchemaClassId.TYPE_EVENT),
- _desc="A test event schema")
- # add properties
- _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # Add schema to Agent
- self.schema = _schema
- self.agent.register_object_class(_schema)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
- # time.sleep(1)
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(self.timeout)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- try:
- conn.connect()
- except qpid.messaging.ConnectError, e:
- raise Skipped(e)
-
- self.agent.set_connection(conn)
- self.ready.set()
-
- counter = 1
- while self.running:
- # post an event every second
- event = QmfEvent.create(long(time.time() * 1000),
- QmfEvent.SEV_WARNING,
- {"prop-1": counter,
- "prop-2": str(datetime.datetime.utcnow())},
- _schema_id=self.schema.get_class_id())
- counter += 1
- self.agent.raise_event(event)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
- self.notifier.wait_for_work(1)
-
- self.agent.remove_connection(self.timeout)
- self.agent.destroy(self.timeout)
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent indication interval
- self.agent1 = _agentApp("agent1", self.broker, 1)
- self.agent1.start_app()
- self.agent2 = _agentApp("agent2", self.broker, 1)
- self.agent2.start_app()
-
- def tearDown(self):
- if self.agent1:
- self.agent1.stop_app()
- self.agent1 = None
- if self.agent2:
- self.agent2.stop_app()
- self.agent2 = None
-
- def test_get_events(self):
- # create console
- # find agents
-
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- try:
- self.conn.connect()
- except qpid.messaging.ConnectError, e:
- raise Skipped(e)
-
- self.console.add_connection(self.conn)
-
- # find the agents
- for aname in ["agent1", "agent2"]:
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # now wait for events
- agent1_events = agent2_events = 0
- wi = self.console.get_next_workitem(timeout=4)
- while wi:
- if wi.get_type() == wi.EVENT_RECEIVED:
- event = wi.get_params().get("event")
- self.assertTrue(isinstance(event, QmfEvent))
- self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
- self.assertTrue(event.get_value("prop-1") > 0)
-
- agent = wi.get_params().get("agent")
- if not agent or not isinstance(agent, qmf2.console.Agent):
- self.fail("Unexpected workitem from agent")
- else:
- if agent.get_name() == "agent1":
- agent1_events += 1
- elif agent.get_name() == "agent2":
- agent2_events += 1
- else:
- self.fail("Unexpected agent name received: %s" %
- agent.get_name())
- if agent1_events and agent2_events:
- break;
-
- wi = self.console.get_next_workitem(timeout=4)
-
- self.assertTrue(agent1_events > 0 and agent2_events > 0)
-
- self.console.destroy(10)
-
-
-
-
diff --git a/python/qmf2/tests/multi_response.py b/python/qmf2/tests/multi_response.py
deleted file mode 100644
index d3d00a70c5..0000000000
--- a/python/qmf2/tests/multi_response.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent)
-
-# note: objects, schema per agent must each be > max objs
-_SCHEMAS_PER_AGENT=7
-_OBJS_PER_AGENT=19
-_MAX_OBJS_PER_MSG=3
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.schema_count = _SCHEMAS_PER_AGENT
- self.obj_count = _OBJS_PER_AGENT
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat,
- _max_msg_size=_MAX_OBJS_PER_MSG)
-
- # Dynamically construct a management database
- for i in range(self.schema_count):
- _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage",
- "MyClass-" + str(i)),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"] )
- # add properties
- _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # These two properties can be set via the method call
- _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod( _desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- self.agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- for j in range(self.obj_count):
-
- self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":j,
- "index2": "name-" + str(j),
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0} ))
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- # wake main thread
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-
-
-class BaseTest(unittest.TestCase):
- def configure(self, config):
- self.agent_count = 2
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- # one second agent indication interval
- self.agent_heartbeat = 1
- self.agents = []
- for a in range(self.agent_count):
- agent = _agentApp("agent-" + str(a),
- self.broker,
- self.agent_heartbeat)
- agent.start_app()
- self.agents.append(agent)
-
- def tearDown(self):
- for agent in self.agents:
- if agent is not None:
- agent.stop_app()
-
- def test_all_schema_id(self):
- # create console
- # find agents
- # synchronous query for all schemas_ids
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
- self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
-
- # get a list of all schema_ids
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
- for sid in sid_list:
- self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "MyPackage")
- self.assertTrue(sid.get_class_name().split('-')[0] == "MyClass")
-
- self.console.destroy(10)
-
-
- def test_all_schema(self):
- # create console
- # find agents
- # synchronous query for all schemas
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
- self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
-
- # get a list of all schema_ids
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
- schema_list = self.console.do_query(agent, query)
- self.assertTrue(schema_list and
- len(schema_list) == _SCHEMAS_PER_AGENT)
- for schema in schema_list:
- self.assertTrue(isinstance(schema, SchemaObjectClass))
-
- self.console.destroy(10)
-
-
- def test_all_object_id(self):
- # create console
- # find agents
- # synchronous query for all object_ids by schema_id
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
- self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
-
- # get a list of all schema_ids
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
- for sid in sid_list:
- query = QmfQuery.create_wildcard_object_id(sid)
- oid_list = self.console.do_query(agent, query)
- self.assertTrue(oid_list and
- len(oid_list) == _OBJS_PER_AGENT)
- for oid in oid_list:
- self.assertTrue(isinstance(oid, basestring))
-
- self.console.destroy(10)
-
-
- def test_all_objects(self):
- # create console
- # find agents
- # synchronous query for all objects by schema_id
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
- self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
-
- # get a list of all schema_ids
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- sid_list = self.console.do_query(agent, query)
- self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
- for sid in sid_list:
- query = QmfQuery.create_wildcard_object(sid)
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(obj_list and
- len(obj_list) == _OBJS_PER_AGENT)
- for obj in obj_list:
- self.assertTrue(isinstance(obj,
- qmf2.console.QmfConsoleData))
-
- self.console.destroy(10)
diff --git a/python/qmf2/tests/obj_gets.py b/python/qmf2/tests/obj_gets.py
deleted file mode 100644
index 5b1446bb3a..0000000000
--- a/python/qmf2/tests/obj_gets.py
+++ /dev/null
@@ -1,599 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import unittest
-import logging
-import datetime
-from threading import Thread, Event
-
-import qpid.messaging
-from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
- SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
- QmfData)
-import qmf2.console
-from qmf2.agent import(QmfAgentData, Agent)
-
-
-class _testNotifier(Notifier):
- def __init__(self):
- self._event = Event()
-
- def indication(self):
- # note: called by qmf daemon thread
- self._event.set()
-
- def wait_for_work(self, timeout):
- # note: called by application thread to wait
- # for qmf to generate work
- self._event.wait(timeout)
- timed_out = self._event.isSet() == False
- if not timed_out:
- self._event.clear()
- return True
- return False
-
-
-class _agentApp(Thread):
- def __init__(self, name, broker_url, heartbeat):
- Thread.__init__(self)
- self.notifier = _testNotifier()
- self.broker_url = broker_url
- self.agent = Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
-
- # Management Database
- # - two different schema packages,
- # - two classes within one schema package
- # - multiple objects per schema package+class
- # - two "undescribed" objects
-
- # "package1/class1"
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
- _desc="A test data schema - one",
- _object_id_names=["key"] )
-
- _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- self.agent.register_object_class(_schema)
-
- _obj = QmfAgentData( self.agent,
- _values={"key":"p1c1_key1"},
- _schema=_schema)
- _obj.set_value("count1", 0)
- _obj.set_value("count2", 0)
- self.agent.add_object( _obj )
-
- _obj = QmfAgentData( self.agent,
- _values={"key":"p1c1_key2"},
- _schema=_schema )
- _obj.set_value("count1", 9)
- _obj.set_value("count2", 10)
- self.agent.add_object( _obj )
-
- # "package1/class2"
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
- _desc="A test data schema - two",
- _object_id_names=["name"] )
- # add properties
- _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- self.agent.register_object_class(_schema)
-
- _obj = QmfAgentData( self.agent,
- _values={"name":"p1c2_name1"},
- _schema=_schema )
- _obj.set_value("string1", "a data string")
- self.agent.add_object( _obj )
-
-
- # "package2/class1"
-
- _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
- _desc="A test data schema - second package",
- _object_id_names=["key"] )
-
- _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- self.agent.register_object_class(_schema)
-
- _obj = QmfAgentData( self.agent,
- _values={"key":"p2c1_key1"},
- _schema=_schema )
- _obj.set_value("counter", 0)
- self.agent.add_object( _obj )
-
- _obj = QmfAgentData( self.agent,
- _values={"key":"p2c1_key2"},
- _schema=_schema )
- _obj.set_value("counter", 2112)
- self.agent.add_object( _obj )
-
-
- # add two "unstructured" objects to the Agent
-
- _obj = QmfAgentData(self.agent, _object_id="undesc-1")
- _obj.set_value("field1", "a value")
- _obj.set_value("field2", 2)
- _obj.set_value("field3", {"a":1, "map":2, "value":3})
- _obj.set_value("field4", ["a", "list", "value"])
- self.agent.add_object(_obj)
-
-
- _obj = QmfAgentData(self.agent, _object_id="undesc-2")
- _obj.set_value("key-1", "a value")
- _obj.set_value("key-2", 2)
- self.agent.add_object(_obj)
-
- self.running = False
- self.ready = Event()
-
- def start_app(self):
- self.running = True
- self.start()
- self.ready.wait(10)
- if not self.ready.is_set():
- raise Exception("Agent failed to connect to broker.")
-
- def stop_app(self):
- self.running = False
- self.notifier.indication() # hmmm... collide with daemon???
- self.join(10)
- if self.isAlive():
- raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
-
- def run(self):
- # broker_url = "user/passwd@hostname:port"
- self.conn = qpid.messaging.Connection(self.broker_url.host,
- self.broker_url.port,
- self.broker_url.user,
- self.broker_url.password)
- self.conn.connect()
- self.agent.set_connection(self.conn)
- self.ready.set()
-
- while self.running:
- self.notifier.wait_for_work(None)
- wi = self.agent.get_next_workitem(timeout=0)
- while wi is not None:
- logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
- self.agent.release_workitem(wi)
- wi = self.agent.get_next_workitem(timeout=0)
-
- if self.conn:
- self.agent.remove_connection(10)
- self.agent.destroy(10)
-
-
-class BaseTest(unittest.TestCase):
- agent_count = 5
-
- def configure(self, config):
- self.config = config
- self.broker = config.broker
- self.defines = self.config.defines
-
- def setUp(self):
- self.agents = []
- for i in range(self.agent_count):
- agent = _agentApp("agent-" + str(i), self.broker, 1)
- agent.start_app()
- self.agents.append(agent)
- #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow())
-
- def tearDown(self):
- #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow())
- for agent in self.agents:
- if agent is not None:
- agent.stop_app()
-
-
- def test_all_agents(self):
- # create console
- # find all agents
- # synchronous query for all objects by id
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- aname = agent_app.agent.get_name()
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # console has discovered all agents, now query all undesc-2 objects
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_object_id="undesc-2", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_object_id() == "undesc-2")
-
- # now query all objects from schema "package1"
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package1", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == (self.agent_count * 3))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
-
- # now query all objects from schema "package2"
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package2", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == (self.agent_count * 2))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
-
- # now query all objects from schema "package1/class2"
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package1", _cname="class2",
- _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
-
- # given the schema identifier from the last query, repeat using the
- # specific schema id
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- schema_id = objs[0].get_schema_class_id()
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_schema_id=schema_id, _timeout=5)
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id() == schema_id)
-
-
- self.console.destroy(10)
-
-
-
- def test_agent_subset(self):
- # create console
- # find all agents
- # synchronous query for all objects by id
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- agent_list = []
- for agent_app in self.agents:
- aname = agent_app.agent.get_name()
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
- agent_list.append(agent)
-
- # Only use a subset of the agents:
- agent_list = agent_list[:len(agent_list)/2]
-
- # console has discovered all agents, now query all undesc-2 objects
- objs = self.console.get_objects(_object_id="undesc-2",
- _agents=agent_list, _timeout=5)
- self.assertTrue(len(objs) == len(agent_list))
- for obj in objs:
- self.assertTrue(obj.get_object_id() == "undesc-2")
-
- # now query all objects from schema "package1"
- objs = self.console.get_objects(_pname="package1",
- _agents=agent_list,
- _timeout=5)
- self.assertTrue(len(objs) == (len(agent_list) * 3))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
-
- # now query all objects from schema "package2"
- objs = self.console.get_objects(_pname="package2",
- _agents=agent_list,
- _timeout=5)
- self.assertTrue(len(objs) == (len(agent_list) * 2))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
-
- # now query all objects from schema "package1/class2"
- objs = self.console.get_objects(_pname="package1", _cname="class2",
- _agents=agent_list,
- _timeout=5)
- self.assertTrue(len(objs) == len(agent_list))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
-
- # given the schema identifier from the last query, repeat using the
- # specific schema id
- schema_id = objs[0].get_schema_class_id()
- objs = self.console.get_objects(_schema_id=schema_id,
- _agents=agent_list,
- _timeout=5)
- self.assertTrue(len(objs) == len(agent_list))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id() == schema_id)
-
-
- self.console.destroy(10)
-
-
-
- def test_single_agent(self):
- # create console
- # find all agents
- # synchronous query for all objects by id
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- agent_list = []
- for agent_app in self.agents:
- aname = agent_app.agent.get_name()
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
- agent_list.append(agent)
-
- # Only use one agent
- agent = agent_list[0]
-
- # console has discovered all agents, now query all undesc-2 objects
- objs = self.console.get_objects(_object_id="undesc-2",
- _agents=agent, _timeout=5)
- self.assertTrue(len(objs) == 1)
- for obj in objs:
- self.assertTrue(obj.get_object_id() == "undesc-2")
-
- # now query all objects from schema "package1"
- objs = self.console.get_objects(_pname="package1",
- _agents=agent,
- _timeout=5)
- self.assertTrue(len(objs) == 3)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
-
- # now query all objects from schema "package2"
- objs = self.console.get_objects(_pname="package2",
- _agents=agent,
- _timeout=5)
- self.assertTrue(len(objs) == 2)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
-
- # now query all objects from schema "package1/class2"
- objs = self.console.get_objects(_pname="package1", _cname="class2",
- _agents=agent,
- _timeout=5)
- self.assertTrue(len(objs) == 1)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
-
- # given the schema identifier from the last query, repeat using the
- # specific schema id
- schema_id = objs[0].get_schema_class_id()
- objs = self.console.get_objects(_schema_id=schema_id,
- _agents=agent,
- _timeout=5)
- self.assertTrue(len(objs) == 1)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id() == schema_id)
-
-
- self.console.destroy(10)
-
-
-
- def test_all_objs_by_oid(self):
- # create console
- # find all agents
- # synchronous query for all described objects by:
- # oid & schema_id
- # oid & package name
- # oid & package and class name
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- aname = agent_app.agent.get_name()
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- # now query all objects from schema "package1"
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package1",
- _object_id="p1c1_key1", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
- self.assertTrue(obj.get_object_id() == "p1c1_key1")
- # mooch the schema for a later test
- schema_id_p1c1 = objs[0].get_schema_class_id()
-
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package1",
- _object_id="p1c2_name1", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
- self.assertTrue(obj.get_object_id() == "p1c2_name1")
-
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package2", _cname="class1",
- _object_id="p2c1_key1", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
- self.assertTrue(obj.get_object_id() == "p2c1_key1")
-
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_schema_id=schema_id_p1c1,
- _object_id="p1c1_key2", _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
- self.assertTrue(obj.get_object_id() == "p1c1_key2")
-
- # this should return all "undescribed" objects
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(len(objs) == (self.agent_count * 2))
- for obj in objs:
- self.assertTrue(obj.get_object_id() == "undesc-1" or
- obj.get_object_id() == "undesc-2")
-
- # these should fail
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_schema_id=schema_id_p1c1,
- _object_id="does not exist",
- _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(objs == None)
-
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package2",
- _object_id="does not exist",
- _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(objs == None)
-
- #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
- objs = self.console.get_objects(_pname="package3",
- _object_id="does not exist",
- _timeout=5)
- #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
- self.assertTrue(objs == None)
-
- self.console.destroy(10)
-
-
- def test_wildcard_schema_id(self):
- # create console
- # find all agents
- # synchronous query for all described objects by:
- # oid & wildcard schema_id
- # wildcard schema_id
- # verify known object ids are returned
- self.notifier = _testNotifier()
- self.console = qmf2.console.Console(notifier=self.notifier,
- agent_timeout=3)
- self.conn = qpid.messaging.Connection(self.broker.host,
- self.broker.port,
- self.broker.user,
- self.broker.password)
- self.conn.connect()
- self.console.add_connection(self.conn)
-
- for agent_app in self.agents:
- aname = agent_app.agent.get_name()
- agent = self.console.find_agent(aname, timeout=3)
- self.assertTrue(agent and agent.get_name() == aname)
-
- wild_schema_id = SchemaClassId("package1", "class1")
- objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
- self.assertTrue(len(objs) == (self.agent_count * 2))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
-
- wild_schema_id = SchemaClassId("package1", "class2")
- objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
- self.assertTrue(obj.get_object_id() == "p1c2_name1")
-
- wild_schema_id = SchemaClassId("package2", "class1")
- objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
- self.assertTrue(len(objs) == (self.agent_count * 2))
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
-
- wild_schema_id = SchemaClassId("package1", "class1")
- objs = self.console.get_objects(_schema_id=wild_schema_id,
- _object_id="p1c1_key2", _timeout=5)
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
- self.assertTrue(obj.get_object_id() == "p1c1_key2")
-
- # should fail
- objs = self.console.get_objects(_schema_id=wild_schema_id,
- _object_id="does not exist",
- _timeout=5)
- self.assertTrue(objs == None)
-
- wild_schema_id = SchemaClassId("package2", "class1")
- objs = self.console.get_objects(_schema_id=wild_schema_id,
- _object_id="p2c1_key2", _timeout=5)
- self.assertTrue(len(objs) == self.agent_count)
- for obj in objs:
- self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
- self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
- self.assertTrue(obj.get_object_id() == "p2c1_key2")
-
- # should fail
- wild_schema_id = SchemaClassId("package1", "bad-class")
- objs = self.console.get_objects(_schema_id=wild_schema_id,
- _object_id="p1c1_key2", _timeout=5)
- self.assertTrue(objs == None)
-
- self.console.destroy(10)
-