# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys import os import logging import platform import time import datetime import Queue from threading import Thread, Event from threading import Lock from threading import currentThread from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent, timedelta_to_secs) # global flag that indicates which thread (if any) is # running the console notifier callback _callback_thread=None ##============================================================================== ## Console Transaction Management ## ## At any given time, a console application may have multiple outstanding ## message transactions with agents. The following objects allow the console ## to track these outstanding transactions. ##============================================================================== class _Mailbox(object): """ Virtual base class for all Mailbox-like objects. """ def __init__(self, console): self.console = console self.cid = 0 self.console._add_mailbox(self) def get_address(self): return self.cid def deliver(self, data): """ Invoked by Console Management thread when a message arrives for this mailbox. """ raise Exception("_Mailbox deliver() method must be provided") def destroy(self): """ Release the mailbox. Once called, the mailbox should no longer be referenced. """ self.console._remove_mailbox(self.cid) class _SyncMailbox(_Mailbox): """ A simple mailbox that allows a consumer to wait for delivery of data. """ def __init__(self, console): """ Invoked by application thread. """ super(_SyncMailbox, self).__init__(console) self._cv = Condition() self._data = [] self._waiting = False def deliver(self, data): """ Drop data into the mailbox, waking any waiters if necessary. Invoked by Console Management thread only. """ self._cv.acquire() try: self._data.append(data) # if was empty, notify waiters if len(self._data) == 1: self._cv.notify() finally: self._cv.release() def fetch(self, timeout=None): """ Get one data item from a mailbox, with timeout. Invoked by application thread. """ self._cv.acquire() try: if len(self._data) == 0: self._cv.wait(timeout) if len(self._data): return self._data.pop(0) return None finally: self._cv.release() class _AsyncMailbox(_Mailbox): """ A Mailbox for asynchronous delivery, with a timeout value. """ def __init__(self, console, _timeout=None): """ Invoked by application thread. """ super(_AsyncMailbox, self).__init__(console) self.console = console if _timeout is None: _timeout = console._reply_timeout self.expiration_date = (datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)) console._lock.acquire() try: console._async_mboxes[self.cid] = self finally: console._lock.release() # now that an async mbox has been created, wake the # console mgmt thread so it will know about the mbox expiration # date (and adjust its idle sleep period correctly) console._wake_thread() def deliver(self, msg): """ """ raise Exception("deliver() method must be provided") def expire(self): raise Exception("expire() method must be provided") def destroy(self): self.console._lock.acquire() try: if self.cid in self.console._async_mboxes: del self.console._async_mboxes[self.cid] finally: self.console._lock.release() super(_AsyncMailbox, self).destroy() class _QueryMailbox(_AsyncMailbox): """ A mailbox used for asynchronous query requests. """ def __init__(self, console, agent_name, context, target, msgkey, _timeout=None): """ Invoked by application thread. """ super(_QueryMailbox, self).__init__(console, _timeout) self.agent_name = agent_name self.target = target self.msgkey = msgkey self.context = context self.result = [] def deliver(self, reply): """ Process query response messages delivered to this mailbox. Invoked by Console Management thread only. """ done = False objects = reply.content.get(self.msgkey) if not objects: done = True else: # convert from map to native types if needed if self.target == QmfQuery.TARGET_SCHEMA_ID: for sid_map in objects: self.result.append(SchemaClassId.from_map(sid_map)) elif self.target == QmfQuery.TARGET_SCHEMA: for schema_map in objects: # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) if sid_map: sid = SchemaClassId.from_map(sid_map) if sid: if sid.get_type() == SchemaClassId.TYPE_DATA: schema = SchemaObjectClass.from_map(schema_map) else: schema = SchemaEventClass.from_map(schema_map) self.console._add_schema(schema) # add to schema cache self.result.append(schema) elif self.target == QmfQuery.TARGET_OBJECT: for obj_map in objects: # @todo: need the agent name - ideally from the # reply message iself. agent = self.console.get_agent(self.agent_name) if agent: obj = QmfConsoleData(map_=obj_map, agent=agent) # start fetch of schema if not known sid = obj.get_schema_class_id() if sid: self.console._prefetch_schema(sid, agent) self.result.append(obj) else: # no conversion needed. self.result += objects if done: # create workitem # logging.error("QUERY COMPLETE for %s" % str(self.context)) wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) self.console._work_q_put = True self.destroy() def expire(self): logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % datetime.datetime.utcnow()) # send along whatever (possibly none) has been received so far wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) self.console._work_q_put = True self.destroy() class _SchemaPrefetchMailbox(_AsyncMailbox): """ Handles responses to schema fetches made by the console. """ def __init__(self, console, schema_id, _timeout=None): """ Invoked by application thread. """ super(_SchemaPrefetchMailbox, self).__init__(console, _timeout) self.schema_id = schema_id def deliver(self, reply): """ Process schema response messages. """ done = False schemas = reply.content.get(MsgKey.schema) if schemas: for schema_map in schemas: # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) if sid_map: sid = SchemaClassId.from_map(sid_map) if sid: if sid.get_type() == SchemaClassId.TYPE_DATA: schema = SchemaObjectClass.from_map(schema_map) else: schema = SchemaEventClass.from_map(schema_map) self.console._add_schema(schema) # add to schema cache self.destroy() def expire(self): self.destroy() class _MethodMailbox(_AsyncMailbox): """ A mailbox used for asynchronous method requests. """ def __init__(self, console, context, _timeout=None): """ Invoked by application thread. """ super(_MethodMailbox, self).__init__(console, _timeout) self.context = context def deliver(self, reply): """ Process method response messages delivered to this mailbox. Invoked by Console Management thread only. """ _map = reply.content.get(MsgKey.method) if not _map: logging.error("Invalid method call reply message") result = None else: error=_map.get(SchemaMethod.KEY_ERROR) if error: error = QmfData.from_map(error) result = MethodResult(_error=error) else: result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) # create workitem wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result) self.console._work_q.put(wi) self.console._work_q_put = True self.destroy() def expire(self): """ The mailbox expired without receiving a reply. Invoked by the Console Management thread only. """ logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % datetime.datetime.utcnow()) # send along an empty response wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) self.console._work_q.put(wi) self.console._work_q_put = True self.destroy() ##============================================================================== ## DATA MODEL ##============================================================================== class QmfConsoleData(QmfData): """ Console's representation of an managed QmfData instance. """ def __init__(self, map_, agent): super(QmfConsoleData, self).__init__(_map=map_, _const=True) self._agent = agent def get_timestamps(self): """ Returns a list of timestamps describing the lifecycle of the object. All timestamps are represented by the AMQP timestamp type. [0] = time of last update from Agent, [1] = creation timestamp [2] = deletion timestamp, or zero if not deleted. """ return [self._utime, self._ctime, self._dtime] def get_create_time(self): """ returns the creation timestamp """ return self._ctime def get_update_time(self): """ returns the update timestamp """ return self._utime def get_delete_time(self): """ returns the deletion timestamp, or zero if not yet deleted. """ return self._dtime def is_deleted(self): """ True if deletion timestamp not zero. """ return self._dtime != long(0) def refresh(self, _reply_handle=None, _timeout=None): """ request that the Agent update the value of this object's contents. """ if _reply_handle is not None: logging.error(" ASYNC REFRESH TBD!!!") return None assert self._agent assert self._agent._console if _timeout is None: _timeout = self._agent._console._reply_timeout # create query to agent using this objects ID query = QmfQuery.create_id_object(self.get_object_id(), self.get_schema_class_id()) obj_list = self._agent._console.do_query(self._agent, query, _timeout=_timeout) if obj_list is None or len(obj_list) != 1: return None self._update(obj_list[0]) return self def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ Invoke the named method on this object. """ assert self._agent assert self._agent._console oid = self.get_object_id() if oid is None: raise ValueError("Cannot invoke methods on unmanaged objects.") if _timeout is None: _timeout = self._agent._console._reply_timeout if _reply_handle is not None: mbox = _MethodMailbox(self._agent._console, _reply_handle) else: mbox = _SyncMailbox(self._agent._console) cid = mbox.get_address() _map = {self.KEY_OBJECT_ID:str(oid), SchemaMethod.KEY_NAME:name} sid = self.get_schema_class_id() if sid: _map[self.KEY_SCHEMA_ID] = sid.map_encode() if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args logging.debug("Sending method req to Agent (%s)" % time.time()) try: self._agent._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") return None _map = replyMsg.content.get(MsgKey.method) if not _map: logging.error("Invalid method call reply message") return None error=_map.get(SchemaMethod.KEY_ERROR) if error: return MethodResult(_error=QmfData.from_map(error)) else: return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS)) def _update(self, newer): super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes, _tag=newer._tag, _object_id=newer._object_id, _ctime=newer._ctime, _utime=newer._utime, _dtime=newer._dtime, _schema_id=newer._schema_id, _const=True) class QmfLocalData(QmfData): """ Console's representation of an unmanaged QmfData instance. There is no remote agent associated with this instance. The Console has full control over this instance. """ def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, _schema=None): # timestamp in millisec since epoch UTC ctime = long(time.time() * 1000) super(QmfLocalData, self).__init__(_values=values, _subtypes=_subtypes, _tag=_tag, _object_id=_object_id, _schema=_schema, _ctime=ctime, _utime=ctime, _const=False) class Agent(object): """ A local representation of a remote agent managed by this console. """ def __init__(self, name, console): """ @type name: string @param name: uniquely identifies this agent in the AMQP domain. """ if not isinstance(console, Console): raise TypeError("parameter must be an instance of class Console") self._name = name self._address = QmfAddress.direct(name, console._domain) self._console = console self._sender = None self._packages = {} # map of {package-name:[list of class-names], } for this agent self._subscriptions = [] # list of active standing subscriptions for this agent self._announce_timestamp = None # datetime when last announce received logging.debug( "Created Agent with address: [%s]" % self._address ) def get_name(self): return self._name def is_active(self): return self._announce_timestamp != None def _send_msg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. """ msg.reply_to = str(self._console._address) if correlation_id: msg.correlation_id = str(correlation_id) # TRACE #logging.error("!!! Console %s sending to agent %s (%s)" % # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle def get_packages(self): """ Return a list of the names of all packages known to this agent. """ return self._packages.keys() def get_classes(self): """ Return a dictionary [key:class] of classes known to this agent. """ return self._packages.copy() def get_objects(self, query, kwargs={}): """ Return a list of objects that satisfy the given query. @type query: dict, or common.Query @param query: filter for requested objects @type kwargs: dict @param kwargs: ??? used to build match selector and query ??? @rtype: list @return: list of matching objects, or None. """ pass def get_object(self, query, kwargs={}): """ Get one object - query is expected to match only one object. ??? Recommended: explicit timeout param, default None ??? @type query: dict, or common.Query @param query: filter for requested objects @type kwargs: dict @param kwargs: ??? used to build match selector and query ??? @rtype: qmfConsole.ObjectProxy @return: one matching object, or none """ pass def create_subscription(self, query): """ Factory for creating standing subscriptions based on a given query. @type query: common.Query object @param query: determines the list of objects for which this subscription applies @rtype: qmfConsole.Subscription @returns: an object representing the standing subscription. """ pass def invoke_method(self, name, _in_args={}, _reply_handle=None, _timeout=None): """ Invoke the named method on this agent. """ assert self._console if _timeout is None: _timeout = self._console._reply_timeout if _reply_handle is not None: mbox = _MethodMailbox(self._console, _reply_handle) else: mbox = _SyncMailbox(self._console) cid = mbox.get_address() _map = {SchemaMethod.KEY_NAME:name} if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() logging.debug("Sending method req to Agent (%s)" % time.time()) try: self._send_method_req(_map, cid) except SendError, e: logging.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True logging.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: logging.debug("Agent method req wait timed-out.") return None _map = replyMsg.content.get(MsgKey.method) if not _map: logging.error("Invalid method call reply message") return None return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), _error=_map.get(SchemaMethod.KEY_ERROR)) def enable_events(self): raise Exception("enable_events tbd") def disable_events(self): raise Exception("disable_events tbd") def destroy(self): raise Exception("destroy tbd") def __repr__(self): return str(self._address) def __str__(self): return self.__repr__() def _send_query(self, query, correlation_id=None): """ """ msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.get_query)}, content={MsgKey.query: query.map_encode()}) self._send_msg( msg, correlation_id ) def _send_method_req(self, mr_map, correlation_id=None): """ """ msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.method_req)}, content=mr_map) self._send_msg( msg, correlation_id ) ##============================================================================== ## METHOD CALL ##============================================================================== class MethodResult(object): def __init__(self, _out_args=None, _error=None): self._error = _error self._out_args = _out_args def succeeded(self): return self._error is None def get_exception(self): return self._error def get_arguments(self): return self._out_args def get_argument(self, name): arg = None if self._out_args: arg = self._out_args.get(name) return arg ##============================================================================== ## CONSOLE ##============================================================================== class Console(Thread): """ A Console manages communications to a collection of agents on behalf of an application. """ def __init__(self, name=None, _domain=None, notifier=None, reply_timeout = 60, # agent_timeout = 120, agent_timeout = 60, kwargs={}): """ @type name: str @param name: identifier for this console. Must be unique. @type notifier: qmfConsole.Notifier @param notifier: invoked when events arrive for processing. @type kwargs: dict @param kwargs: ??? Unused """ Thread.__init__(self) self._operational = False self._ready = Event() if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) else: self._name = str(name) self._domain = _domain self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier self._lock = Lock() self._conn = None self._session = None # dict of "agent-direct-address":class Agent entries self._agent_map = {} self._direct_recvr = None self._announce_recvr = None self._locate_sender = None self._schema_cache = {} self._pending_schema_req = [] self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout self._next_agent_expire = None self._next_mbox_expire = None # for passing WorkItems to the application self._work_q = Queue.Queue() self._work_q_put = False # Correlation ID and mailbox storage self._correlation_id = long(time.time()) # pseudo-randomize self._post_office = {} # indexed by cid self._async_mboxes = {} # indexed by cid, used to expire them ## Old stuff below??? #self._broker_list = [] #self.impl = qmfengine.Console() #self._event = qmfengine.ConsoleEvent() ##self._cv = Condition() ##self._sync_count = 0 ##self._sync_result = None ##self._select = {} ##self._cb_cond = Condition() def destroy(self, timeout=None): """ Must be called before the Console is deleted. Frees up all resources and shuts down all background threads. @type timeout: float @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. """ logging.debug("Destroying Console...") if self._conn: self.remove_connection(self._conn, timeout) logging.debug("Console Destroyed") def add_connection(self, conn): """ Add a AMQP connection to the console. The console will setup a session over the connection. The console will then broadcast an Agent Locate Indication over the session in order to discover present agents. @type conn: qpid.messaging.Connection @param conn: the connection to the AMQP messaging infrastructure. """ if self._conn: raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) # for messages directly addressed to me self._direct_recvr = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}", capacity=1) logging.debug("my direct addr=%s" % self._direct_recvr.source) self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}") logging.debug("my direct sender=%s" % self._direct_sender.target) # for receiving "broadcast" messages from agents default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", self._domain) self._topic_recvr = self._session.receiver(str(default_addr) + ";{create:always," " node-properties:{type:topic}}", capacity=1) logging.debug("default topic recv addr=%s" % self._topic_recvr.source) # for sending to topic subscribers topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain) self._topic_sender = self._session.sender(str(topic_addr) + ";{create:always," " node-properties:{type:topic}}") logging.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... # self._operational = True self.start() self._ready.wait(10) if not self._ready.isSet(): raise Exception("Console managment thread failed to start.") def remove_connection(self, conn, timeout=None): """ Remove an AMQP connection from the console. Un-does the add_connection() operation, and releases any agents and sessions associated with the connection. @type conn: qpid.messaging.Connection @param conn: connection previously added by add_connection() """ if self._conn and conn and conn != self._conn: logging.error( "Attempt to delete unknown connection: %s" % str(conn)) # tell connection thread to shutdown self._operational = False if self.isAlive(): # kick my thread to wake it up self._wake_thread() logging.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): logging.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() self._direct_sender.close() self._topic_recvr.close() self._topic_sender.close() self._session.close() self._session = None self._conn = None logging.debug("console connection removal complete") def get_address(self): """ The AMQP address this Console is listening to. """ return self._address def destroy_agent( self, agent ): """ Undoes create. """ if not isinstance(agent, Agent): raise TypeError("agent must be an instance of class Agent") self._lock.acquire() try: if agent._name in self._agent_map: del self._agent_map[agent._name] finally: self._lock.release() def find_agent(self, name, timeout=None ): """ Given the name of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ self._lock.acquire() try: agent = self._agent_map.get(name) if agent: return agent finally: self._lock.release() # agent not present yet - ping it with an agent_locate mbox = _SyncMailbox(self) cid = mbox.get_address() query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, properties={"method":"request", "qmf.subject":make_subject(OpCode.agent_locate)}, content={MsgKey.query: query.map_encode()}) msg.reply_to = str(self._address) msg.correlation_id = str(cid) logging.debug("Sending Agent Locate (%s)" % time.time()) # TRACE #logging.error("!!! Console %s sending agent locate (%s)" % # (self._name, str(msg))) try: self._topic_sender.send(msg) except SendError, e: logging.error(str(e)) mbox.destroy() return None if timeout is None: timeout = self._reply_timeout new_agent = None logging.debug("Waiting for response to Agent Locate (%s)" % timeout) mbox.fetch(timeout) mbox.destroy() logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) finally: self._lock.release() return new_agent def get_agents(self): """ Return the list of known agents. """ self._lock.acquire() try: agents = self._agent_map.values() finally: self._lock.release() return agents def get_agent(self, name): """ Return the named agent, else None if not currently available. """ self._lock.acquire() try: agent = self._agent_map.get(name) finally: self._lock.release() return agent def do_query(self, agent, query, _reply_handle=None, _timeout=None ): """ """ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, QmfQuery.TARGET_SCHEMA: MsgKey.schema, QmfQuery.TARGET_OBJECT: MsgKey.data_obj, QmfQuery.TARGET_AGENT: MsgKey.agent_info} target = query.get_target() msgkey = query_keymap.get(target) if not msgkey: raise Exception("Invalid target for query: %s" % str(query)) if _reply_handle is not None: mbox = _QueryMailbox(self, agent.get_name(), _reply_handle, target, msgkey, _timeout) else: mbox = _SyncMailbox(self) cid = mbox.get_address() try: logging.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, cid) except SendError, e: logging.error(str(e)) mbox.destroy() return None # return now if async reply expected if _reply_handle is not None: return True if not _timeout: _timeout = self._reply_timeout logging.debug("Waiting for response to Query (%s)" % _timeout) now = datetime.datetime.utcnow() expire = now + datetime.timedelta(seconds=_timeout) response = [] while (expire > now): _timeout = timedelta_to_secs(expire - now) reply = mbox.fetch(_timeout) if not reply: logging.debug("Query wait timed-out.") break objects = reply.content.get(msgkey) if not objects: # last response is empty break # convert from map to native types if needed if target == QmfQuery.TARGET_SCHEMA_ID: for sid_map in objects: response.append(SchemaClassId.from_map(sid_map)) elif target == QmfQuery.TARGET_SCHEMA: for schema_map in objects: # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) if sid_map: sid = SchemaClassId.from_map(sid_map) if sid: if sid.get_type() == SchemaClassId.TYPE_DATA: schema = SchemaObjectClass.from_map(schema_map) else: schema = SchemaEventClass.from_map(schema_map) self._add_schema(schema) # add to schema cache response.append(schema) elif target == QmfQuery.TARGET_OBJECT: for obj_map in objects: obj = QmfConsoleData(map_=obj_map, agent=agent) # start fetch of schema if not known sid = obj.get_schema_class_id() if sid: self._prefetch_schema(sid, agent) response.append(obj) else: # no conversion needed. response += objects now = datetime.datetime.utcnow() mbox.destroy() return response def _wake_thread(self): """ Make the console management thread loop wakeup from its next_receiver sleep. """ logging.debug("Sending noop to wake up [%s]" % self._address) msg = Message(properties={"method":"request", "qmf.subject":make_subject(OpCode.noop)}, subject=self._name, content={"noop":"noop"}) try: self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) def run(self): """ Console Management Thread main loop. Handles inbound messages, agent discovery, async mailbox timeouts. """ global _callback_thread self._ready.set() while self._operational: # qLen = self._work_q.qsize() while True: try: msg = self._topic_recvr.fetch(timeout=0) except Empty: break # TRACE: # logging.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) while True: try: msg = self._direct_recvr.fetch(timeout = 0) except Empty: break # TRACE #logging.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) self._expire_agents() # check for expired agents self._expire_mboxes() # check for expired async mailbox requests #if qLen == 0 and self._work_q.qsize() and self._notifier: if self._work_q_put and self._notifier: # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() logging.info("Calling console notifier.indication") self._notifier.indication() _callback_thread = None if self._operational: # wait for a message to arrive, or an agent # to expire, or a mailbox requrest to time out now = datetime.datetime.utcnow() next_expire = self._next_agent_expire if (self._next_mbox_expire and self._next_mbox_expire < next_expire): next_expire = self._next_mbox_expire if next_expire > now: timeout = timedelta_to_secs(next_expire - now) try: logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) xxx = self._session.next_receiver(timeout = timeout) except Empty: pass logging.debug("Shutting down Console thread") def get_objects(self, _object_id=None, _schema_id=None, _pname=None, _cname=None, _agents=None, _timeout=None): """ Retrieve objects by id or schema. By object_id: must specify schema_id or pname & cname if object defined by a schema. Undescribed objects: only object_id needed. By schema: must specify schema_id or pname & cname - all instances of objects defined by that schema are returned. """ if _agents is None: # use copy of current agent list self._lock.acquire() try: agent_list = self._agent_map.values() finally: self._lock.release() elif isinstance(_agents, Agent): agent_list = [_agents] else: agent_list = _agents # @todo validate this list! if _timeout is None: _timeout = self._reply_timeout # @todo: fix when async do_query done - query all agents at once, then # wait for replies, instead of per-agent querying.... obj_list = [] expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) for agent in agent_list: if not agent.is_active(): continue now = datetime.datetime.utcnow() if now >= expired: break if _pname is None: if _object_id: query = QmfQuery.create_id_object(_object_id, _schema_id) else: if _schema_id is not None: t_params = {QmfData.KEY_SCHEMA_ID: _schema_id} else: t_params = None query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply else: # looking up by package name (and maybe class name), need to # find all schema_ids in that package, then lookup object by # schema_id if _cname is not None: pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, _pname]], [QmfQuery.EQ, SchemaClassId.KEY_CLASS, [QmfQuery.QUOTE, _cname]]] else: pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, _pname]] query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred) timeout = timedelta_to_secs(expired - now) sid_list = self.do_query(agent, query, _timeout=timeout) if sid_list: for sid in sid_list: now = datetime.datetime.utcnow() if now >= expired: break if _object_id is not None: query = QmfQuery.create_id_object(_object_id, sid) else: t_params = {QmfData.KEY_SCHEMA_ID: sid} query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params) timeout = timedelta_to_secs(expired - now) reply = self.do_query(agent, query, _timeout=timeout) if reply: obj_list = obj_list + reply if obj_list: return obj_list return None # called by run() thread ONLY # def _dispatch(self, msg, _direct=True): """ PRIVATE: Process a message received from an Agent """ logging.debug( "Message received from Agent! [%s]" % msg ) try: version,opcode = parse_subject(msg.properties.get("qmf.subject")) # @todo: deal with version mismatch!!! except: logging.error("Ignoring unrecognized message '%s'" % msg) return cmap = {}; props = {} if msg.content_type == "amqp/map": cmap = msg.content if msg.properties: props = msg.properties if opcode == OpCode.agent_ind: self._handle_agent_ind_msg( msg, cmap, version, _direct ) elif opcode == OpCode.data_ind: self._handle_data_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.event_ind: self._handle_event_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.managed_object: logging.warning("!!! managed_object TBD !!!") elif opcode == OpCode.object_ind: logging.warning("!!! object_ind TBD !!!") elif opcode == OpCode.response: self._handle_response_msg(msg, cmap, version, _direct) elif opcode == OpCode.schema_ind: logging.warning("!!! schema_ind TBD !!!") elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) def _handle_agent_ind_msg(self, msg, cmap, version, direct): """ Process a received agent-ind message. This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) ai_map = cmap.get(MsgKey.agent_info) if not ai_map or not isinstance(ai_map, type({})): logging.warning("Bad agent-ind message received: '%s'" % msg) return name = ai_map.get("_name") if not name: logging.warning("Bad agent-ind message received: agent name missing" " '%s'" % msg) return correlated = False if msg.correlation_id: mbox = self._get_mailbox(msg.correlation_id) correlated = mbox is not None agent = None self._lock.acquire() try: agent = self._agent_map.get(name) if agent: # agent already known, just update timestamp agent._announce_timestamp = datetime.datetime.utcnow() finally: self._lock.release() if not agent: # need to create and add a new agent? matched = False if self._agent_discovery_filter: tmp = QmfData.create(values=ai_map, _object_id="agent-filter") matched = self._agent_discovery_filter.evaluate(tmp) if (correlated or matched): agent = self._create_agent(name) if not agent: return # failed to add agent agent._announce_timestamp = datetime.datetime.utcnow() if matched: # unsolicited, but newly discovered logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) self._work_q_put = True if correlated: # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_data_ind_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) if not mbox: logging.debug("Data indicate received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) if not mbox: logging.debug("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_event_ind_msg(self, msg, cmap, version, _direct): ei_map = cmap.get(MsgKey.event) if not ei_map or not isinstance(ei_map, type({})): logging.warning("Bad event indication message received: '%s'" % msg) return aname = ei_map.get("_name") emap = ei_map.get("_event") if not aname: logging.debug("No '_name' field in event indication message.") return if not emap: logging.debug("No '_event' field in event indication message.") return agent = None self._lock.acquire() try: agent = self._agent_map.get(aname) finally: self._lock.release() if not agent: logging.debug("Agent '%s' not known." % aname) return try: # @todo: schema??? event = QmfEvent.from_map(emap) except TypeError: logging.debug("Invalid QmfEvent map received: %s" % str(emap)) return # @todo: schema? Need to fetch it, but not from this thread! # This thread can not pend on a request. logging.debug("Publishing event received from agent %s" % aname) wi = WorkItem(WorkItem.EVENT_RECEIVED, None, {"agent":agent, "event":event}) self._work_q.put(wi) self._work_q_put = True def _expire_mboxes(self): """ Check all async mailboxes for outstanding requests that have expired. """ now = datetime.datetime.utcnow() if self._next_mbox_expire and now < self._next_mbox_expire: return expired_mboxes = [] self._next_mbox_expire = None self._lock.acquire() try: for mbox in self._async_mboxes.itervalues(): if now >= mbox.expiration_date: expired_mboxes.append(mbox) else: if (self._next_mbox_expire is None or mbox.expiration_date < self._next_mbox_expire): self._next_mbox_expire = mbox.expiration_date for mbox in expired_mboxes: del self._async_mboxes[mbox.cid] finally: self._lock.release() for mbox in expired_mboxes: # note: expire() may deallocate the mbox, so don't touch # it further. mbox.expire() def _expire_agents(self): """ Check for expired agents and issue notifications when they expire. """ now = datetime.datetime.utcnow() if self._next_agent_expire and now < self._next_agent_expire: return lifetime_delta = datetime.timedelta(seconds = self._agent_timeout) next_expire_delta = lifetime_delta self._lock.acquire() try: logging.debug("!!! expiring agents '%s'" % now) for agent in self._agent_map.itervalues(): if agent._announce_timestamp: agent_deathtime = agent._announce_timestamp + lifetime_delta if agent_deathtime <= now: logging.debug("AGENT_DELETED for %s" % agent) agent._announce_timestamp = None wi = WorkItem(WorkItem.AGENT_DELETED, None, {"agent":agent}) # @todo: remove agent from self._agent_map self._work_q.put(wi) self._work_q_put = True else: if (agent_deathtime - now) < next_expire_delta: next_expire_delta = agent_deathtime - now self._next_agent_expire = now + next_expire_delta logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) finally: self._lock.release() def _create_agent( self, name ): """ Factory to create/retrieve an agent for this console """ logging.debug("creating agent %s" % name) self._lock.acquire() try: agent = self._agent_map.get(name) if agent: return agent agent = Agent(name, self) try: agent._sender = self._session.sender(str(agent._address) + ";{create:always," " node-properties:" " {type:topic," " x-properties:" " {type:direct}}}") except: logging.warning("Unable to create sender for %s" % name) return None logging.debug("created agent sender %s" % agent._sender.target) self._agent_map[name] = agent finally: self._lock.release() # new agent - query for its schema database for # seeding the schema cache (@todo) # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None}) # agent._sendQuery( query ) return agent def enable_agent_discovery(self, _query=None): """ Called to enable the asynchronous Agent Discovery process. Once enabled, AGENT_ADD work items can arrive on the WorkQueue. """ # @todo: fix - take predicate only, not entire query! if _query is not None: if (not isinstance(_query, QmfQuery) or _query.get_target() != QmfQuery.TARGET_AGENT): raise TypeError("Type QmfQuery with target == TARGET_AGENT expected") self._agent_discovery_filter = _query else: # create a match-all agent query (no predicate) self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) def disable_agent_discovery(self): """ Called to disable the async Agent Discovery process enabled by calling enableAgentDiscovery() """ self._agent_discovery_filter = None def get_workitem_count(self): """ Returns the count of pending WorkItems that can be retrieved. """ return self._work_q.qsize() def get_next_workitem(self, timeout=None): """ Returns the next pending work item, or None if none available. @todo: subclass and return an Empty event instead. """ try: wi = self._work_q.get(True, timeout) except Queue.Empty: return None return wi def release_workitem(self, wi): """ Return a WorkItem to the Console when it is no longer needed. @todo: call Queue.task_done() - only 2.5+ @type wi: class qmfConsole.WorkItem @param wi: work item object to return. """ pass def _add_schema(self, schema): """ @todo """ if not isinstance(schema, SchemaClass): raise TypeError("SchemaClass type expected") self._lock.acquire() try: sid = schema.get_class_id() if not self._schema_cache.has_key(sid): self._schema_cache[sid] = schema if sid in self._pending_schema_req: self._pending_schema_req.remove(sid) finally: self._lock.release() def _prefetch_schema(self, schema_id, agent): """ Send an async request for the schema identified by schema_id if the schema is not available in the cache. """ need_fetch = False self._lock.acquire() try: if ((not self._schema_cache.has_key(schema_id)) and schema_id not in self._pending_schema_req): self._pending_schema_req.append(schema_id) need_fetch = True finally: self._lock.release() if need_fetch: mbox = _SchemaPrefetchMailbox(self, schema_id) query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) logging.debug("Sending Schema Query to Agent (%s)" % time.time()) try: agent._send_query(query, mbox.get_address()) except SendError, e: logging.error(str(e)) mbox.destroy() self._lock.acquire() try: self._pending_schema_req.remove(schema_id) finally: self._lock.release() def _fetch_schema(self, schema_id, _agent=None, _timeout=None): """ Find the schema identified by schema_id. If not in the cache, ask the agent for it. """ if not isinstance(schema_id, SchemaClassId): raise TypeError("SchemaClassId type expected") self._lock.acquire() try: schema = self._schema_cache.get(schema_id) if schema: return schema finally: self._lock.release() if _agent is None: return None # note: do_query will add the new schema to the cache automatically. slist = self.do_query(_agent, QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), _timeout=_timeout) if slist: return slist[0] else: return None def _add_mailbox(self, mbox): """ Add a mailbox to the post office, and assign it a unique address. """ self._lock.acquire() try: mbox.cid = self._correlation_id self._correlation_id += 1 self._post_office[mbox.cid] = mbox finally: self._lock.release() def _get_mailbox(self, mid): try: mid = long(mid) except TypeError: logging.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() try: return self._post_office.get(mid) finally: self._lock.release() def _remove_mailbox(self, mid): """ Remove a mailbox and its address from the post office """ try: mid = long(mid) except TypeError: logging.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() try: if mid in self._post_office: del self._post_office[mid] finally: self._lock.release() def __repr__(self): return str(self._address) # def get_packages(self): # plist = [] # for i in range(self.impl.packageCount()): # plist.append(self.impl.getPackageName(i)) # return plist # def get_classes(self, package, kind=CLASS_OBJECT): # clist = [] # for i in range(self.impl.classCount(package)): # key = self.impl.getClass(package, i) # class_kind = self.impl.getClassKind(key) # if class_kind == kind: # if kind == CLASS_OBJECT: # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) # elif kind == CLASS_EVENT: # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) # return clist # def bind_package(self, package): # return self.impl.bindPackage(package) # def bind_class(self, kwargs = {}): # if "key" in kwargs: # self.impl.bindClass(kwargs["key"]) # elif "package" in kwargs: # package = kwargs["package"] # if "class" in kwargs: # self.impl.bindClass(package, kwargs["class"]) # else: # self.impl.bindClass(package) # else: # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") # def get_agents(self, broker=None): # blist = [] # if broker: # blist.append(broker) # else: # self._cv.acquire() # try: # # copy while holding lock # blist = self._broker_list[:] # finally: # self._cv.release() # agents = [] # for b in blist: # for idx in range(b.impl.agentCount()): # agents.append(AgentProxy(b.impl.getAgent(idx), b)) # return agents # def get_objects(self, query, kwargs = {}): # timeout = 30 # agent = None # temp_args = kwargs.copy() # if type(query) == type({}): # temp_args.update(query) # if "_timeout" in temp_args: # timeout = temp_args["_timeout"] # temp_args.pop("_timeout") # if "_agent" in temp_args: # agent = temp_args["_agent"] # temp_args.pop("_agent") # if type(query) == type({}): # query = Query(temp_args) # self._select = {} # for k in temp_args.iterkeys(): # if type(k) == str: # self._select[k] = temp_args[k] # self._cv.acquire() # try: # self._sync_count = 1 # self._sync_result = [] # broker = self._broker_list[0] # broker.send_query(query.impl, None, agent) # self._cv.wait(timeout) # if self._sync_count == 1: # raise Exception("Timed out: waiting for query response") # finally: # self._cv.release() # return self._sync_result # def get_object(self, query, kwargs = {}): # ''' # Return one and only one object or None. # ''' # objs = objects(query, kwargs) # if len(objs) == 1: # return objs[0] # else: # return None # def first_object(self, query, kwargs = {}): # ''' # Return the first of potentially many objects. # ''' # objs = objects(query, kwargs) # if objs: # return objs[0] # else: # return None # # Check the object against select to check for a match # def _select_match(self, object): # schema_props = object.properties() # for key in self._select.iterkeys(): # for prop in schema_props: # if key == p[0].name() and self._select[key] != p[1]: # return False # return True # def _get_result(self, list, context): # ''' # Called by Broker proxy to return the result of a query. # ''' # self._cv.acquire() # try: # for item in list: # if self._select_match(item): # self._sync_result.append(item) # self._sync_count -= 1 # self._cv.notify() # finally: # self._cv.release() # def start_sync(self, query): pass # def touch_sync(self, sync): pass # def end_sync(self, sync): pass # def start_console_events(self): # self._cb_cond.acquire() # try: # self._cb_cond.notify() # finally: # self._cb_cond.release() # def _do_console_events(self): # ''' # Called by the Console thread to poll for events. Passes the events # onto the ConsoleHandler associated with this Console. Is called # periodically, but can also be kicked by Console.start_console_events(). # ''' # count = 0 # valid = self.impl.getEvent(self._event) # while valid: # count += 1 # try: # if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: # logging.debug("Console Event AGENT_ADDED received") # if self._handler: # self._handler.agent_added(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: # logging.debug("Console Event AGENT_DELETED received") # if self._handler: # self._handler.agent_deleted(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: # logging.debug("Console Event NEW_PACKAGE received") # if self._handler: # self._handler.new_package(self._event.name) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: # logging.debug("Console Event NEW_CLASS received") # if self._handler: # self._handler.new_class(SchemaClassKey(self._event.classKey)) # elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: # logging.debug("Console Event OBJECT_UPDATE received") # if self._handler: # self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), # self._event.hasProps, self._event.hasStats) # elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: # logging.debug("Console Event EVENT_RECEIVED received") # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: # logging.debug("Console Event AGENT_HEARTBEAT received") # if self._handler: # self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) # elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: # logging.debug("Console Event METHOD_RESPONSE received") # else: # logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) # except e: # print "Exception caught in callback thread:", e # self.impl.popEvent() # valid = self.impl.getEvent(self._event) # return count # class Broker(ConnectionHandler): # # attr_reader :impl :conn, :console, :broker_bank # def __init__(self, console, conn): # self.broker_bank = 1 # self.console = console # self.conn = conn # self._session = None # self._cv = Condition() # self._stable = None # self._event = qmfengine.BrokerEvent() # self._xmtMessage = qmfengine.Message() # self.impl = qmfengine.BrokerProxy(self.console.impl) # self.console.impl.addConnection(self.impl, self) # self.conn.add_conn_handler(self) # self._operational = True # def shutdown(self): # logging.debug("broker.shutdown() called.") # self.console.impl.delConnection(self.impl) # self.conn.del_conn_handler(self) # if self._session: # self.impl.sessionClosed() # logging.debug("broker.shutdown() sessionClosed done.") # self._session.destroy() # logging.debug("broker.shutdown() session destroy done.") # self._session = None # self._operational = False # logging.debug("broker.shutdown() done.") # def wait_for_stable(self, timeout = None): # self._cv.acquire() # try: # if self._stable: # return # if timeout: # self._cv.wait(timeout) # if not self._stable: # raise Exception("Timed out: waiting for broker connection to become stable") # else: # while not self._stable: # self._cv.wait() # finally: # self._cv.release() # def send_query(self, query, ctx, agent): # agent_impl = None # if agent: # agent_impl = agent.impl # self.impl.sendQuery(query, ctx, agent_impl) # self.conn.kick() # def _do_broker_events(self): # count = 0 # valid = self.impl.getEvent(self._event) # while valid: # count += 1 # if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: # logging.debug("Broker Event BROKER_INFO received"); # elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: # logging.debug("Broker Event DECLARE_QUEUE received"); # self.conn.impl.declareQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: # logging.debug("Broker Event DELETE_QUEUE received"); # self.conn.impl.deleteQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.BIND: # logging.debug("Broker Event BIND received"); # self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.UNBIND: # logging.debug("Broker Event UNBIND received"); # self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: # logging.debug("Broker Event SETUP_COMPLETE received"); # self.impl.startProtocol() # elif self._event.kind == qmfengine.BrokerEvent.STABLE: # logging.debug("Broker Event STABLE received"); # self._cv.acquire() # try: # self._stable = True # self._cv.notify() # finally: # self._cv.release() # elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: # result = [] # for idx in range(self._event.queryResponse.getObjectCount()): # result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) # self.console._get_result(result, self._event.context) # elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: # obj = self._event.context # obj._method_result(MethodResponse(self._event.methodResponse())) # self.impl.popEvent() # valid = self.impl.getEvent(self._event) # return count # def _do_broker_messages(self): # count = 0 # valid = self.impl.getXmtMessage(self._xmtMessage) # while valid: # count += 1 # logging.debug("Broker: sending msg on connection") # self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) # self.impl.popXmt() # valid = self.impl.getXmtMessage(self._xmtMessage) # return count # def _do_events(self): # while True: # self.console.start_console_events() # bcnt = self._do_broker_events() # mcnt = self._do_broker_messages() # if bcnt == 0 and mcnt == 0: # break; # def conn_event_connected(self): # logging.debug("Broker: Connection event CONNECTED") # self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) # self.impl.sessionOpened(self._session.handle) # self._do_events() # def conn_event_disconnected(self, error): # logging.debug("Broker: Connection event DISCONNECTED") # pass # def conn_event_visit(self): # self._do_events() # def sess_event_session_closed(self, context, error): # logging.debug("Broker: Session event CLOSED") # self.impl.sessionClosed() # def sess_event_recv(self, context, message): # logging.debug("Broker: Session event MSG_RECV") # if not self._operational: # logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) # self.impl.handleRcvMessage(message) # self._do_events() ################################################################################ ################################################################################ ################################################################################ ################################################################################ # TEMPORARY TEST CODE - TO BE DELETED ################################################################################ ################################################################################ ################################################################################ ################################################################################ if __name__ == '__main__': # temp test code from common import (qmfTypes, SchemaProperty) logging.getLogger().setLevel(logging.INFO) logging.info( "************* Creating Async Console **************" ) class MyNotifier(Notifier): def __init__(self, context): self._myContext = context self.WorkAvailable = False def indication(self): print("Indication received! context=%d" % self._myContext) self.WorkAvailable = True _noteMe = MyNotifier( 666 ) _myConsole = Console(notifier=_noteMe) _myConsole.enable_agent_discovery() logging.info("Waiting...") logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) logging.info( "******** Messing around with Schema ********" ) _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", stype=SchemaClassId.TYPE_EVENT), _desc="A typical event schema", _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, kwargs = {"min":0, "max":100, "unit":"seconds", "desc":"sleep value"}), "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, kwargs={"maxlen":100, "desc":"a string argument"})}) print("_sec=%s" % _sec.get_class_id()) print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) try: print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) except: pass print("_sec.getProperties()='%s'" % _sec.get_properties()) print("Adding another argument") _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, kwargs={"dir":"IO", "desc":"a boolean argument"}) _sec.add_property('Argument-3', _arg3) print("_sec=%s" % _sec.get_class_id()) print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) _secmap = _sec.map_encode() print("_sec.mapEncode()='%s'" % _secmap ) _sec2 = SchemaEventClass( _map=_secmap ) print("_sec=%s" % _sec.get_class_id()) print("_sec2=%s" % _sec2.get_class_id()) _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", "_class_name": "myOtherClass", "_type": "_data"}, "_desc": "A test data object", "_values": {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, "access": "RO", "index": True, "unit": "degrees"}, "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, "access": "RW", "index": True, "desc": "The Second Property(tm)", "unit": "radians"}, "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, "unit": "seconds", "desc": "time until I retire"}, "meth1": {"_desc": "A test method", "_arguments": {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, "desc": "an argument 1", "dir": "I"}, "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, "dir": "IO", "desc": "some weird boolean"}}}, "meth2": {"_desc": "A test method", "_arguments": {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, "desc": "an 'nuther argument", "dir": "I"}}}}, "_subtypes": {"prop1":"qmfProperty", "prop2":"qmfProperty", "statistics":"qmfProperty", "meth1":"qmfMethod", "meth2":"qmfMethod"}, "_primary_key_names": ["prop2", "prop1"]}) print("_soc='%s'" % _soc) print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) print("_soc.getProperties='%s'" % _soc.get_properties()) print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) print("_soc.getMethodCount='%d'" % _soc.get_method_count()) print("_soc.getMethods='%s'" % _soc.get_methods()) print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) _socmap = _soc.map_encode() print("_socmap='%s'" % _socmap) _soc2 = SchemaObjectClass( _map=_socmap ) print("_soc='%s'" % _soc) print("_soc2='%s'" % _soc2) if _soc2.get_class_id() == _soc.get_class_id(): print("soc and soc2 are the same schema") logging.info( "******** Messing around with ObjectIds ********" ) qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) print("qd='%s':" % qd) print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) print("qd map='%s'" % qd.map_encode()) print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) qd.set_value("prop4", 4, "A test property called 4") print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) qd.prop4 = 9 print("qd.prop4 = 9 ='%s'" % qd.prop4) qd["prop4"] = 11 print("qd[prop4] = 11 ='%s'" % qd["prop4"]) print("qd.mapEncode()='%s'" % qd.map_encode()) _qd2 = QmfData( _map = qd.map_encode() ) print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, "prop2": 0}}, agent="some agent name?", _schema = _soc) print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) _qmfDesc1._set_schema( _soc ) print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) _qmfDescMap = _qmfDesc1.map_encode() print("_qmfDescMap='%s'" % _qmfDescMap) _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) logging.info( "******** Messing around with QmfEvents ********" ) _qmfevent1 = QmfEvent( _timestamp = 1111, _schema = _sec, _values = {"Argument-1": 77, "Argument-3": True, "Argument-2": "a string"}) print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) _qmfevent1Map = _qmfevent1.map_encode() _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) logging.info( "******** Messing around with Queries ********" ) _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, [QmfQuery.AND, [QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]], [QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"], [QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]], [QmfQuery.OR, [QmfQuery.LE, "temperature", -10], [QmfQuery.FALSE], [QmfQuery.EXISTS, "namey"]]]) print("_q1.mapEncode() = [%s]" % _q1.map_encode())