diff options
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 2295 |
1 files changed, 0 insertions, 2295 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py deleted file mode 100644 index c13cf70755..0000000000 --- a/python/qmf2/console.py +++ /dev/null @@ -1,2295 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import sys -import os -import logging -import platform -import time -import datetime -import Queue -from threading import Thread, Event -from threading import Lock -from threading import currentThread -from threading import Condition - -from qpid.messaging import Connection, Message, Empty, SendError - -from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, - MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, - SchemaEventClass, SchemaObjectClass, WorkItem, - SchemaMethod, QmfEvent, timedelta_to_secs) - - -# global flag that indicates which thread (if any) is -# running the console notifier callback -_callback_thread=None - - - - -##============================================================================== -## Console Transaction Management -## -## At any given time, a console application may have multiple outstanding -## message transactions with agents. The following objects allow the console -## to track these outstanding transactions. -##============================================================================== - - -class _Mailbox(object): - """ - Virtual base class for all Mailbox-like objects. - """ - def __init__(self, console): - self.console = console - self.cid = 0 - self.console._add_mailbox(self) - - def get_address(self): - return self.cid - - def deliver(self, data): - """ - Invoked by Console Management thread when a message arrives for - this mailbox. - """ - raise Exception("_Mailbox deliver() method must be provided") - - def destroy(self): - """ - Release the mailbox. Once called, the mailbox should no longer be - referenced. - """ - self.console._remove_mailbox(self.cid) - - -class _SyncMailbox(_Mailbox): - """ - A simple mailbox that allows a consumer to wait for delivery of data. - """ - def __init__(self, console): - """ - Invoked by application thread. - """ - super(_SyncMailbox, self).__init__(console) - self._cv = Condition() - self._data = [] - self._waiting = False - - def deliver(self, data): - """ - Drop data into the mailbox, waking any waiters if necessary. - Invoked by Console Management thread only. - """ - self._cv.acquire() - try: - self._data.append(data) - # if was empty, notify waiters - if len(self._data) == 1: - self._cv.notify() - finally: - self._cv.release() - - def fetch(self, timeout=None): - """ - Get one data item from a mailbox, with timeout. - Invoked by application thread. - """ - self._cv.acquire() - try: - if len(self._data) == 0: - self._cv.wait(timeout) - if len(self._data): - return self._data.pop(0) - return None - finally: - self._cv.release() - - -class _AsyncMailbox(_Mailbox): - """ - A Mailbox for asynchronous delivery, with a timeout value. - """ - def __init__(self, console, - _timeout=None): - """ - Invoked by application thread. - """ - super(_AsyncMailbox, self).__init__(console) - self.console = console - - if _timeout is None: - _timeout = console._reply_timeout - self.expiration_date = (datetime.datetime.utcnow() + - datetime.timedelta(seconds=_timeout)) - console._lock.acquire() - try: - console._async_mboxes[self.cid] = self - finally: - console._lock.release() - - # now that an async mbox has been created, wake the - # console mgmt thread so it will know about the mbox expiration - # date (and adjust its idle sleep period correctly) - - console._wake_thread() - - def deliver(self, msg): - """ - """ - raise Exception("deliver() method must be provided") - - def expire(self): - raise Exception("expire() method must be provided") - - - def destroy(self): - self.console._lock.acquire() - try: - if self.cid in self.console._async_mboxes: - del self.console._async_mboxes[self.cid] - finally: - self.console._lock.release() - super(_AsyncMailbox, self).destroy() - - - -class _QueryMailbox(_AsyncMailbox): - """ - A mailbox used for asynchronous query requests. - """ - def __init__(self, console, - agent_name, - context, - target, msgkey, - _timeout=None): - """ - Invoked by application thread. - """ - super(_QueryMailbox, self).__init__(console, - _timeout) - self.agent_name = agent_name - self.target = target - self.msgkey = msgkey - self.context = context - self.result = [] - - def deliver(self, reply): - """ - Process query response messages delivered to this mailbox. - Invoked by Console Management thread only. - """ - done = False - objects = reply.content.get(self.msgkey) - if not objects: - done = True - else: - # convert from map to native types if needed - if self.target == QmfQuery.TARGET_SCHEMA_ID: - for sid_map in objects: - self.result.append(SchemaClassId.from_map(sid_map)) - - elif self.target == QmfQuery.TARGET_SCHEMA: - for schema_map in objects: - # extract schema id, convert based on schema type - sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - if sid: - if sid.get_type() == SchemaClassId.TYPE_DATA: - schema = SchemaObjectClass.from_map(schema_map) - else: - schema = SchemaEventClass.from_map(schema_map) - self.console._add_schema(schema) # add to schema cache - self.result.append(schema) - - elif self.target == QmfQuery.TARGET_OBJECT: - for obj_map in objects: - # @todo: need the agent name - ideally from the - # reply message iself. - agent = self.console.get_agent(self.agent_name) - if agent: - obj = QmfConsoleData(map_=obj_map, agent=agent) - # start fetch of schema if not known - sid = obj.get_schema_class_id() - if sid: - self.console._prefetch_schema(sid, agent) - self.result.append(obj) - - - else: - # no conversion needed. - self.result += objects - - if done: - # create workitem - # logging.error("QUERY COMPLETE for %s" % str(self.context)) - wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) - self.console._work_q.put(wi) - self.console._work_q_put = True - - self.destroy() - - - def expire(self): - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) - # send along whatever (possibly none) has been received so far - wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) - self.console._work_q.put(wi) - self.console._work_q_put = True - - self.destroy() - - - -class _SchemaPrefetchMailbox(_AsyncMailbox): - """ - Handles responses to schema fetches made by the console. - """ - def __init__(self, console, - schema_id, - _timeout=None): - """ - Invoked by application thread. - """ - super(_SchemaPrefetchMailbox, self).__init__(console, - _timeout) - self.schema_id = schema_id - - def deliver(self, reply): - """ - Process schema response messages. - """ - done = False - schemas = reply.content.get(MsgKey.schema) - if schemas: - for schema_map in schemas: - # extract schema id, convert based on schema type - sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - if sid: - if sid.get_type() == SchemaClassId.TYPE_DATA: - schema = SchemaObjectClass.from_map(schema_map) - else: - schema = SchemaEventClass.from_map(schema_map) - self.console._add_schema(schema) # add to schema cache - self.destroy() - - - def expire(self): - self.destroy() - - - -class _MethodMailbox(_AsyncMailbox): - """ - A mailbox used for asynchronous method requests. - """ - def __init__(self, console, - context, - _timeout=None): - """ - Invoked by application thread. - """ - super(_MethodMailbox, self).__init__(console, - _timeout) - self.context = context - - def deliver(self, reply): - """ - Process method response messages delivered to this mailbox. - Invoked by Console Management thread only. - """ - - _map = reply.content.get(MsgKey.method) - if not _map: - logging.error("Invalid method call reply message") - result = None - else: - error=_map.get(SchemaMethod.KEY_ERROR) - if error: - error = QmfData.from_map(error) - result = MethodResult(_error=error) - else: - result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) - - # create workitem - wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result) - self.console._work_q.put(wi) - self.console._work_q_put = True - - self.destroy() - - - def expire(self): - """ - The mailbox expired without receiving a reply. - Invoked by the Console Management thread only. - """ - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) - # send along an empty response - wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) - self.console._work_q.put(wi) - self.console._work_q_put = True - - self.destroy() - - - -##============================================================================== -## DATA MODEL -##============================================================================== - - -class QmfConsoleData(QmfData): - """ - Console's representation of an managed QmfData instance. - """ - def __init__(self, map_, agent): - super(QmfConsoleData, self).__init__(_map=map_, - _const=True) - self._agent = agent - - def get_timestamps(self): - """ - Returns a list of timestamps describing the lifecycle of - the object. All timestamps are represented by the AMQP - timestamp type. [0] = time of last update from Agent, - [1] = creation timestamp - [2] = deletion timestamp, or zero if not - deleted. - """ - return [self._utime, self._ctime, self._dtime] - - def get_create_time(self): - """ - returns the creation timestamp - """ - return self._ctime - - def get_update_time(self): - """ - returns the update timestamp - """ - return self._utime - - def get_delete_time(self): - """ - returns the deletion timestamp, or zero if not yet deleted. - """ - return self._dtime - - def is_deleted(self): - """ - True if deletion timestamp not zero. - """ - return self._dtime != long(0) - - def refresh(self, _reply_handle=None, _timeout=None): - """ - request that the Agent update the value of this object's - contents. - """ - if _reply_handle is not None: - logging.error(" ASYNC REFRESH TBD!!!") - return None - - assert self._agent - assert self._agent._console - - if _timeout is None: - _timeout = self._agent._console._reply_timeout - - # create query to agent using this objects ID - query = QmfQuery.create_id_object(self.get_object_id(), - self.get_schema_class_id()) - obj_list = self._agent._console.do_query(self._agent, query, - _timeout=_timeout) - if obj_list is None or len(obj_list) != 1: - return None - - self._update(obj_list[0]) - return self - - - def invoke_method(self, name, _in_args={}, _reply_handle=None, - _timeout=None): - """ - Invoke the named method on this object. - """ - assert self._agent - assert self._agent._console - - oid = self.get_object_id() - if oid is None: - raise ValueError("Cannot invoke methods on unmanaged objects.") - - if _timeout is None: - _timeout = self._agent._console._reply_timeout - - if _reply_handle is not None: - mbox = _MethodMailbox(self._agent._console, - _reply_handle) - else: - mbox = _SyncMailbox(self._agent._console) - cid = mbox.get_address() - - _map = {self.KEY_OBJECT_ID:str(oid), - SchemaMethod.KEY_NAME:name} - - sid = self.get_schema_class_id() - if sid: - _map[self.KEY_SCHEMA_ID] = sid.map_encode() - if _in_args: - _map[SchemaMethod.KEY_ARGUMENTS] = _in_args - - logging.debug("Sending method req to Agent (%s)" % time.time()) - try: - self._agent._send_method_req(_map, cid) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return None - - if _reply_handle is not None: - return True - - logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = mbox.fetch(_timeout) - mbox.destroy() - - if not replyMsg: - logging.debug("Agent method req wait timed-out.") - return None - - _map = replyMsg.content.get(MsgKey.method) - if not _map: - logging.error("Invalid method call reply message") - return None - - error=_map.get(SchemaMethod.KEY_ERROR) - if error: - return MethodResult(_error=QmfData.from_map(error)) - else: - return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) - - def _update(self, newer): - super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, - _tag=newer._tag, _object_id=newer._object_id, - _ctime=newer._ctime, _utime=newer._utime, - _dtime=newer._dtime, - _schema_id=newer._schema_id, _const=True) - -class QmfLocalData(QmfData): - """ - Console's representation of an unmanaged QmfData instance. There - is no remote agent associated with this instance. The Console has - full control over this instance. - """ - def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, - _schema=None): - # timestamp in millisec since epoch UTC - ctime = long(time.time() * 1000) - super(QmfLocalData, self).__init__(_values=values, - _subtypes=_subtypes, _tag=_tag, - _object_id=_object_id, - _schema=_schema, _ctime=ctime, - _utime=ctime, _const=False) - - -class Agent(object): - """ - A local representation of a remote agent managed by this console. - """ - def __init__(self, name, console): - """ - @type name: string - @param name: uniquely identifies this agent in the AMQP domain. - """ - - if not isinstance(console, Console): - raise TypeError("parameter must be an instance of class Console") - - self._name = name - self._address = QmfAddress.direct(name, console._domain) - self._console = console - self._sender = None - self._packages = {} # map of {package-name:[list of class-names], } for this agent - self._subscriptions = [] # list of active standing subscriptions for this agent - self._announce_timestamp = None # datetime when last announce received - logging.debug( "Created Agent with address: [%s]" % self._address ) - - - def get_name(self): - return self._name - - def is_active(self): - return self._announce_timestamp != None - - def _send_msg(self, msg, correlation_id=None): - """ - Low-level routine to asynchronously send a message to this agent. - """ - msg.reply_to = str(self._console._address) - if correlation_id: - msg.correlation_id = str(correlation_id) - # TRACE - #logging.error("!!! Console %s sending to agent %s (%s)" % - # (self._console._name, self._name, str(msg))) - self._sender.send(msg) - # return handle - - def get_packages(self): - """ - Return a list of the names of all packages known to this agent. - """ - return self._packages.keys() - - def get_classes(self): - """ - Return a dictionary [key:class] of classes known to this agent. - """ - return self._packages.copy() - - def get_objects(self, query, kwargs={}): - """ - Return a list of objects that satisfy the given query. - - @type query: dict, or common.Query - @param query: filter for requested objects - @type kwargs: dict - @param kwargs: ??? used to build match selector and query ??? - @rtype: list - @return: list of matching objects, or None. - """ - pass - - def get_object(self, query, kwargs={}): - """ - Get one object - query is expected to match only one object. - ??? Recommended: explicit timeout param, default None ??? - - @type query: dict, or common.Query - @param query: filter for requested objects - @type kwargs: dict - @param kwargs: ??? used to build match selector and query ??? - @rtype: qmfConsole.ObjectProxy - @return: one matching object, or none - """ - pass - - - def create_subscription(self, query): - """ - Factory for creating standing subscriptions based on a given query. - - @type query: common.Query object - @param query: determines the list of objects for which this subscription applies - @rtype: qmfConsole.Subscription - @returns: an object representing the standing subscription. - """ - pass - - - def invoke_method(self, name, _in_args={}, _reply_handle=None, - _timeout=None): - """ - Invoke the named method on this agent. - """ - assert self._console - - if _timeout is None: - _timeout = self._console._reply_timeout - - if _reply_handle is not None: - mbox = _MethodMailbox(self._console, - _reply_handle) - else: - mbox = _SyncMailbox(self._console) - cid = mbox.get_address() - - _map = {SchemaMethod.KEY_NAME:name} - if _in_args: - _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() - - logging.debug("Sending method req to Agent (%s)" % time.time()) - try: - self._send_method_req(_map, cid) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return None - - if _reply_handle is not None: - return True - - logging.debug("Waiting for response to method req (%s)" % _timeout) - replyMsg = mbox.fetch(_timeout) - mbox.destroy() - - if not replyMsg: - logging.debug("Agent method req wait timed-out.") - return None - - _map = replyMsg.content.get(MsgKey.method) - if not _map: - logging.error("Invalid method call reply message") - return None - - return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), - _error=_map.get(SchemaMethod.KEY_ERROR)) - - def enable_events(self): - raise Exception("enable_events tbd") - - def disable_events(self): - raise Exception("disable_events tbd") - - def destroy(self): - raise Exception("destroy tbd") - - def __repr__(self): - return str(self._address) - - def __str__(self): - return self.__repr__() - - def _send_query(self, query, correlation_id=None): - """ - """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.get_query)}, - content={MsgKey.query: query.map_encode()}) - self._send_msg( msg, correlation_id ) - - - def _send_method_req(self, mr_map, correlation_id=None): - """ - """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.method_req)}, - content=mr_map) - self._send_msg( msg, correlation_id ) - - - ##============================================================================== - ## METHOD CALL - ##============================================================================== - -class MethodResult(object): - def __init__(self, _out_args=None, _error=None): - self._error = _error - self._out_args = _out_args - - def succeeded(self): - return self._error is None - - def get_exception(self): - return self._error - - def get_arguments(self): - return self._out_args - - def get_argument(self, name): - arg = None - if self._out_args: - arg = self._out_args.get(name) - return arg - - - ##============================================================================== - ## CONSOLE - ##============================================================================== - - - - - - -class Console(Thread): - """ - A Console manages communications to a collection of agents on behalf of an application. - """ - def __init__(self, name=None, _domain=None, notifier=None, - reply_timeout = 60, - # agent_timeout = 120, - agent_timeout = 60, - kwargs={}): - """ - @type name: str - @param name: identifier for this console. Must be unique. - @type notifier: qmfConsole.Notifier - @param notifier: invoked when events arrive for processing. - @type kwargs: dict - @param kwargs: ??? Unused - """ - Thread.__init__(self) - self._operational = False - self._ready = Event() - - if not name: - self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) - else: - self._name = str(name) - self._domain = _domain - self._address = QmfAddress.direct(self._name, self._domain) - self._notifier = notifier - self._lock = Lock() - self._conn = None - self._session = None - # dict of "agent-direct-address":class Agent entries - self._agent_map = {} - self._direct_recvr = None - self._announce_recvr = None - self._locate_sender = None - self._schema_cache = {} - self._pending_schema_req = [] - self._agent_discovery_filter = None - self._reply_timeout = reply_timeout - self._agent_timeout = agent_timeout - self._next_agent_expire = None - self._next_mbox_expire = None - # for passing WorkItems to the application - self._work_q = Queue.Queue() - self._work_q_put = False - # Correlation ID and mailbox storage - self._correlation_id = long(time.time()) # pseudo-randomize - self._post_office = {} # indexed by cid - self._async_mboxes = {} # indexed by cid, used to expire them - - ## Old stuff below??? - #self._broker_list = [] - #self.impl = qmfengine.Console() - #self._event = qmfengine.ConsoleEvent() - ##self._cv = Condition() - ##self._sync_count = 0 - ##self._sync_result = None - ##self._select = {} - ##self._cb_cond = Condition() - - - - def destroy(self, timeout=None): - """ - Must be called before the Console is deleted. - Frees up all resources and shuts down all background threads. - - @type timeout: float - @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. - """ - logging.debug("Destroying Console...") - if self._conn: - self.remove_connection(self._conn, timeout) - logging.debug("Console Destroyed") - - - - def add_connection(self, conn): - """ - Add a AMQP connection to the console. The console will setup a session over the - connection. The console will then broadcast an Agent Locate Indication over - the session in order to discover present agents. - - @type conn: qpid.messaging.Connection - @param conn: the connection to the AMQP messaging infrastructure. - """ - if self._conn: - raise Exception( "Multiple connections per Console not supported." ); - self._conn = conn - self._session = conn.session(name=self._name) - - # for messages directly addressed to me - self._direct_recvr = self._session.receiver(str(self._address) + - ";{create:always," - " node-properties:" - " {type:topic," - " x-properties:" - " {type:direct}}}", - capacity=1) - logging.debug("my direct addr=%s" % self._direct_recvr.source) - - self._direct_sender = self._session.sender(str(self._address.get_node()) + - ";{create:always," - " node-properties:" - " {type:topic," - " x-properties:" - " {type:direct}}}") - logging.debug("my direct sender=%s" % self._direct_sender.target) - - # for receiving "broadcast" messages from agents - default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", - self._domain) - self._topic_recvr = self._session.receiver(str(default_addr) + - ";{create:always," - " node-properties:{type:topic}}", - capacity=1) - logging.debug("default topic recv addr=%s" % self._topic_recvr.source) - - - # for sending to topic subscribers - topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) - self._topic_sender = self._session.sender(str(topic_addr) + - ";{create:always," - " node-properties:{type:topic}}") - logging.debug("default topic send addr=%s" % self._topic_sender.target) - - # - # Now that receivers are created, fire off the receive thread... - # - self._operational = True - self.start() - self._ready.wait(10) - if not self._ready.isSet(): - raise Exception("Console managment thread failed to start.") - - - - def remove_connection(self, conn, timeout=None): - """ - Remove an AMQP connection from the console. Un-does the add_connection() operation, - and releases any agents and sessions associated with the connection. - - @type conn: qpid.messaging.Connection - @param conn: connection previously added by add_connection() - """ - if self._conn and conn and conn != self._conn: - logging.error( "Attempt to delete unknown connection: %s" % str(conn)) - - # tell connection thread to shutdown - self._operational = False - if self.isAlive(): - # kick my thread to wake it up - self._wake_thread() - logging.debug("waiting for console receiver thread to exit") - self.join(timeout) - if self.isAlive(): - logging.error( "Console thread '%s' is hung..." % self.getName() ) - self._direct_recvr.close() - self._direct_sender.close() - self._topic_recvr.close() - self._topic_sender.close() - self._session.close() - self._session = None - self._conn = None - logging.debug("console connection removal complete") - - - def get_address(self): - """ - The AMQP address this Console is listening to. - """ - return self._address - - - def destroy_agent( self, agent ): - """ - Undoes create. - """ - if not isinstance(agent, Agent): - raise TypeError("agent must be an instance of class Agent") - - self._lock.acquire() - try: - if agent._name in self._agent_map: - del self._agent_map[agent._name] - finally: - self._lock.release() - - def find_agent(self, name, timeout=None ): - """ - Given the name of a particular agent, return an instance of class Agent - representing that agent. Return None if the agent does not exist. - """ - - self._lock.acquire() - try: - agent = self._agent_map.get(name) - if agent: - return agent - finally: - self._lock.release() - - # agent not present yet - ping it with an agent_locate - - mbox = _SyncMailbox(self) - cid = mbox.get_address() - - query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) - msg = Message(subject="console.ind.locate." + name, - properties={"method":"request", - "qmf.subject":make_subject(OpCode.agent_locate)}, - content={MsgKey.query: query.map_encode()}) - msg.reply_to = str(self._address) - msg.correlation_id = str(cid) - logging.debug("Sending Agent Locate (%s)" % time.time()) - # TRACE - #logging.error("!!! Console %s sending agent locate (%s)" % - # (self._name, str(msg))) - try: - self._topic_sender.send(msg) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return None - - if timeout is None: - timeout = self._reply_timeout - - new_agent = None - logging.debug("Waiting for response to Agent Locate (%s)" % timeout) - mbox.fetch(timeout) - mbox.destroy() - logging.debug("Agent Locate wait ended (%s)" % time.time()) - self._lock.acquire() - try: - new_agent = self._agent_map.get(name) - finally: - self._lock.release() - - return new_agent - - - def get_agents(self): - """ - Return the list of known agents. - """ - self._lock.acquire() - try: - agents = self._agent_map.values() - finally: - self._lock.release() - return agents - - - def get_agent(self, name): - """ - Return the named agent, else None if not currently available. - """ - self._lock.acquire() - try: - agent = self._agent_map.get(name) - finally: - self._lock.release() - return agent - - - def do_query(self, agent, query, _reply_handle=None, _timeout=None ): - """ - """ - query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, - QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, - QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, - QmfQuery.TARGET_SCHEMA: MsgKey.schema, - QmfQuery.TARGET_OBJECT: MsgKey.data_obj, - QmfQuery.TARGET_AGENT: MsgKey.agent_info} - - target = query.get_target() - msgkey = query_keymap.get(target) - if not msgkey: - raise Exception("Invalid target for query: %s" % str(query)) - - if _reply_handle is not None: - mbox = _QueryMailbox(self, - agent.get_name(), - _reply_handle, - target, msgkey, - _timeout) - else: - mbox = _SyncMailbox(self) - - cid = mbox.get_address() - - try: - logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._send_query(query, cid) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - return None - - # return now if async reply expected - if _reply_handle is not None: - return True - - if not _timeout: - _timeout = self._reply_timeout - - logging.debug("Waiting for response to Query (%s)" % _timeout) - now = datetime.datetime.utcnow() - expire = now + datetime.timedelta(seconds=_timeout) - - response = [] - while (expire > now): - _timeout = timedelta_to_secs(expire - now) - reply = mbox.fetch(_timeout) - if not reply: - logging.debug("Query wait timed-out.") - break - - objects = reply.content.get(msgkey) - if not objects: - # last response is empty - break - - # convert from map to native types if needed - if target == QmfQuery.TARGET_SCHEMA_ID: - for sid_map in objects: - response.append(SchemaClassId.from_map(sid_map)) - - elif target == QmfQuery.TARGET_SCHEMA: - for schema_map in objects: - # extract schema id, convert based on schema type - sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) - if sid_map: - sid = SchemaClassId.from_map(sid_map) - if sid: - if sid.get_type() == SchemaClassId.TYPE_DATA: - schema = SchemaObjectClass.from_map(schema_map) - else: - schema = SchemaEventClass.from_map(schema_map) - self._add_schema(schema) # add to schema cache - response.append(schema) - - elif target == QmfQuery.TARGET_OBJECT: - for obj_map in objects: - obj = QmfConsoleData(map_=obj_map, agent=agent) - # start fetch of schema if not known - sid = obj.get_schema_class_id() - if sid: - self._prefetch_schema(sid, agent) - response.append(obj) - else: - # no conversion needed. - response += objects - - now = datetime.datetime.utcnow() - - mbox.destroy() - return response - - def _wake_thread(self): - """ - Make the console management thread loop wakeup from its next_receiver - sleep. - """ - logging.debug("Sending noop to wake up [%s]" % self._address) - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, - subject=self._name, - content={"noop":"noop"}) - try: - self._direct_sender.send( msg, sync=True ) - except SendError, e: - logging.error(str(e)) - - - def run(self): - """ - Console Management Thread main loop. - Handles inbound messages, agent discovery, async mailbox timeouts. - """ - global _callback_thread - - self._ready.set() - - while self._operational: - - # qLen = self._work_q.qsize() - - while True: - try: - msg = self._topic_recvr.fetch(timeout=0) - except Empty: - break - # TRACE: - # logging.error("!!! Console %s: msg on %s [%s]" % - # (self._name, self._topic_recvr.source, msg)) - self._dispatch(msg, _direct=False) - - while True: - try: - msg = self._direct_recvr.fetch(timeout = 0) - except Empty: - break - # TRACE - #logging.error("!!! Console %s: msg on %s [%s]" % - # (self._name, self._direct_recvr.source, msg)) - self._dispatch(msg, _direct=True) - - self._expire_agents() # check for expired agents - self._expire_mboxes() # check for expired async mailbox requests - - #if qLen == 0 and self._work_q.qsize() and self._notifier: - if self._work_q_put and self._notifier: - # new stuff on work queue, kick the the application... - self._work_q_put = False - _callback_thread = currentThread() - logging.info("Calling console notifier.indication") - self._notifier.indication() - _callback_thread = None - - if self._operational: - # wait for a message to arrive, or an agent - # to expire, or a mailbox requrest to time out - now = datetime.datetime.utcnow() - next_expire = self._next_agent_expire - if (self._next_mbox_expire and - self._next_mbox_expire < next_expire): - next_expire = self._next_mbox_expire - if next_expire > now: - timeout = timedelta_to_secs(next_expire - now) - try: - logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) - xxx = self._session.next_receiver(timeout = timeout) - except Empty: - pass - - - logging.debug("Shutting down Console thread") - - def get_objects(self, - _object_id=None, - _schema_id=None, - _pname=None, _cname=None, - _agents=None, - _timeout=None): - """ - Retrieve objects by id or schema. - - By object_id: must specify schema_id or pname & cname if object defined - by a schema. Undescribed objects: only object_id needed. - - By schema: must specify schema_id or pname & cname - all instances of - objects defined by that schema are returned. - """ - if _agents is None: - # use copy of current agent list - self._lock.acquire() - try: - agent_list = self._agent_map.values() - finally: - self._lock.release() - elif isinstance(_agents, Agent): - agent_list = [_agents] - else: - agent_list = _agents - # @todo validate this list! - - if _timeout is None: - _timeout = self._reply_timeout - - # @todo: fix when async do_query done - query all agents at once, then - # wait for replies, instead of per-agent querying.... - - obj_list = [] - expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) - for agent in agent_list: - if not agent.is_active(): - continue - now = datetime.datetime.utcnow() - if now >= expired: - break - - if _pname is None: - if _object_id: - query = QmfQuery.create_id_object(_object_id, - _schema_id) - else: - if _schema_id is not None: - t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} - else: - t_params = None - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, - t_params) - timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, _timeout=timeout) - if reply: - obj_list = obj_list + reply - else: - # looking up by package name (and maybe class name), need to - # find all schema_ids in that package, then lookup object by - # schema_id - if _cname is not None: - pred = [QmfQuery.AND, - [QmfQuery.EQ, - SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]], - [QmfQuery.EQ, SchemaClassId.KEY_CLASS, - [QmfQuery.QUOTE, _cname]]] - else: - pred = [QmfQuery.EQ, - SchemaClassId.KEY_PACKAGE, - [QmfQuery.QUOTE, _pname]] - query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) - timeout = timedelta_to_secs(expired - now) - sid_list = self.do_query(agent, query, _timeout=timeout) - if sid_list: - for sid in sid_list: - now = datetime.datetime.utcnow() - if now >= expired: - break - if _object_id is not None: - query = QmfQuery.create_id_object(_object_id, sid) - else: - t_params = {QmfData.KEY_SCHEMA_ID: sid} - query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) - timeout = timedelta_to_secs(expired - now) - reply = self.do_query(agent, query, _timeout=timeout) - if reply: - obj_list = obj_list + reply - if obj_list: - return obj_list - return None - - - - # called by run() thread ONLY - # - def _dispatch(self, msg, _direct=True): - """ - PRIVATE: Process a message received from an Agent - """ - logging.debug( "Message received from Agent! [%s]" % msg ) - try: - version,opcode = parse_subject(msg.properties.get("qmf.subject")) - # @todo: deal with version mismatch!!! - except: - logging.error("Ignoring unrecognized message '%s'" % msg) - return - - cmap = {}; props = {} - if msg.content_type == "amqp/map": - cmap = msg.content - if msg.properties: - props = msg.properties - - if opcode == OpCode.agent_ind: - self._handle_agent_ind_msg( msg, cmap, version, _direct ) - elif opcode == OpCode.data_ind: - self._handle_data_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.event_ind: - self._handle_event_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.managed_object: - logging.warning("!!! managed_object TBD !!!") - elif opcode == OpCode.object_ind: - logging.warning("!!! object_ind TBD !!!") - elif opcode == OpCode.response: - self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.schema_ind: - logging.warning("!!! schema_ind TBD !!!") - elif opcode == OpCode.noop: - logging.debug("No-op msg received.") - else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) - - - def _handle_agent_ind_msg(self, msg, cmap, version, direct): - """ - Process a received agent-ind message. This message may be a response to a - agent-locate, or it can be an unsolicited agent announce. - """ - logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) - - ai_map = cmap.get(MsgKey.agent_info) - if not ai_map or not isinstance(ai_map, type({})): - logging.warning("Bad agent-ind message received: '%s'" % msg) - return - name = ai_map.get("_name") - if not name: - logging.warning("Bad agent-ind message received: agent name missing" - " '%s'" % msg) - return - - correlated = False - if msg.correlation_id: - mbox = self._get_mailbox(msg.correlation_id) - correlated = mbox is not None - - agent = None - self._lock.acquire() - try: - agent = self._agent_map.get(name) - if agent: - # agent already known, just update timestamp - agent._announce_timestamp = datetime.datetime.utcnow() - finally: - self._lock.release() - - if not agent: - # need to create and add a new agent? - matched = False - if self._agent_discovery_filter: - tmp = QmfData.create(values=ai_map, _object_id="agent-filter") - matched = self._agent_discovery_filter.evaluate(tmp) - - if (correlated or matched): - agent = self._create_agent(name) - if not agent: - return # failed to add agent - agent._announce_timestamp = datetime.datetime.utcnow() - - if matched: - # unsolicited, but newly discovered - logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) - wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) - self._work_q.put(wi) - self._work_q_put = True - - if correlated: - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - mbox.deliver(msg) - - def _handle_data_ind_msg(self, msg, cmap, version, direct): - """ - Process a received data-ind message. - """ - logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - - mbox = self._get_mailbox(msg.correlation_id) - if not mbox: - logging.debug("Data indicate received with unknown correlation_id" - " msg='%s'" % str(msg)) - return - - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % - msg.correlation_id) - mbox.deliver(msg) - - - def _handle_response_msg(self, msg, cmap, version, direct): - """ - Process a received data-ind message. - """ - # @todo code replication - clean me. - logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) - - mbox = self._get_mailbox(msg.correlation_id) - if not mbox: - logging.debug("Response msg received with unknown correlation_id" - " msg='%s'" % str(msg)) - return - - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) - mbox.deliver(msg) - - def _handle_event_ind_msg(self, msg, cmap, version, _direct): - ei_map = cmap.get(MsgKey.event) - if not ei_map or not isinstance(ei_map, type({})): - logging.warning("Bad event indication message received: '%s'" % msg) - return - - aname = ei_map.get("_name") - emap = ei_map.get("_event") - if not aname: - logging.debug("No '_name' field in event indication message.") - return - if not emap: - logging.debug("No '_event' field in event indication message.") - return - - agent = None - self._lock.acquire() - try: - agent = self._agent_map.get(aname) - finally: - self._lock.release() - if not agent: - logging.debug("Agent '%s' not known." % aname) - return - try: - # @todo: schema??? - event = QmfEvent.from_map(emap) - except TypeError: - logging.debug("Invalid QmfEvent map received: %s" % str(emap)) - return - - # @todo: schema? Need to fetch it, but not from this thread! - # This thread can not pend on a request. - logging.debug("Publishing event received from agent %s" % aname) - wi = WorkItem(WorkItem.EVENT_RECEIVED, None, - {"agent":agent, - "event":event}) - self._work_q.put(wi) - self._work_q_put = True - - - def _expire_mboxes(self): - """ - Check all async mailboxes for outstanding requests that have expired. - """ - now = datetime.datetime.utcnow() - if self._next_mbox_expire and now < self._next_mbox_expire: - return - expired_mboxes = [] - self._next_mbox_expire = None - self._lock.acquire() - try: - for mbox in self._async_mboxes.itervalues(): - if now >= mbox.expiration_date: - expired_mboxes.append(mbox) - else: - if (self._next_mbox_expire is None or - mbox.expiration_date < self._next_mbox_expire): - self._next_mbox_expire = mbox.expiration_date - - for mbox in expired_mboxes: - del self._async_mboxes[mbox.cid] - finally: - self._lock.release() - - for mbox in expired_mboxes: - # note: expire() may deallocate the mbox, so don't touch - # it further. - mbox.expire() - - - def _expire_agents(self): - """ - Check for expired agents and issue notifications when they expire. - """ - now = datetime.datetime.utcnow() - if self._next_agent_expire and now < self._next_agent_expire: - return - lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) - next_expire_delta = lifetime_delta - self._lock.acquire() - try: - logging.debug("!!! expiring agents '%s'" % now) - for agent in self._agent_map.itervalues(): - if agent._announce_timestamp: - agent_deathtime = agent._announce_timestamp + lifetime_delta - if agent_deathtime <= now: - logging.debug("AGENT_DELETED for %s" % agent) - agent._announce_timestamp = None - wi = WorkItem(WorkItem.AGENT_DELETED, None, - {"agent":agent}) - # @todo: remove agent from self._agent_map - self._work_q.put(wi) - self._work_q_put = True - else: - if (agent_deathtime - now) < next_expire_delta: - next_expire_delta = agent_deathtime - now - - self._next_agent_expire = now + next_expire_delta - logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) - finally: - self._lock.release() - - - - def _create_agent( self, name ): - """ - Factory to create/retrieve an agent for this console - """ - logging.debug("creating agent %s" % name) - self._lock.acquire() - try: - agent = self._agent_map.get(name) - if agent: - return agent - - agent = Agent(name, self) - try: - agent._sender = self._session.sender(str(agent._address) + - ";{create:always," - " node-properties:" - " {type:topic," - " x-properties:" - " {type:direct}}}") - except: - logging.warning("Unable to create sender for %s" % name) - return None - logging.debug("created agent sender %s" % agent._sender.target) - - self._agent_map[name] = agent - finally: - self._lock.release() - - # new agent - query for its schema database for - # seeding the schema cache (@todo) - # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) - # agent._sendQuery( query ) - - return agent - - - - def enable_agent_discovery(self, _query=None): - """ - Called to enable the asynchronous Agent Discovery process. - Once enabled, AGENT_ADD work items can arrive on the WorkQueue. - """ - # @todo: fix - take predicate only, not entire query! - if _query is not None: - if (not isinstance(_query, QmfQuery) or - _query.get_target() != QmfQuery.TARGET_AGENT): - raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") - self._agent_discovery_filter = _query - else: - # create a match-all agent query (no predicate) - self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) - - def disable_agent_discovery(self): - """ - Called to disable the async Agent Discovery process enabled by - calling enableAgentDiscovery() - """ - self._agent_discovery_filter = None - - - - def get_workitem_count(self): - """ - Returns the count of pending WorkItems that can be retrieved. - """ - return self._work_q.qsize() - - - - def get_next_workitem(self, timeout=None): - """ - Returns the next pending work item, or None if none available. - @todo: subclass and return an Empty event instead. - """ - try: - wi = self._work_q.get(True, timeout) - except Queue.Empty: - return None - return wi - - - def release_workitem(self, wi): - """ - Return a WorkItem to the Console when it is no longer needed. - @todo: call Queue.task_done() - only 2.5+ - - @type wi: class qmfConsole.WorkItem - @param wi: work item object to return. - """ - pass - - def _add_schema(self, schema): - """ - @todo - """ - if not isinstance(schema, SchemaClass): - raise TypeError("SchemaClass type expected") - - self._lock.acquire() - try: - sid = schema.get_class_id() - if not self._schema_cache.has_key(sid): - self._schema_cache[sid] = schema - if sid in self._pending_schema_req: - self._pending_schema_req.remove(sid) - finally: - self._lock.release() - - def _prefetch_schema(self, schema_id, agent): - """ - Send an async request for the schema identified by schema_id if the - schema is not available in the cache. - """ - need_fetch = False - self._lock.acquire() - try: - if ((not self._schema_cache.has_key(schema_id)) and - schema_id not in self._pending_schema_req): - self._pending_schema_req.append(schema_id) - need_fetch = True - finally: - self._lock.release() - - if need_fetch: - mbox = _SchemaPrefetchMailbox(self, schema_id) - query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) - logging.debug("Sending Schema Query to Agent (%s)" % time.time()) - try: - agent._send_query(query, mbox.get_address()) - except SendError, e: - logging.error(str(e)) - mbox.destroy() - self._lock.acquire() - try: - self._pending_schema_req.remove(schema_id) - finally: - self._lock.release() - - - def _fetch_schema(self, schema_id, _agent=None, _timeout=None): - """ - Find the schema identified by schema_id. If not in the cache, ask the - agent for it. - """ - if not isinstance(schema_id, SchemaClassId): - raise TypeError("SchemaClassId type expected") - - self._lock.acquire() - try: - schema = self._schema_cache.get(schema_id) - if schema: - return schema - finally: - self._lock.release() - - if _agent is None: - return None - - # note: do_query will add the new schema to the cache automatically. - slist = self.do_query(_agent, - QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), - _timeout=_timeout) - if slist: - return slist[0] - else: - return None - - def _add_mailbox(self, mbox): - """ - Add a mailbox to the post office, and assign it a unique address. - """ - self._lock.acquire() - try: - mbox.cid = self._correlation_id - self._correlation_id += 1 - self._post_office[mbox.cid] = mbox - finally: - self._lock.release() - - def _get_mailbox(self, mid): - try: - mid = long(mid) - except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) - return None - - self._lock.acquire() - try: - return self._post_office.get(mid) - finally: - self._lock.release() - - - def _remove_mailbox(self, mid): - """ Remove a mailbox and its address from the post office """ - try: - mid = long(mid) - except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) - return None - - self._lock.acquire() - try: - if mid in self._post_office: - del self._post_office[mid] - finally: - self._lock.release() - - def __repr__(self): - return str(self._address) - - # def get_packages(self): - # plist = [] - # for i in range(self.impl.packageCount()): - # plist.append(self.impl.getPackageName(i)) - # return plist - - - # def get_classes(self, package, kind=CLASS_OBJECT): - # clist = [] - # for i in range(self.impl.classCount(package)): - # key = self.impl.getClass(package, i) - # class_kind = self.impl.getClassKind(key) - # if class_kind == kind: - # if kind == CLASS_OBJECT: - # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) - # elif kind == CLASS_EVENT: - # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) - # return clist - - - # def bind_package(self, package): - # return self.impl.bindPackage(package) - - - # def bind_class(self, kwargs = {}): - # if "key" in kwargs: - # self.impl.bindClass(kwargs["key"]) - # elif "package" in kwargs: - # package = kwargs["package"] - # if "class" in kwargs: - # self.impl.bindClass(package, kwargs["class"]) - # else: - # self.impl.bindClass(package) - # else: - # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") - - - # def get_agents(self, broker=None): - # blist = [] - # if broker: - # blist.append(broker) - # else: - # self._cv.acquire() - # try: - # # copy while holding lock - # blist = self._broker_list[:] - # finally: - # self._cv.release() - - # agents = [] - # for b in blist: - # for idx in range(b.impl.agentCount()): - # agents.append(AgentProxy(b.impl.getAgent(idx), b)) - - # return agents - - - # def get_objects(self, query, kwargs = {}): - # timeout = 30 - # agent = None - # temp_args = kwargs.copy() - # if type(query) == type({}): - # temp_args.update(query) - - # if "_timeout" in temp_args: - # timeout = temp_args["_timeout"] - # temp_args.pop("_timeout") - - # if "_agent" in temp_args: - # agent = temp_args["_agent"] - # temp_args.pop("_agent") - - # if type(query) == type({}): - # query = Query(temp_args) - - # self._select = {} - # for k in temp_args.iterkeys(): - # if type(k) == str: - # self._select[k] = temp_args[k] - - # self._cv.acquire() - # try: - # self._sync_count = 1 - # self._sync_result = [] - # broker = self._broker_list[0] - # broker.send_query(query.impl, None, agent) - # self._cv.wait(timeout) - # if self._sync_count == 1: - # raise Exception("Timed out: waiting for query response") - # finally: - # self._cv.release() - - # return self._sync_result - - - # def get_object(self, query, kwargs = {}): - # ''' - # Return one and only one object or None. - # ''' - # objs = objects(query, kwargs) - # if len(objs) == 1: - # return objs[0] - # else: - # return None - - - # def first_object(self, query, kwargs = {}): - # ''' - # Return the first of potentially many objects. - # ''' - # objs = objects(query, kwargs) - # if objs: - # return objs[0] - # else: - # return None - - - # # Check the object against select to check for a match - # def _select_match(self, object): - # schema_props = object.properties() - # for key in self._select.iterkeys(): - # for prop in schema_props: - # if key == p[0].name() and self._select[key] != p[1]: - # return False - # return True - - - # def _get_result(self, list, context): - # ''' - # Called by Broker proxy to return the result of a query. - # ''' - # self._cv.acquire() - # try: - # for item in list: - # if self._select_match(item): - # self._sync_result.append(item) - # self._sync_count -= 1 - # self._cv.notify() - # finally: - # self._cv.release() - - - # def start_sync(self, query): pass - - - # def touch_sync(self, sync): pass - - - # def end_sync(self, sync): pass - - - - -# def start_console_events(self): -# self._cb_cond.acquire() -# try: -# self._cb_cond.notify() -# finally: -# self._cb_cond.release() - - -# def _do_console_events(self): -# ''' -# Called by the Console thread to poll for events. Passes the events -# onto the ConsoleHandler associated with this Console. Is called -# periodically, but can also be kicked by Console.start_console_events(). -# ''' -# count = 0 -# valid = self.impl.getEvent(self._event) -# while valid: -# count += 1 -# try: -# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: -# logging.debug("Console Event AGENT_ADDED received") -# if self._handler: -# self._handler.agent_added(AgentProxy(self._event.agent, None)) -# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: -# logging.debug("Console Event AGENT_DELETED received") -# if self._handler: -# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) -# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: -# logging.debug("Console Event NEW_PACKAGE received") -# if self._handler: -# self._handler.new_package(self._event.name) -# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: -# logging.debug("Console Event NEW_CLASS received") -# if self._handler: -# self._handler.new_class(SchemaClassKey(self._event.classKey)) -# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: -# logging.debug("Console Event OBJECT_UPDATE received") -# if self._handler: -# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), -# self._event.hasProps, self._event.hasStats) -# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: -# logging.debug("Console Event EVENT_RECEIVED received") -# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: -# logging.debug("Console Event AGENT_HEARTBEAT received") -# if self._handler: -# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) -# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: -# logging.debug("Console Event METHOD_RESPONSE received") -# else: -# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) -# except e: -# print "Exception caught in callback thread:", e -# self.impl.popEvent() -# valid = self.impl.getEvent(self._event) -# return count - - - - - -# class Broker(ConnectionHandler): -# # attr_reader :impl :conn, :console, :broker_bank -# def __init__(self, console, conn): -# self.broker_bank = 1 -# self.console = console -# self.conn = conn -# self._session = None -# self._cv = Condition() -# self._stable = None -# self._event = qmfengine.BrokerEvent() -# self._xmtMessage = qmfengine.Message() -# self.impl = qmfengine.BrokerProxy(self.console.impl) -# self.console.impl.addConnection(self.impl, self) -# self.conn.add_conn_handler(self) -# self._operational = True - - -# def shutdown(self): -# logging.debug("broker.shutdown() called.") -# self.console.impl.delConnection(self.impl) -# self.conn.del_conn_handler(self) -# if self._session: -# self.impl.sessionClosed() -# logging.debug("broker.shutdown() sessionClosed done.") -# self._session.destroy() -# logging.debug("broker.shutdown() session destroy done.") -# self._session = None -# self._operational = False -# logging.debug("broker.shutdown() done.") - - -# def wait_for_stable(self, timeout = None): -# self._cv.acquire() -# try: -# if self._stable: -# return -# if timeout: -# self._cv.wait(timeout) -# if not self._stable: -# raise Exception("Timed out: waiting for broker connection to become stable") -# else: -# while not self._stable: -# self._cv.wait() -# finally: -# self._cv.release() - - -# def send_query(self, query, ctx, agent): -# agent_impl = None -# if agent: -# agent_impl = agent.impl -# self.impl.sendQuery(query, ctx, agent_impl) -# self.conn.kick() - - -# def _do_broker_events(self): -# count = 0 -# valid = self.impl.getEvent(self._event) -# while valid: -# count += 1 -# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: -# logging.debug("Broker Event BROKER_INFO received"); -# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: -# logging.debug("Broker Event DECLARE_QUEUE received"); -# self.conn.impl.declareQueue(self._session.handle, self._event.name) -# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: -# logging.debug("Broker Event DELETE_QUEUE received"); -# self.conn.impl.deleteQueue(self._session.handle, self._event.name) -# elif self._event.kind == qmfengine.BrokerEvent.BIND: -# logging.debug("Broker Event BIND received"); -# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) -# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: -# logging.debug("Broker Event UNBIND received"); -# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) -# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: -# logging.debug("Broker Event SETUP_COMPLETE received"); -# self.impl.startProtocol() -# elif self._event.kind == qmfengine.BrokerEvent.STABLE: -# logging.debug("Broker Event STABLE received"); -# self._cv.acquire() -# try: -# self._stable = True -# self._cv.notify() -# finally: -# self._cv.release() -# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: -# result = [] -# for idx in range(self._event.queryResponse.getObjectCount()): -# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) -# self.console._get_result(result, self._event.context) -# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: -# obj = self._event.context -# obj._method_result(MethodResponse(self._event.methodResponse())) - -# self.impl.popEvent() -# valid = self.impl.getEvent(self._event) - -# return count - - -# def _do_broker_messages(self): -# count = 0 -# valid = self.impl.getXmtMessage(self._xmtMessage) -# while valid: -# count += 1 -# logging.debug("Broker: sending msg on connection") -# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) -# self.impl.popXmt() -# valid = self.impl.getXmtMessage(self._xmtMessage) - -# return count - - -# def _do_events(self): -# while True: -# self.console.start_console_events() -# bcnt = self._do_broker_events() -# mcnt = self._do_broker_messages() -# if bcnt == 0 and mcnt == 0: -# break; - - -# def conn_event_connected(self): -# logging.debug("Broker: Connection event CONNECTED") -# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) -# self.impl.sessionOpened(self._session.handle) -# self._do_events() - - -# def conn_event_disconnected(self, error): -# logging.debug("Broker: Connection event DISCONNECTED") -# pass - - -# def conn_event_visit(self): -# self._do_events() - - -# def sess_event_session_closed(self, context, error): -# logging.debug("Broker: Session event CLOSED") -# self.impl.sessionClosed() - - -# def sess_event_recv(self, context, message): -# logging.debug("Broker: Session event MSG_RECV") -# if not self._operational: -# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) -# self.impl.handleRcvMessage(message) -# self._do_events() - - - -################################################################################ -################################################################################ -################################################################################ -################################################################################ -# TEMPORARY TEST CODE - TO BE DELETED -################################################################################ -################################################################################ -################################################################################ -################################################################################ - -if __name__ == '__main__': - # temp test code - from common import (qmfTypes, SchemaProperty) - - logging.getLogger().setLevel(logging.INFO) - - logging.info( "************* Creating Async Console **************" ) - - class MyNotifier(Notifier): - def __init__(self, context): - self._myContext = context - self.WorkAvailable = False - - def indication(self): - print("Indication received! context=%d" % self._myContext) - self.WorkAvailable = True - - _noteMe = MyNotifier( 666 ) - - _myConsole = Console(notifier=_noteMe) - - _myConsole.enable_agent_discovery() - logging.info("Waiting...") - - - logging.info( "Destroying console:" ) - _myConsole.destroy( 10 ) - - logging.info( "******** Messing around with Schema ********" ) - - _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", - stype=SchemaClassId.TYPE_EVENT), - _desc="A typical event schema", - _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, - kwargs = {"min":0, - "max":100, - "unit":"seconds", - "desc":"sleep value"}), - "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, - kwargs={"maxlen":100, - "desc":"a string argument"})}) - print("_sec=%s" % _sec.get_class_id()) - print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) - print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) - print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) - try: - print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) - except: - pass - print("_sec.getProperties()='%s'" % _sec.get_properties()) - - print("Adding another argument") - _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, - kwargs={"dir":"IO", - "desc":"a boolean argument"}) - _sec.add_property('Argument-3', _arg3) - print("_sec=%s" % _sec.get_class_id()) - print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) - print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) - print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) - print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) - - print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) - - _secmap = _sec.map_encode() - print("_sec.mapEncode()='%s'" % _secmap ) - - _sec2 = SchemaEventClass( _map=_secmap ) - - print("_sec=%s" % _sec.get_class_id()) - print("_sec2=%s" % _sec2.get_class_id()) - - _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", - "_class_name": "myOtherClass", - "_type": "_data"}, - "_desc": "A test data object", - "_values": - {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RO", - "index": True, - "unit": "degrees"}, - "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RW", - "index": True, - "desc": "The Second Property(tm)", - "unit": "radians"}, - "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, - "unit": "seconds", - "desc": "time until I retire"}, - "meth1": {"_desc": "A test method", - "_arguments": - {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an argument 1", - "dir": "I"}, - "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, - "dir": "IO", - "desc": "some weird boolean"}}}, - "meth2": {"_desc": "A test method", - "_arguments": - {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an 'nuther argument", - "dir": - "I"}}}}, - "_subtypes": - {"prop1":"qmfProperty", - "prop2":"qmfProperty", - "statistics":"qmfProperty", - "meth1":"qmfMethod", - "meth2":"qmfMethod"}, - "_primary_key_names": ["prop2", "prop1"]}) - - print("_soc='%s'" % _soc) - - print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) - - print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) - print("_soc.getProperties='%s'" % _soc.get_properties()) - print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) - - print("_soc.getMethodCount='%d'" % _soc.get_method_count()) - print("_soc.getMethods='%s'" % _soc.get_methods()) - print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) - - _socmap = _soc.map_encode() - print("_socmap='%s'" % _socmap) - _soc2 = SchemaObjectClass( _map=_socmap ) - print("_soc='%s'" % _soc) - print("_soc2='%s'" % _soc2) - - if _soc2.get_class_id() == _soc.get_class_id(): - print("soc and soc2 are the same schema") - - - logging.info( "******** Messing around with ObjectIds ********" ) - - - qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) - print("qd='%s':" % qd) - - print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) - - print("qd map='%s'" % qd.map_encode()) - print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) - qd.set_value("prop4", 4, "A test property called 4") - print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) - qd.prop4 = 9 - print("qd.prop4 = 9 ='%s'" % qd.prop4) - qd["prop4"] = 11 - print("qd[prop4] = 11 ='%s'" % qd["prop4"]) - - print("qd.mapEncode()='%s'" % qd.map_encode()) - _qd2 = QmfData( _map = qd.map_encode() ) - print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) - - _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, - "prop2": 0}}, - agent="some agent name?", - _schema = _soc) - - print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) - - _qmfDesc1._set_schema( _soc ) - - print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) - print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) - print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) - - - _qmfDescMap = _qmfDesc1.map_encode() - print("_qmfDescMap='%s'" % _qmfDescMap) - - _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) - - print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) - print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) - print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) - - - logging.info( "******** Messing around with QmfEvents ********" ) - - - _qmfevent1 = QmfEvent( _timestamp = 1111, - _schema = _sec, - _values = {"Argument-1": 77, - "Argument-3": True, - "Argument-2": "a string"}) - print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) - print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) - - _qmfevent1Map = _qmfevent1.map_encode() - - _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) - print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) - - - logging.info( "******** Messing around with Queries ********" ) - - _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, - [QmfQuery.AND, - [QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]], - [QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"], - [QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]], - [QmfQuery.OR, - [QmfQuery.LE, "temperature", -10], - [QmfQuery.FALSE], - [QmfQuery.EXISTS, "namey"]]]) - - print("_q1.mapEncode() = [%s]" % _q1.map_encode()) |