diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-14 14:59:24 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-14 14:59:24 +0000 |
| commit | b57cbea2e80030da1ec46b74ed5c09a7329299c9 (patch) | |
| tree | 01c130d31fc22f2975dcfd7ec40db349f7d98a09 /qpid/python/qmf2/tests/basic_query.py | |
| parent | 02336adf2b3ce963fcd6db9ecb1cb6397ed2fc47 (diff) | |
| download | qpid-python-b57cbea2e80030da1ec46b74ed5c09a7329299c9.tar.gz | |
moved qpid-* tools out of qpid/python into qpid/tools; moved qmf library into extras/qmf
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/tests/basic_query.py')
| -rw-r--r-- | qpid/python/qmf2/tests/basic_query.py | 516 |
1 files changed, 0 insertions, 516 deletions
diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py deleted file mode 100644 index dd321cb4bb..0000000000 --- a/qpid/python/qmf2/tests/basic_query.py +++ /dev/null @@ -1,516 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import unittest -import logging -from threading import Thread, Event - -import qpid.messaging -from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, - SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, - QmfData) -import qmf2.console -from qmf2.agent import(QmfAgentData, Agent) - - -class _testNotifier(Notifier): - def __init__(self): - self._event = Event() - - def indication(self): - # note: called by qmf daemon thread - self._event.set() - - def wait_for_work(self, timeout): - # note: called by application thread to wait - # for qmf to generate work - self._event.wait(timeout) - timed_out = self._event.isSet() == False - if not timed_out: - self._event.clear() - return True - return False - - -class _agentApp(Thread): - def __init__(self, name, broker_url, heartbeat): - Thread.__init__(self) - self.notifier = _testNotifier() - self.broker_url = broker_url - self.agent = Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) - - # Dynamically construct a management database - - _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), - _desc="A test data schema", - _object_id_names=["index1", "index2"] ) - # add properties - _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) - _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) - - # these two properties are statistics - _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # These two properties can be set via the method call - _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) - _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) - - # add method - _meth = SchemaMethod( _desc="Method to set string and int in object." ) - _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) - _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) - _schema.add_method( "set_meth", _meth ) - - # Add schema to Agent - - self.agent.register_object_class(_schema) - - # instantiate managed data objects matching the schema - - _obj1 = QmfAgentData( self.agent, _schema=_schema, - _values={"index1":100, "index2":"a name"}) - _obj1.set_value("set_string", "UNSET") - _obj1.set_value("set_int", 0) - _obj1.set_value("query_count", 0) - _obj1.set_value("method_call_count", 0) - self.agent.add_object( _obj1 ) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - self.agent.add_object( QmfAgentData( self.agent, _schema=_schema, - _values={"index1":50, - "index2": "my name", - "set_string": "SET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) - - - # add an "unstructured" object to the Agent - _obj2 = QmfAgentData(self.agent, _object_id="01545") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 2) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 50) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01546") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 3) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 51) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01544") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 49) - self.agent.add_object(_obj2) - - _obj2 = QmfAgentData(self.agent, _object_id="01543") - _obj2.set_value("field1", "a value") - _obj2.set_value("field2", 4) - _obj2.set_value("field3", {"a":1, "map":2, "value":3}) - _obj2.set_value("field4", ["a", "list", "value"]) - _obj2.set_value("index1", 48) - self.agent.add_object(_obj2) - - self.running = False - self.ready = Event() - - def start_app(self): - self.running = True - self.start() - self.ready.wait(10) - if not self.ready.is_set(): - raise Exception("Agent failed to connect to broker.") - - def stop_app(self): - self.running = False - # wake main thread - self.notifier.indication() # hmmm... collide with daemon??? - self.join(10) - if self.isAlive(): - raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") - - def run(self): - # broker_url = "user/passwd@hostname:port" - self.conn = qpid.messaging.Connection(self.broker_url.host, - self.broker_url.port, - self.broker_url.user, - self.broker_url.password) - self.conn.connect() - self.agent.set_connection(self.conn) - self.ready.set() - - while self.running: - self.notifier.wait_for_work(None) - wi = self.agent.get_next_workitem(timeout=0) - while wi is not None: - logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) - self.agent.release_workitem(wi) - wi = self.agent.get_next_workitem(timeout=0) - - if self.conn: - self.agent.remove_connection(10) - self.agent.destroy(10) - - - - -class BaseTest(unittest.TestCase): - def configure(self, config): - self.config = config - self.broker = config.broker - self.defines = self.config.defines - - def setUp(self): - # one second agent indication interval - self.agent_heartbeat = 1 - self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat) - self.agent1.start_app() - self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat) - self.agent2.start_app() - - def tearDown(self): - if self.agent1: - self.agent1.stop_app() - self.agent1 = None - if self.agent2: - self.agent2.stop_app() - self.agent2 = None - - def test_all_oids(self): - # create console - # find agents - # synchronous query for all schemas - # synchronous query for all objects per schema - # verify known object ids are returned - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # first, find objects per schema - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - for sid in sid_list: - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID, - _target_params=t_params) - - oid_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(oid_list, type([])), - "Unexpected return type") - self.assertTrue(len(oid_list) == 3, "Wrong count") - self.assertTrue('100a name' in oid_list) - self.assertTrue('99another name' in oid_list) - self.assertTrue('50my name' in oid_list) - self.assertTrue('01545' not in oid_list) - - - # now, find all unmanaged objects (no schema) - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID) - oid_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(oid_list, type([])), - "Unexpected return type") - self.assertTrue(len(oid_list) == 4, "Wrong count") - self.assertTrue('100a name' not in oid_list) - self.assertTrue('99another name' not in oid_list) - self.assertTrue('01545' in oid_list) - self.assertTrue('01544' in oid_list) - self.assertTrue('01543' in oid_list) - self.assertTrue('01546' in oid_list) - - self.console.destroy(10) - - - def test_direct_oids(self): - # create console - # find agents - # synchronous query for each objects - # verify objects and schemas are correct - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # first, find objects per schema - query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID) - sid_list = self.console.do_query(agent, query) - self.assertTrue(sid_list and len(sid_list) == 1) - - for oid in ['100a name', '99another name']: - query = QmfQuery.create_id_object(oid, sid_list[0]) - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([])), - "Unexpected return type") - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - self.assertTrue(isinstance(obj, QmfData)) - self.assertTrue(obj.get_object_id() == oid) - self.assertTrue(obj.get_schema_class_id() == sid_list[0]) - schema_id = obj.get_schema_class_id() - self.assertTrue(isinstance(schema_id, SchemaClassId)) - self.assertTrue(obj.is_described()) - - # now find schema-less objects - for oid in ['01545']: - query = QmfQuery.create_id_object(oid) - obj_list = self.console.do_query(agent, query) - - self.assertTrue(isinstance(obj_list, type([])), - "Unexpected return type") - self.assertTrue(len(obj_list) == 1) - obj = obj_list[0] - self.assertTrue(isinstance(obj, QmfData)) - self.assertTrue(obj.get_object_id() == oid) - self.assertFalse(obj.is_described()) - - self.console.destroy(10) - - - - def test_packages(self): - # create console - # find agents - # synchronous query all package names - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES) - package_list = self.console.do_query(agent, query) - self.assertTrue(len(package_list) == 1) - self.assertTrue('MyPackage' in package_list) - - - self.console.destroy(10) - - - - def test_predicate_schema_id(self): - # create console - # find agents - # synchronous query for all schema by package name - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, - [QmfQuery.EQ, - SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, "MyPackage"]]) - - schema_list = self.console.do_query(agent, query) - self.assertTrue(len(schema_list)) - for schema in schema_list: - self.assertTrue(schema.get_class_id().get_package_name() == - "MyPackage") - - - self.console.destroy(10) - - - - def test_predicate_no_match(self): - # create console - # find agents - # synchronous query for all schema by package name - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA, - [QmfQuery.EQ, - [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE], - [QmfQuery.QUOTE, "No-Such-Package"]]) - - schema_list = self.console.do_query(agent, query) - self.assertTrue(len(schema_list) == 0) - - self.console.destroy(10) - - - def test_predicate_match_string(self): - # create console - # find agents - # synchronous query for all objects with a value named - # set_string which is < or equal to "UNSET" - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # get the schema id for MyPackage:MyClass schema - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, - [QmfQuery.AND, - [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, "MyPackage"]], - [QmfQuery.EQ, SchemaClassId.KEY_CLASS, - [QmfQuery.QUOTE, "MyClass"]]]) - sid_list = self.console.do_query(agent, query) - self.assertTrue(len(sid_list) == 1) - - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]], - [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]], - _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]}) - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 2) - for obj in obj_list: - self.assertTrue(obj.has_value("set_string")) - self.assertTrue(obj.get_value("set_string") == "UNSET") - - self.console.destroy(10) - - - - def test_predicate_match_integer(self): - # create console - # find agents - # synchronous query for all objects with a value named - # "index1" which is < or equal to various values - self.notifier = _testNotifier() - self.console = qmf2.console.Console(notifier=self.notifier, - agent_timeout=3) - self.conn = qpid.messaging.Connection(self.broker.host, - self.broker.port, - self.broker.user, - self.broker.password) - self.conn.connect() - self.console.add_connection(self.conn) - - for aname in ["agent1", "agent2"]: - agent = self.console.find_agent(aname, timeout=3) - self.assertTrue(agent and agent.get_name() == aname) - - # Query the unmanaged (no schema) objects - - # == 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.EQ, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 1) - self.assertTrue(obj_list[0].has_value("index1")) - self.assertTrue(obj_list[0].get_value("index1") == 50) - - # <= 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.LE, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 3) - for obj in obj_list: - self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") <= 50) - - - # > 50 - query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, - [QmfQuery.AND, - [QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]], - [QmfQuery.GT, "index1", 50]]) - - obj_list = self.console.do_query(agent, query) - self.assertTrue(len(obj_list) == 1) - for obj in obj_list: - self.assertTrue(obj.has_value("index1")) - self.assertTrue(obj.get_value("index1") > 50) - - self.console.destroy(10) - |
