summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py2295
1 files changed, 0 insertions, 2295 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
deleted file mode 100644
index c13cf70755..0000000000
--- a/python/qmf2/console.py
+++ /dev/null
@@ -1,2295 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import sys
-import os
-import logging
-import platform
-import time
-import datetime
-import Queue
-from threading import Thread, Event
-from threading import Lock
-from threading import currentThread
-from threading import Condition
-
-from qpid.messaging import Connection, Message, Empty, SendError
-
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
- SchemaEventClass, SchemaObjectClass, WorkItem,
- SchemaMethod, QmfEvent, timedelta_to_secs)
-
-
-# global flag that indicates which thread (if any) is
-# running the console notifier callback
-_callback_thread=None
-
-
-
-
-##==============================================================================
-## Console Transaction Management
-##
-## At any given time, a console application may have multiple outstanding
-## message transactions with agents. The following objects allow the console
-## to track these outstanding transactions.
-##==============================================================================
-
-
-class _Mailbox(object):
- """
- Virtual base class for all Mailbox-like objects.
- """
- def __init__(self, console):
- self.console = console
- self.cid = 0
- self.console._add_mailbox(self)
-
- def get_address(self):
- return self.cid
-
- def deliver(self, data):
- """
- Invoked by Console Management thread when a message arrives for
- this mailbox.
- """
- raise Exception("_Mailbox deliver() method must be provided")
-
- def destroy(self):
- """
- Release the mailbox. Once called, the mailbox should no longer be
- referenced.
- """
- self.console._remove_mailbox(self.cid)
-
-
-class _SyncMailbox(_Mailbox):
- """
- A simple mailbox that allows a consumer to wait for delivery of data.
- """
- def __init__(self, console):
- """
- Invoked by application thread.
- """
- super(_SyncMailbox, self).__init__(console)
- self._cv = Condition()
- self._data = []
- self._waiting = False
-
- def deliver(self, data):
- """
- Drop data into the mailbox, waking any waiters if necessary.
- Invoked by Console Management thread only.
- """
- self._cv.acquire()
- try:
- self._data.append(data)
- # if was empty, notify waiters
- if len(self._data) == 1:
- self._cv.notify()
- finally:
- self._cv.release()
-
- def fetch(self, timeout=None):
- """
- Get one data item from a mailbox, with timeout.
- Invoked by application thread.
- """
- self._cv.acquire()
- try:
- if len(self._data) == 0:
- self._cv.wait(timeout)
- if len(self._data):
- return self._data.pop(0)
- return None
- finally:
- self._cv.release()
-
-
-class _AsyncMailbox(_Mailbox):
- """
- A Mailbox for asynchronous delivery, with a timeout value.
- """
- def __init__(self, console,
- _timeout=None):
- """
- Invoked by application thread.
- """
- super(_AsyncMailbox, self).__init__(console)
- self.console = console
-
- if _timeout is None:
- _timeout = console._reply_timeout
- self.expiration_date = (datetime.datetime.utcnow() +
- datetime.timedelta(seconds=_timeout))
- console._lock.acquire()
- try:
- console._async_mboxes[self.cid] = self
- finally:
- console._lock.release()
-
- # now that an async mbox has been created, wake the
- # console mgmt thread so it will know about the mbox expiration
- # date (and adjust its idle sleep period correctly)
-
- console._wake_thread()
-
- def deliver(self, msg):
- """
- """
- raise Exception("deliver() method must be provided")
-
- def expire(self):
- raise Exception("expire() method must be provided")
-
-
- def destroy(self):
- self.console._lock.acquire()
- try:
- if self.cid in self.console._async_mboxes:
- del self.console._async_mboxes[self.cid]
- finally:
- self.console._lock.release()
- super(_AsyncMailbox, self).destroy()
-
-
-
-class _QueryMailbox(_AsyncMailbox):
- """
- A mailbox used for asynchronous query requests.
- """
- def __init__(self, console,
- agent_name,
- context,
- target, msgkey,
- _timeout=None):
- """
- Invoked by application thread.
- """
- super(_QueryMailbox, self).__init__(console,
- _timeout)
- self.agent_name = agent_name
- self.target = target
- self.msgkey = msgkey
- self.context = context
- self.result = []
-
- def deliver(self, reply):
- """
- Process query response messages delivered to this mailbox.
- Invoked by Console Management thread only.
- """
- done = False
- objects = reply.content.get(self.msgkey)
- if not objects:
- done = True
- else:
- # convert from map to native types if needed
- if self.target == QmfQuery.TARGET_SCHEMA_ID:
- for sid_map in objects:
- self.result.append(SchemaClassId.from_map(sid_map))
-
- elif self.target == QmfQuery.TARGET_SCHEMA:
- for schema_map in objects:
- # extract schema id, convert based on schema type
- sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- if sid:
- if sid.get_type() == SchemaClassId.TYPE_DATA:
- schema = SchemaObjectClass.from_map(schema_map)
- else:
- schema = SchemaEventClass.from_map(schema_map)
- self.console._add_schema(schema) # add to schema cache
- self.result.append(schema)
-
- elif self.target == QmfQuery.TARGET_OBJECT:
- for obj_map in objects:
- # @todo: need the agent name - ideally from the
- # reply message iself.
- agent = self.console.get_agent(self.agent_name)
- if agent:
- obj = QmfConsoleData(map_=obj_map, agent=agent)
- # start fetch of schema if not known
- sid = obj.get_schema_class_id()
- if sid:
- self.console._prefetch_schema(sid, agent)
- self.result.append(obj)
-
-
- else:
- # no conversion needed.
- self.result += objects
-
- if done:
- # create workitem
- # logging.error("QUERY COMPLETE for %s" % str(self.context))
- wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
- self.console._work_q.put(wi)
- self.console._work_q_put = True
-
- self.destroy()
-
-
- def expire(self):
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
- # send along whatever (possibly none) has been received so far
- wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
- self.console._work_q.put(wi)
- self.console._work_q_put = True
-
- self.destroy()
-
-
-
-class _SchemaPrefetchMailbox(_AsyncMailbox):
- """
- Handles responses to schema fetches made by the console.
- """
- def __init__(self, console,
- schema_id,
- _timeout=None):
- """
- Invoked by application thread.
- """
- super(_SchemaPrefetchMailbox, self).__init__(console,
- _timeout)
- self.schema_id = schema_id
-
- def deliver(self, reply):
- """
- Process schema response messages.
- """
- done = False
- schemas = reply.content.get(MsgKey.schema)
- if schemas:
- for schema_map in schemas:
- # extract schema id, convert based on schema type
- sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- if sid:
- if sid.get_type() == SchemaClassId.TYPE_DATA:
- schema = SchemaObjectClass.from_map(schema_map)
- else:
- schema = SchemaEventClass.from_map(schema_map)
- self.console._add_schema(schema) # add to schema cache
- self.destroy()
-
-
- def expire(self):
- self.destroy()
-
-
-
-class _MethodMailbox(_AsyncMailbox):
- """
- A mailbox used for asynchronous method requests.
- """
- def __init__(self, console,
- context,
- _timeout=None):
- """
- Invoked by application thread.
- """
- super(_MethodMailbox, self).__init__(console,
- _timeout)
- self.context = context
-
- def deliver(self, reply):
- """
- Process method response messages delivered to this mailbox.
- Invoked by Console Management thread only.
- """
-
- _map = reply.content.get(MsgKey.method)
- if not _map:
- logging.error("Invalid method call reply message")
- result = None
- else:
- error=_map.get(SchemaMethod.KEY_ERROR)
- if error:
- error = QmfData.from_map(error)
- result = MethodResult(_error=error)
- else:
- result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
-
- # create workitem
- wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result)
- self.console._work_q.put(wi)
- self.console._work_q_put = True
-
- self.destroy()
-
-
- def expire(self):
- """
- The mailbox expired without receiving a reply.
- Invoked by the Console Management thread only.
- """
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
- # send along an empty response
- wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
- self.console._work_q.put(wi)
- self.console._work_q_put = True
-
- self.destroy()
-
-
-
-##==============================================================================
-## DATA MODEL
-##==============================================================================
-
-
-class QmfConsoleData(QmfData):
- """
- Console's representation of an managed QmfData instance.
- """
- def __init__(self, map_, agent):
- super(QmfConsoleData, self).__init__(_map=map_,
- _const=True)
- self._agent = agent
-
- def get_timestamps(self):
- """
- Returns a list of timestamps describing the lifecycle of
- the object. All timestamps are represented by the AMQP
- timestamp type. [0] = time of last update from Agent,
- [1] = creation timestamp
- [2] = deletion timestamp, or zero if not
- deleted.
- """
- return [self._utime, self._ctime, self._dtime]
-
- def get_create_time(self):
- """
- returns the creation timestamp
- """
- return self._ctime
-
- def get_update_time(self):
- """
- returns the update timestamp
- """
- return self._utime
-
- def get_delete_time(self):
- """
- returns the deletion timestamp, or zero if not yet deleted.
- """
- return self._dtime
-
- def is_deleted(self):
- """
- True if deletion timestamp not zero.
- """
- return self._dtime != long(0)
-
- def refresh(self, _reply_handle=None, _timeout=None):
- """
- request that the Agent update the value of this object's
- contents.
- """
- if _reply_handle is not None:
- logging.error(" ASYNC REFRESH TBD!!!")
- return None
-
- assert self._agent
- assert self._agent._console
-
- if _timeout is None:
- _timeout = self._agent._console._reply_timeout
-
- # create query to agent using this objects ID
- query = QmfQuery.create_id_object(self.get_object_id(),
- self.get_schema_class_id())
- obj_list = self._agent._console.do_query(self._agent, query,
- _timeout=_timeout)
- if obj_list is None or len(obj_list) != 1:
- return None
-
- self._update(obj_list[0])
- return self
-
-
- def invoke_method(self, name, _in_args={}, _reply_handle=None,
- _timeout=None):
- """
- Invoke the named method on this object.
- """
- assert self._agent
- assert self._agent._console
-
- oid = self.get_object_id()
- if oid is None:
- raise ValueError("Cannot invoke methods on unmanaged objects.")
-
- if _timeout is None:
- _timeout = self._agent._console._reply_timeout
-
- if _reply_handle is not None:
- mbox = _MethodMailbox(self._agent._console,
- _reply_handle)
- else:
- mbox = _SyncMailbox(self._agent._console)
- cid = mbox.get_address()
-
- _map = {self.KEY_OBJECT_ID:str(oid),
- SchemaMethod.KEY_NAME:name}
-
- sid = self.get_schema_class_id()
- if sid:
- _map[self.KEY_SCHEMA_ID] = sid.map_encode()
- if _in_args:
- _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
-
- logging.debug("Sending method req to Agent (%s)" % time.time())
- try:
- self._agent._send_method_req(_map, cid)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return None
-
- if _reply_handle is not None:
- return True
-
- logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = mbox.fetch(_timeout)
- mbox.destroy()
-
- if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
- return None
-
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
- logging.error("Invalid method call reply message")
- return None
-
- error=_map.get(SchemaMethod.KEY_ERROR)
- if error:
- return MethodResult(_error=QmfData.from_map(error))
- else:
- return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
-
- def _update(self, newer):
- super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes,
- _tag=newer._tag, _object_id=newer._object_id,
- _ctime=newer._ctime, _utime=newer._utime,
- _dtime=newer._dtime,
- _schema_id=newer._schema_id, _const=True)
-
-class QmfLocalData(QmfData):
- """
- Console's representation of an unmanaged QmfData instance. There
- is no remote agent associated with this instance. The Console has
- full control over this instance.
- """
- def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
- _schema=None):
- # timestamp in millisec since epoch UTC
- ctime = long(time.time() * 1000)
- super(QmfLocalData, self).__init__(_values=values,
- _subtypes=_subtypes, _tag=_tag,
- _object_id=_object_id,
- _schema=_schema, _ctime=ctime,
- _utime=ctime, _const=False)
-
-
-class Agent(object):
- """
- A local representation of a remote agent managed by this console.
- """
- def __init__(self, name, console):
- """
- @type name: string
- @param name: uniquely identifies this agent in the AMQP domain.
- """
-
- if not isinstance(console, Console):
- raise TypeError("parameter must be an instance of class Console")
-
- self._name = name
- self._address = QmfAddress.direct(name, console._domain)
- self._console = console
- self._sender = None
- self._packages = {} # map of {package-name:[list of class-names], } for this agent
- self._subscriptions = [] # list of active standing subscriptions for this agent
- self._announce_timestamp = None # datetime when last announce received
- logging.debug( "Created Agent with address: [%s]" % self._address )
-
-
- def get_name(self):
- return self._name
-
- def is_active(self):
- return self._announce_timestamp != None
-
- def _send_msg(self, msg, correlation_id=None):
- """
- Low-level routine to asynchronously send a message to this agent.
- """
- msg.reply_to = str(self._console._address)
- if correlation_id:
- msg.correlation_id = str(correlation_id)
- # TRACE
- #logging.error("!!! Console %s sending to agent %s (%s)" %
- # (self._console._name, self._name, str(msg)))
- self._sender.send(msg)
- # return handle
-
- def get_packages(self):
- """
- Return a list of the names of all packages known to this agent.
- """
- return self._packages.keys()
-
- def get_classes(self):
- """
- Return a dictionary [key:class] of classes known to this agent.
- """
- return self._packages.copy()
-
- def get_objects(self, query, kwargs={}):
- """
- Return a list of objects that satisfy the given query.
-
- @type query: dict, or common.Query
- @param query: filter for requested objects
- @type kwargs: dict
- @param kwargs: ??? used to build match selector and query ???
- @rtype: list
- @return: list of matching objects, or None.
- """
- pass
-
- def get_object(self, query, kwargs={}):
- """
- Get one object - query is expected to match only one object.
- ??? Recommended: explicit timeout param, default None ???
-
- @type query: dict, or common.Query
- @param query: filter for requested objects
- @type kwargs: dict
- @param kwargs: ??? used to build match selector and query ???
- @rtype: qmfConsole.ObjectProxy
- @return: one matching object, or none
- """
- pass
-
-
- def create_subscription(self, query):
- """
- Factory for creating standing subscriptions based on a given query.
-
- @type query: common.Query object
- @param query: determines the list of objects for which this subscription applies
- @rtype: qmfConsole.Subscription
- @returns: an object representing the standing subscription.
- """
- pass
-
-
- def invoke_method(self, name, _in_args={}, _reply_handle=None,
- _timeout=None):
- """
- Invoke the named method on this agent.
- """
- assert self._console
-
- if _timeout is None:
- _timeout = self._console._reply_timeout
-
- if _reply_handle is not None:
- mbox = _MethodMailbox(self._console,
- _reply_handle)
- else:
- mbox = _SyncMailbox(self._console)
- cid = mbox.get_address()
-
- _map = {SchemaMethod.KEY_NAME:name}
- if _in_args:
- _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
-
- logging.debug("Sending method req to Agent (%s)" % time.time())
- try:
- self._send_method_req(_map, cid)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return None
-
- if _reply_handle is not None:
- return True
-
- logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = mbox.fetch(_timeout)
- mbox.destroy()
-
- if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
- return None
-
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
- logging.error("Invalid method call reply message")
- return None
-
- return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
- _error=_map.get(SchemaMethod.KEY_ERROR))
-
- def enable_events(self):
- raise Exception("enable_events tbd")
-
- def disable_events(self):
- raise Exception("disable_events tbd")
-
- def destroy(self):
- raise Exception("destroy tbd")
-
- def __repr__(self):
- return str(self._address)
-
- def __str__(self):
- return self.__repr__()
-
- def _send_query(self, query, correlation_id=None):
- """
- """
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.get_query)},
- content={MsgKey.query: query.map_encode()})
- self._send_msg( msg, correlation_id )
-
-
- def _send_method_req(self, mr_map, correlation_id=None):
- """
- """
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.method_req)},
- content=mr_map)
- self._send_msg( msg, correlation_id )
-
-
- ##==============================================================================
- ## METHOD CALL
- ##==============================================================================
-
-class MethodResult(object):
- def __init__(self, _out_args=None, _error=None):
- self._error = _error
- self._out_args = _out_args
-
- def succeeded(self):
- return self._error is None
-
- def get_exception(self):
- return self._error
-
- def get_arguments(self):
- return self._out_args
-
- def get_argument(self, name):
- arg = None
- if self._out_args:
- arg = self._out_args.get(name)
- return arg
-
-
- ##==============================================================================
- ## CONSOLE
- ##==============================================================================
-
-
-
-
-
-
-class Console(Thread):
- """
- A Console manages communications to a collection of agents on behalf of an application.
- """
- def __init__(self, name=None, _domain=None, notifier=None,
- reply_timeout = 60,
- # agent_timeout = 120,
- agent_timeout = 60,
- kwargs={}):
- """
- @type name: str
- @param name: identifier for this console. Must be unique.
- @type notifier: qmfConsole.Notifier
- @param notifier: invoked when events arrive for processing.
- @type kwargs: dict
- @param kwargs: ??? Unused
- """
- Thread.__init__(self)
- self._operational = False
- self._ready = Event()
-
- if not name:
- self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
- else:
- self._name = str(name)
- self._domain = _domain
- self._address = QmfAddress.direct(self._name, self._domain)
- self._notifier = notifier
- self._lock = Lock()
- self._conn = None
- self._session = None
- # dict of "agent-direct-address":class Agent entries
- self._agent_map = {}
- self._direct_recvr = None
- self._announce_recvr = None
- self._locate_sender = None
- self._schema_cache = {}
- self._pending_schema_req = []
- self._agent_discovery_filter = None
- self._reply_timeout = reply_timeout
- self._agent_timeout = agent_timeout
- self._next_agent_expire = None
- self._next_mbox_expire = None
- # for passing WorkItems to the application
- self._work_q = Queue.Queue()
- self._work_q_put = False
- # Correlation ID and mailbox storage
- self._correlation_id = long(time.time()) # pseudo-randomize
- self._post_office = {} # indexed by cid
- self._async_mboxes = {} # indexed by cid, used to expire them
-
- ## Old stuff below???
- #self._broker_list = []
- #self.impl = qmfengine.Console()
- #self._event = qmfengine.ConsoleEvent()
- ##self._cv = Condition()
- ##self._sync_count = 0
- ##self._sync_result = None
- ##self._select = {}
- ##self._cb_cond = Condition()
-
-
-
- def destroy(self, timeout=None):
- """
- Must be called before the Console is deleted.
- Frees up all resources and shuts down all background threads.
-
- @type timeout: float
- @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
- """
- logging.debug("Destroying Console...")
- if self._conn:
- self.remove_connection(self._conn, timeout)
- logging.debug("Console Destroyed")
-
-
-
- def add_connection(self, conn):
- """
- Add a AMQP connection to the console. The console will setup a session over the
- connection. The console will then broadcast an Agent Locate Indication over
- the session in order to discover present agents.
-
- @type conn: qpid.messaging.Connection
- @param conn: the connection to the AMQP messaging infrastructure.
- """
- if self._conn:
- raise Exception( "Multiple connections per Console not supported." );
- self._conn = conn
- self._session = conn.session(name=self._name)
-
- # for messages directly addressed to me
- self._direct_recvr = self._session.receiver(str(self._address) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}",
- capacity=1)
- logging.debug("my direct addr=%s" % self._direct_recvr.source)
-
- self._direct_sender = self._session.sender(str(self._address.get_node()) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
- logging.debug("my direct sender=%s" % self._direct_sender.target)
-
- # for receiving "broadcast" messages from agents
- default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
- self._domain)
- self._topic_recvr = self._session.receiver(str(default_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}",
- capacity=1)
- logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
-
-
- # for sending to topic subscribers
- topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain)
- self._topic_sender = self._session.sender(str(topic_addr) +
- ";{create:always,"
- " node-properties:{type:topic}}")
- logging.debug("default topic send addr=%s" % self._topic_sender.target)
-
- #
- # Now that receivers are created, fire off the receive thread...
- #
- self._operational = True
- self.start()
- self._ready.wait(10)
- if not self._ready.isSet():
- raise Exception("Console managment thread failed to start.")
-
-
-
- def remove_connection(self, conn, timeout=None):
- """
- Remove an AMQP connection from the console. Un-does the add_connection() operation,
- and releases any agents and sessions associated with the connection.
-
- @type conn: qpid.messaging.Connection
- @param conn: connection previously added by add_connection()
- """
- if self._conn and conn and conn != self._conn:
- logging.error( "Attempt to delete unknown connection: %s" % str(conn))
-
- # tell connection thread to shutdown
- self._operational = False
- if self.isAlive():
- # kick my thread to wake it up
- self._wake_thread()
- logging.debug("waiting for console receiver thread to exit")
- self.join(timeout)
- if self.isAlive():
- logging.error( "Console thread '%s' is hung..." % self.getName() )
- self._direct_recvr.close()
- self._direct_sender.close()
- self._topic_recvr.close()
- self._topic_sender.close()
- self._session.close()
- self._session = None
- self._conn = None
- logging.debug("console connection removal complete")
-
-
- def get_address(self):
- """
- The AMQP address this Console is listening to.
- """
- return self._address
-
-
- def destroy_agent( self, agent ):
- """
- Undoes create.
- """
- if not isinstance(agent, Agent):
- raise TypeError("agent must be an instance of class Agent")
-
- self._lock.acquire()
- try:
- if agent._name in self._agent_map:
- del self._agent_map[agent._name]
- finally:
- self._lock.release()
-
- def find_agent(self, name, timeout=None ):
- """
- Given the name of a particular agent, return an instance of class Agent
- representing that agent. Return None if the agent does not exist.
- """
-
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- if agent:
- return agent
- finally:
- self._lock.release()
-
- # agent not present yet - ping it with an agent_locate
-
- mbox = _SyncMailbox(self)
- cid = mbox.get_address()
-
- query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject="console.ind.locate." + name,
- properties={"method":"request",
- "qmf.subject":make_subject(OpCode.agent_locate)},
- content={MsgKey.query: query.map_encode()})
- msg.reply_to = str(self._address)
- msg.correlation_id = str(cid)
- logging.debug("Sending Agent Locate (%s)" % time.time())
- # TRACE
- #logging.error("!!! Console %s sending agent locate (%s)" %
- # (self._name, str(msg)))
- try:
- self._topic_sender.send(msg)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return None
-
- if timeout is None:
- timeout = self._reply_timeout
-
- new_agent = None
- logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
- mbox.fetch(timeout)
- mbox.destroy()
- logging.debug("Agent Locate wait ended (%s)" % time.time())
- self._lock.acquire()
- try:
- new_agent = self._agent_map.get(name)
- finally:
- self._lock.release()
-
- return new_agent
-
-
- def get_agents(self):
- """
- Return the list of known agents.
- """
- self._lock.acquire()
- try:
- agents = self._agent_map.values()
- finally:
- self._lock.release()
- return agents
-
-
- def get_agent(self, name):
- """
- Return the named agent, else None if not currently available.
- """
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- finally:
- self._lock.release()
- return agent
-
-
- def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
- """
- """
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
- target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
-
- if _reply_handle is not None:
- mbox = _QueryMailbox(self,
- agent.get_name(),
- _reply_handle,
- target, msgkey,
- _timeout)
- else:
- mbox = _SyncMailbox(self)
-
- cid = mbox.get_address()
-
- try:
- logging.debug("Sending Query to Agent (%s)" % time.time())
- agent._send_query(query, cid)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return None
-
- # return now if async reply expected
- if _reply_handle is not None:
- return True
-
- if not _timeout:
- _timeout = self._reply_timeout
-
- logging.debug("Waiting for response to Query (%s)" % _timeout)
- now = datetime.datetime.utcnow()
- expire = now + datetime.timedelta(seconds=_timeout)
-
- response = []
- while (expire > now):
- _timeout = timedelta_to_secs(expire - now)
- reply = mbox.fetch(_timeout)
- if not reply:
- logging.debug("Query wait timed-out.")
- break
-
- objects = reply.content.get(msgkey)
- if not objects:
- # last response is empty
- break
-
- # convert from map to native types if needed
- if target == QmfQuery.TARGET_SCHEMA_ID:
- for sid_map in objects:
- response.append(SchemaClassId.from_map(sid_map))
-
- elif target == QmfQuery.TARGET_SCHEMA:
- for schema_map in objects:
- # extract schema id, convert based on schema type
- sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- if sid:
- if sid.get_type() == SchemaClassId.TYPE_DATA:
- schema = SchemaObjectClass.from_map(schema_map)
- else:
- schema = SchemaEventClass.from_map(schema_map)
- self._add_schema(schema) # add to schema cache
- response.append(schema)
-
- elif target == QmfQuery.TARGET_OBJECT:
- for obj_map in objects:
- obj = QmfConsoleData(map_=obj_map, agent=agent)
- # start fetch of schema if not known
- sid = obj.get_schema_class_id()
- if sid:
- self._prefetch_schema(sid, agent)
- response.append(obj)
- else:
- # no conversion needed.
- response += objects
-
- now = datetime.datetime.utcnow()
-
- mbox.destroy()
- return response
-
- def _wake_thread(self):
- """
- Make the console management thread loop wakeup from its next_receiver
- sleep.
- """
- logging.debug("Sending noop to wake up [%s]" % self._address)
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
- subject=self._name,
- content={"noop":"noop"})
- try:
- self._direct_sender.send( msg, sync=True )
- except SendError, e:
- logging.error(str(e))
-
-
- def run(self):
- """
- Console Management Thread main loop.
- Handles inbound messages, agent discovery, async mailbox timeouts.
- """
- global _callback_thread
-
- self._ready.set()
-
- while self._operational:
-
- # qLen = self._work_q.qsize()
-
- while True:
- try:
- msg = self._topic_recvr.fetch(timeout=0)
- except Empty:
- break
- # TRACE:
- # logging.error("!!! Console %s: msg on %s [%s]" %
- # (self._name, self._topic_recvr.source, msg))
- self._dispatch(msg, _direct=False)
-
- while True:
- try:
- msg = self._direct_recvr.fetch(timeout = 0)
- except Empty:
- break
- # TRACE
- #logging.error("!!! Console %s: msg on %s [%s]" %
- # (self._name, self._direct_recvr.source, msg))
- self._dispatch(msg, _direct=True)
-
- self._expire_agents() # check for expired agents
- self._expire_mboxes() # check for expired async mailbox requests
-
- #if qLen == 0 and self._work_q.qsize() and self._notifier:
- if self._work_q_put and self._notifier:
- # new stuff on work queue, kick the the application...
- self._work_q_put = False
- _callback_thread = currentThread()
- logging.info("Calling console notifier.indication")
- self._notifier.indication()
- _callback_thread = None
-
- if self._operational:
- # wait for a message to arrive, or an agent
- # to expire, or a mailbox requrest to time out
- now = datetime.datetime.utcnow()
- next_expire = self._next_agent_expire
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
- if next_expire > now:
- timeout = timedelta_to_secs(next_expire - now)
- try:
- logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
- xxx = self._session.next_receiver(timeout = timeout)
- except Empty:
- pass
-
-
- logging.debug("Shutting down Console thread")
-
- def get_objects(self,
- _object_id=None,
- _schema_id=None,
- _pname=None, _cname=None,
- _agents=None,
- _timeout=None):
- """
- Retrieve objects by id or schema.
-
- By object_id: must specify schema_id or pname & cname if object defined
- by a schema. Undescribed objects: only object_id needed.
-
- By schema: must specify schema_id or pname & cname - all instances of
- objects defined by that schema are returned.
- """
- if _agents is None:
- # use copy of current agent list
- self._lock.acquire()
- try:
- agent_list = self._agent_map.values()
- finally:
- self._lock.release()
- elif isinstance(_agents, Agent):
- agent_list = [_agents]
- else:
- agent_list = _agents
- # @todo validate this list!
-
- if _timeout is None:
- _timeout = self._reply_timeout
-
- # @todo: fix when async do_query done - query all agents at once, then
- # wait for replies, instead of per-agent querying....
-
- obj_list = []
- expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
- for agent in agent_list:
- if not agent.is_active():
- continue
- now = datetime.datetime.utcnow()
- if now >= expired:
- break
-
- if _pname is None:
- if _object_id:
- query = QmfQuery.create_id_object(_object_id,
- _schema_id)
- else:
- if _schema_id is not None:
- t_params = {QmfData.KEY_SCHEMA_ID: _schema_id}
- else:
- t_params = None
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
- t_params)
- timeout = timedelta_to_secs(expired - now)
- reply = self.do_query(agent, query, _timeout=timeout)
- if reply:
- obj_list = obj_list + reply
- else:
- # looking up by package name (and maybe class name), need to
- # find all schema_ids in that package, then lookup object by
- # schema_id
- if _cname is not None:
- pred = [QmfQuery.AND,
- [QmfQuery.EQ,
- SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]],
- [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
- [QmfQuery.QUOTE, _cname]]]
- else:
- pred = [QmfQuery.EQ,
- SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]]
- query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
- timeout = timedelta_to_secs(expired - now)
- sid_list = self.do_query(agent, query, _timeout=timeout)
- if sid_list:
- for sid in sid_list:
- now = datetime.datetime.utcnow()
- if now >= expired:
- break
- if _object_id is not None:
- query = QmfQuery.create_id_object(_object_id, sid)
- else:
- t_params = {QmfData.KEY_SCHEMA_ID: sid}
- query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
- timeout = timedelta_to_secs(expired - now)
- reply = self.do_query(agent, query, _timeout=timeout)
- if reply:
- obj_list = obj_list + reply
- if obj_list:
- return obj_list
- return None
-
-
-
- # called by run() thread ONLY
- #
- def _dispatch(self, msg, _direct=True):
- """
- PRIVATE: Process a message received from an Agent
- """
- logging.debug( "Message received from Agent! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- # @todo: deal with version mismatch!!!
- except:
- logging.error("Ignoring unrecognized message '%s'" % msg)
- return
-
- cmap = {}; props = {}
- if msg.content_type == "amqp/map":
- cmap = msg.content
- if msg.properties:
- props = msg.properties
-
- if opcode == OpCode.agent_ind:
- self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.data_ind:
- self._handle_data_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.event_ind:
- self._handle_event_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.managed_object:
- logging.warning("!!! managed_object TBD !!!")
- elif opcode == OpCode.object_ind:
- logging.warning("!!! object_ind TBD !!!")
- elif opcode == OpCode.response:
- self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.schema_ind:
- logging.warning("!!! schema_ind TBD !!!")
- elif opcode == OpCode.noop:
- logging.debug("No-op msg received.")
- else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
-
-
- def _handle_agent_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received agent-ind message. This message may be a response to a
- agent-locate, or it can be an unsolicited agent announce.
- """
- logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
-
- ai_map = cmap.get(MsgKey.agent_info)
- if not ai_map or not isinstance(ai_map, type({})):
- logging.warning("Bad agent-ind message received: '%s'" % msg)
- return
- name = ai_map.get("_name")
- if not name:
- logging.warning("Bad agent-ind message received: agent name missing"
- " '%s'" % msg)
- return
-
- correlated = False
- if msg.correlation_id:
- mbox = self._get_mailbox(msg.correlation_id)
- correlated = mbox is not None
-
- agent = None
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- if agent:
- # agent already known, just update timestamp
- agent._announce_timestamp = datetime.datetime.utcnow()
- finally:
- self._lock.release()
-
- if not agent:
- # need to create and add a new agent?
- matched = False
- if self._agent_discovery_filter:
- tmp = QmfData.create(values=ai_map, _object_id="agent-filter")
- matched = self._agent_discovery_filter.evaluate(tmp)
-
- if (correlated or matched):
- agent = self._create_agent(name)
- if not agent:
- return # failed to add agent
- agent._announce_timestamp = datetime.datetime.utcnow()
-
- if matched:
- # unsolicited, but newly discovered
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
- wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
- self._work_q.put(wi)
- self._work_q_put = True
-
- if correlated:
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- mbox.deliver(msg)
-
- def _handle_data_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Data indicate received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" %
- msg.correlation_id)
- mbox.deliver(msg)
-
-
- def _handle_response_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- # @todo code replication - clean me.
- logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Response msg received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- mbox.deliver(msg)
-
- def _handle_event_ind_msg(self, msg, cmap, version, _direct):
- ei_map = cmap.get(MsgKey.event)
- if not ei_map or not isinstance(ei_map, type({})):
- logging.warning("Bad event indication message received: '%s'" % msg)
- return
-
- aname = ei_map.get("_name")
- emap = ei_map.get("_event")
- if not aname:
- logging.debug("No '_name' field in event indication message.")
- return
- if not emap:
- logging.debug("No '_event' field in event indication message.")
- return
-
- agent = None
- self._lock.acquire()
- try:
- agent = self._agent_map.get(aname)
- finally:
- self._lock.release()
- if not agent:
- logging.debug("Agent '%s' not known." % aname)
- return
- try:
- # @todo: schema???
- event = QmfEvent.from_map(emap)
- except TypeError:
- logging.debug("Invalid QmfEvent map received: %s" % str(emap))
- return
-
- # @todo: schema? Need to fetch it, but not from this thread!
- # This thread can not pend on a request.
- logging.debug("Publishing event received from agent %s" % aname)
- wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
- {"agent":agent,
- "event":event})
- self._work_q.put(wi)
- self._work_q_put = True
-
-
- def _expire_mboxes(self):
- """
- Check all async mailboxes for outstanding requests that have expired.
- """
- now = datetime.datetime.utcnow()
- if self._next_mbox_expire and now < self._next_mbox_expire:
- return
- expired_mboxes = []
- self._next_mbox_expire = None
- self._lock.acquire()
- try:
- for mbox in self._async_mboxes.itervalues():
- if now >= mbox.expiration_date:
- expired_mboxes.append(mbox)
- else:
- if (self._next_mbox_expire is None or
- mbox.expiration_date < self._next_mbox_expire):
- self._next_mbox_expire = mbox.expiration_date
-
- for mbox in expired_mboxes:
- del self._async_mboxes[mbox.cid]
- finally:
- self._lock.release()
-
- for mbox in expired_mboxes:
- # note: expire() may deallocate the mbox, so don't touch
- # it further.
- mbox.expire()
-
-
- def _expire_agents(self):
- """
- Check for expired agents and issue notifications when they expire.
- """
- now = datetime.datetime.utcnow()
- if self._next_agent_expire and now < self._next_agent_expire:
- return
- lifetime_delta = datetime.timedelta(seconds = self._agent_timeout)
- next_expire_delta = lifetime_delta
- self._lock.acquire()
- try:
- logging.debug("!!! expiring agents '%s'" % now)
- for agent in self._agent_map.itervalues():
- if agent._announce_timestamp:
- agent_deathtime = agent._announce_timestamp + lifetime_delta
- if agent_deathtime <= now:
- logging.debug("AGENT_DELETED for %s" % agent)
- agent._announce_timestamp = None
- wi = WorkItem(WorkItem.AGENT_DELETED, None,
- {"agent":agent})
- # @todo: remove agent from self._agent_map
- self._work_q.put(wi)
- self._work_q_put = True
- else:
- if (agent_deathtime - now) < next_expire_delta:
- next_expire_delta = agent_deathtime - now
-
- self._next_agent_expire = now + next_expire_delta
- logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
- finally:
- self._lock.release()
-
-
-
- def _create_agent( self, name ):
- """
- Factory to create/retrieve an agent for this console
- """
- logging.debug("creating agent %s" % name)
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- if agent:
- return agent
-
- agent = Agent(name, self)
- try:
- agent._sender = self._session.sender(str(agent._address) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
- except:
- logging.warning("Unable to create sender for %s" % name)
- return None
- logging.debug("created agent sender %s" % agent._sender.target)
-
- self._agent_map[name] = agent
- finally:
- self._lock.release()
-
- # new agent - query for its schema database for
- # seeding the schema cache (@todo)
- # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
- # agent._sendQuery( query )
-
- return agent
-
-
-
- def enable_agent_discovery(self, _query=None):
- """
- Called to enable the asynchronous Agent Discovery process.
- Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
- """
- # @todo: fix - take predicate only, not entire query!
- if _query is not None:
- if (not isinstance(_query, QmfQuery) or
- _query.get_target() != QmfQuery.TARGET_AGENT):
- raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
- self._agent_discovery_filter = _query
- else:
- # create a match-all agent query (no predicate)
- self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT)
-
- def disable_agent_discovery(self):
- """
- Called to disable the async Agent Discovery process enabled by
- calling enableAgentDiscovery()
- """
- self._agent_discovery_filter = None
-
-
-
- def get_workitem_count(self):
- """
- Returns the count of pending WorkItems that can be retrieved.
- """
- return self._work_q.qsize()
-
-
-
- def get_next_workitem(self, timeout=None):
- """
- Returns the next pending work item, or None if none available.
- @todo: subclass and return an Empty event instead.
- """
- try:
- wi = self._work_q.get(True, timeout)
- except Queue.Empty:
- return None
- return wi
-
-
- def release_workitem(self, wi):
- """
- Return a WorkItem to the Console when it is no longer needed.
- @todo: call Queue.task_done() - only 2.5+
-
- @type wi: class qmfConsole.WorkItem
- @param wi: work item object to return.
- """
- pass
-
- def _add_schema(self, schema):
- """
- @todo
- """
- if not isinstance(schema, SchemaClass):
- raise TypeError("SchemaClass type expected")
-
- self._lock.acquire()
- try:
- sid = schema.get_class_id()
- if not self._schema_cache.has_key(sid):
- self._schema_cache[sid] = schema
- if sid in self._pending_schema_req:
- self._pending_schema_req.remove(sid)
- finally:
- self._lock.release()
-
- def _prefetch_schema(self, schema_id, agent):
- """
- Send an async request for the schema identified by schema_id if the
- schema is not available in the cache.
- """
- need_fetch = False
- self._lock.acquire()
- try:
- if ((not self._schema_cache.has_key(schema_id)) and
- schema_id not in self._pending_schema_req):
- self._pending_schema_req.append(schema_id)
- need_fetch = True
- finally:
- self._lock.release()
-
- if need_fetch:
- mbox = _SchemaPrefetchMailbox(self, schema_id)
- query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
- logging.debug("Sending Schema Query to Agent (%s)" % time.time())
- try:
- agent._send_query(query, mbox.get_address())
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- self._lock.acquire()
- try:
- self._pending_schema_req.remove(schema_id)
- finally:
- self._lock.release()
-
-
- def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
- """
- Find the schema identified by schema_id. If not in the cache, ask the
- agent for it.
- """
- if not isinstance(schema_id, SchemaClassId):
- raise TypeError("SchemaClassId type expected")
-
- self._lock.acquire()
- try:
- schema = self._schema_cache.get(schema_id)
- if schema:
- return schema
- finally:
- self._lock.release()
-
- if _agent is None:
- return None
-
- # note: do_query will add the new schema to the cache automatically.
- slist = self.do_query(_agent,
- QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
- _timeout=_timeout)
- if slist:
- return slist[0]
- else:
- return None
-
- def _add_mailbox(self, mbox):
- """
- Add a mailbox to the post office, and assign it a unique address.
- """
- self._lock.acquire()
- try:
- mbox.cid = self._correlation_id
- self._correlation_id += 1
- self._post_office[mbox.cid] = mbox
- finally:
- self._lock.release()
-
- def _get_mailbox(self, mid):
- try:
- mid = long(mid)
- except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
- return None
-
- self._lock.acquire()
- try:
- return self._post_office.get(mid)
- finally:
- self._lock.release()
-
-
- def _remove_mailbox(self, mid):
- """ Remove a mailbox and its address from the post office """
- try:
- mid = long(mid)
- except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
- return None
-
- self._lock.acquire()
- try:
- if mid in self._post_office:
- del self._post_office[mid]
- finally:
- self._lock.release()
-
- def __repr__(self):
- return str(self._address)
-
- # def get_packages(self):
- # plist = []
- # for i in range(self.impl.packageCount()):
- # plist.append(self.impl.getPackageName(i))
- # return plist
-
-
- # def get_classes(self, package, kind=CLASS_OBJECT):
- # clist = []
- # for i in range(self.impl.classCount(package)):
- # key = self.impl.getClass(package, i)
- # class_kind = self.impl.getClassKind(key)
- # if class_kind == kind:
- # if kind == CLASS_OBJECT:
- # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
- # elif kind == CLASS_EVENT:
- # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
- # return clist
-
-
- # def bind_package(self, package):
- # return self.impl.bindPackage(package)
-
-
- # def bind_class(self, kwargs = {}):
- # if "key" in kwargs:
- # self.impl.bindClass(kwargs["key"])
- # elif "package" in kwargs:
- # package = kwargs["package"]
- # if "class" in kwargs:
- # self.impl.bindClass(package, kwargs["class"])
- # else:
- # self.impl.bindClass(package)
- # else:
- # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
-
-
- # def get_agents(self, broker=None):
- # blist = []
- # if broker:
- # blist.append(broker)
- # else:
- # self._cv.acquire()
- # try:
- # # copy while holding lock
- # blist = self._broker_list[:]
- # finally:
- # self._cv.release()
-
- # agents = []
- # for b in blist:
- # for idx in range(b.impl.agentCount()):
- # agents.append(AgentProxy(b.impl.getAgent(idx), b))
-
- # return agents
-
-
- # def get_objects(self, query, kwargs = {}):
- # timeout = 30
- # agent = None
- # temp_args = kwargs.copy()
- # if type(query) == type({}):
- # temp_args.update(query)
-
- # if "_timeout" in temp_args:
- # timeout = temp_args["_timeout"]
- # temp_args.pop("_timeout")
-
- # if "_agent" in temp_args:
- # agent = temp_args["_agent"]
- # temp_args.pop("_agent")
-
- # if type(query) == type({}):
- # query = Query(temp_args)
-
- # self._select = {}
- # for k in temp_args.iterkeys():
- # if type(k) == str:
- # self._select[k] = temp_args[k]
-
- # self._cv.acquire()
- # try:
- # self._sync_count = 1
- # self._sync_result = []
- # broker = self._broker_list[0]
- # broker.send_query(query.impl, None, agent)
- # self._cv.wait(timeout)
- # if self._sync_count == 1:
- # raise Exception("Timed out: waiting for query response")
- # finally:
- # self._cv.release()
-
- # return self._sync_result
-
-
- # def get_object(self, query, kwargs = {}):
- # '''
- # Return one and only one object or None.
- # '''
- # objs = objects(query, kwargs)
- # if len(objs) == 1:
- # return objs[0]
- # else:
- # return None
-
-
- # def first_object(self, query, kwargs = {}):
- # '''
- # Return the first of potentially many objects.
- # '''
- # objs = objects(query, kwargs)
- # if objs:
- # return objs[0]
- # else:
- # return None
-
-
- # # Check the object against select to check for a match
- # def _select_match(self, object):
- # schema_props = object.properties()
- # for key in self._select.iterkeys():
- # for prop in schema_props:
- # if key == p[0].name() and self._select[key] != p[1]:
- # return False
- # return True
-
-
- # def _get_result(self, list, context):
- # '''
- # Called by Broker proxy to return the result of a query.
- # '''
- # self._cv.acquire()
- # try:
- # for item in list:
- # if self._select_match(item):
- # self._sync_result.append(item)
- # self._sync_count -= 1
- # self._cv.notify()
- # finally:
- # self._cv.release()
-
-
- # def start_sync(self, query): pass
-
-
- # def touch_sync(self, sync): pass
-
-
- # def end_sync(self, sync): pass
-
-
-
-
-# def start_console_events(self):
-# self._cb_cond.acquire()
-# try:
-# self._cb_cond.notify()
-# finally:
-# self._cb_cond.release()
-
-
-# def _do_console_events(self):
-# '''
-# Called by the Console thread to poll for events. Passes the events
-# onto the ConsoleHandler associated with this Console. Is called
-# periodically, but can also be kicked by Console.start_console_events().
-# '''
-# count = 0
-# valid = self.impl.getEvent(self._event)
-# while valid:
-# count += 1
-# try:
-# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
-# logging.debug("Console Event AGENT_ADDED received")
-# if self._handler:
-# self._handler.agent_added(AgentProxy(self._event.agent, None))
-# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
-# logging.debug("Console Event AGENT_DELETED received")
-# if self._handler:
-# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
-# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
-# logging.debug("Console Event NEW_PACKAGE received")
-# if self._handler:
-# self._handler.new_package(self._event.name)
-# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
-# logging.debug("Console Event NEW_CLASS received")
-# if self._handler:
-# self._handler.new_class(SchemaClassKey(self._event.classKey))
-# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
-# logging.debug("Console Event OBJECT_UPDATE received")
-# if self._handler:
-# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
-# self._event.hasProps, self._event.hasStats)
-# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
-# logging.debug("Console Event EVENT_RECEIVED received")
-# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
-# logging.debug("Console Event AGENT_HEARTBEAT received")
-# if self._handler:
-# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
-# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
-# logging.debug("Console Event METHOD_RESPONSE received")
-# else:
-# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
-# except e:
-# print "Exception caught in callback thread:", e
-# self.impl.popEvent()
-# valid = self.impl.getEvent(self._event)
-# return count
-
-
-
-
-
-# class Broker(ConnectionHandler):
-# # attr_reader :impl :conn, :console, :broker_bank
-# def __init__(self, console, conn):
-# self.broker_bank = 1
-# self.console = console
-# self.conn = conn
-# self._session = None
-# self._cv = Condition()
-# self._stable = None
-# self._event = qmfengine.BrokerEvent()
-# self._xmtMessage = qmfengine.Message()
-# self.impl = qmfengine.BrokerProxy(self.console.impl)
-# self.console.impl.addConnection(self.impl, self)
-# self.conn.add_conn_handler(self)
-# self._operational = True
-
-
-# def shutdown(self):
-# logging.debug("broker.shutdown() called.")
-# self.console.impl.delConnection(self.impl)
-# self.conn.del_conn_handler(self)
-# if self._session:
-# self.impl.sessionClosed()
-# logging.debug("broker.shutdown() sessionClosed done.")
-# self._session.destroy()
-# logging.debug("broker.shutdown() session destroy done.")
-# self._session = None
-# self._operational = False
-# logging.debug("broker.shutdown() done.")
-
-
-# def wait_for_stable(self, timeout = None):
-# self._cv.acquire()
-# try:
-# if self._stable:
-# return
-# if timeout:
-# self._cv.wait(timeout)
-# if not self._stable:
-# raise Exception("Timed out: waiting for broker connection to become stable")
-# else:
-# while not self._stable:
-# self._cv.wait()
-# finally:
-# self._cv.release()
-
-
-# def send_query(self, query, ctx, agent):
-# agent_impl = None
-# if agent:
-# agent_impl = agent.impl
-# self.impl.sendQuery(query, ctx, agent_impl)
-# self.conn.kick()
-
-
-# def _do_broker_events(self):
-# count = 0
-# valid = self.impl.getEvent(self._event)
-# while valid:
-# count += 1
-# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
-# logging.debug("Broker Event BROKER_INFO received");
-# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
-# logging.debug("Broker Event DECLARE_QUEUE received");
-# self.conn.impl.declareQueue(self._session.handle, self._event.name)
-# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
-# logging.debug("Broker Event DELETE_QUEUE received");
-# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
-# elif self._event.kind == qmfengine.BrokerEvent.BIND:
-# logging.debug("Broker Event BIND received");
-# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
-# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
-# logging.debug("Broker Event UNBIND received");
-# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
-# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
-# logging.debug("Broker Event SETUP_COMPLETE received");
-# self.impl.startProtocol()
-# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
-# logging.debug("Broker Event STABLE received");
-# self._cv.acquire()
-# try:
-# self._stable = True
-# self._cv.notify()
-# finally:
-# self._cv.release()
-# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
-# result = []
-# for idx in range(self._event.queryResponse.getObjectCount()):
-# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
-# self.console._get_result(result, self._event.context)
-# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
-# obj = self._event.context
-# obj._method_result(MethodResponse(self._event.methodResponse()))
-
-# self.impl.popEvent()
-# valid = self.impl.getEvent(self._event)
-
-# return count
-
-
-# def _do_broker_messages(self):
-# count = 0
-# valid = self.impl.getXmtMessage(self._xmtMessage)
-# while valid:
-# count += 1
-# logging.debug("Broker: sending msg on connection")
-# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
-# self.impl.popXmt()
-# valid = self.impl.getXmtMessage(self._xmtMessage)
-
-# return count
-
-
-# def _do_events(self):
-# while True:
-# self.console.start_console_events()
-# bcnt = self._do_broker_events()
-# mcnt = self._do_broker_messages()
-# if bcnt == 0 and mcnt == 0:
-# break;
-
-
-# def conn_event_connected(self):
-# logging.debug("Broker: Connection event CONNECTED")
-# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
-# self.impl.sessionOpened(self._session.handle)
-# self._do_events()
-
-
-# def conn_event_disconnected(self, error):
-# logging.debug("Broker: Connection event DISCONNECTED")
-# pass
-
-
-# def conn_event_visit(self):
-# self._do_events()
-
-
-# def sess_event_session_closed(self, context, error):
-# logging.debug("Broker: Session event CLOSED")
-# self.impl.sessionClosed()
-
-
-# def sess_event_recv(self, context, message):
-# logging.debug("Broker: Session event MSG_RECV")
-# if not self._operational:
-# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
-# self.impl.handleRcvMessage(message)
-# self._do_events()
-
-
-
-################################################################################
-################################################################################
-################################################################################
-################################################################################
-# TEMPORARY TEST CODE - TO BE DELETED
-################################################################################
-################################################################################
-################################################################################
-################################################################################
-
-if __name__ == '__main__':
- # temp test code
- from common import (qmfTypes, SchemaProperty)
-
- logging.getLogger().setLevel(logging.INFO)
-
- logging.info( "************* Creating Async Console **************" )
-
- class MyNotifier(Notifier):
- def __init__(self, context):
- self._myContext = context
- self.WorkAvailable = False
-
- def indication(self):
- print("Indication received! context=%d" % self._myContext)
- self.WorkAvailable = True
-
- _noteMe = MyNotifier( 666 )
-
- _myConsole = Console(notifier=_noteMe)
-
- _myConsole.enable_agent_discovery()
- logging.info("Waiting...")
-
-
- logging.info( "Destroying console:" )
- _myConsole.destroy( 10 )
-
- logging.info( "******** Messing around with Schema ********" )
-
- _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
- stype=SchemaClassId.TYPE_EVENT),
- _desc="A typical event schema",
- _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
- kwargs = {"min":0,
- "max":100,
- "unit":"seconds",
- "desc":"sleep value"}),
- "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
- kwargs={"maxlen":100,
- "desc":"a string argument"})})
- print("_sec=%s" % _sec.get_class_id())
- print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
- print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
- print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
- try:
- print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
- except:
- pass
- print("_sec.getProperties()='%s'" % _sec.get_properties())
-
- print("Adding another argument")
- _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
- kwargs={"dir":"IO",
- "desc":"a boolean argument"})
- _sec.add_property('Argument-3', _arg3)
- print("_sec=%s" % _sec.get_class_id())
- print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
- print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
- print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
- print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
-
- print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
-
- _secmap = _sec.map_encode()
- print("_sec.mapEncode()='%s'" % _secmap )
-
- _sec2 = SchemaEventClass( _map=_secmap )
-
- print("_sec=%s" % _sec.get_class_id())
- print("_sec2=%s" % _sec2.get_class_id())
-
- _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
- "_class_name": "myOtherClass",
- "_type": "_data"},
- "_desc": "A test data object",
- "_values":
- {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RO",
- "index": True,
- "unit": "degrees"},
- "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
- "access": "RW",
- "index": True,
- "desc": "The Second Property(tm)",
- "unit": "radians"},
- "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
- "unit": "seconds",
- "desc": "time until I retire"},
- "meth1": {"_desc": "A test method",
- "_arguments":
- {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an argument 1",
- "dir": "I"},
- "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
- "dir": "IO",
- "desc": "some weird boolean"}}},
- "meth2": {"_desc": "A test method",
- "_arguments":
- {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
- "desc": "an 'nuther argument",
- "dir":
- "I"}}}},
- "_subtypes":
- {"prop1":"qmfProperty",
- "prop2":"qmfProperty",
- "statistics":"qmfProperty",
- "meth1":"qmfMethod",
- "meth2":"qmfMethod"},
- "_primary_key_names": ["prop2", "prop1"]})
-
- print("_soc='%s'" % _soc)
-
- print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
-
- print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
- print("_soc.getProperties='%s'" % _soc.get_properties())
- print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
-
- print("_soc.getMethodCount='%d'" % _soc.get_method_count())
- print("_soc.getMethods='%s'" % _soc.get_methods())
- print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
-
- _socmap = _soc.map_encode()
- print("_socmap='%s'" % _socmap)
- _soc2 = SchemaObjectClass( _map=_socmap )
- print("_soc='%s'" % _soc)
- print("_soc2='%s'" % _soc2)
-
- if _soc2.get_class_id() == _soc.get_class_id():
- print("soc and soc2 are the same schema")
-
-
- logging.info( "******** Messing around with ObjectIds ********" )
-
-
- qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
- print("qd='%s':" % qd)
-
- print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
-
- print("qd map='%s'" % qd.map_encode())
- print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
- qd.set_value("prop4", 4, "A test property called 4")
- print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
- qd.prop4 = 9
- print("qd.prop4 = 9 ='%s'" % qd.prop4)
- qd["prop4"] = 11
- print("qd[prop4] = 11 ='%s'" % qd["prop4"])
-
- print("qd.mapEncode()='%s'" % qd.map_encode())
- _qd2 = QmfData( _map = qd.map_encode() )
- print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
-
- _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
- "prop2": 0}},
- agent="some agent name?",
- _schema = _soc)
-
- print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
-
- _qmfDesc1._set_schema( _soc )
-
- print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
- print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
- print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
-
-
- _qmfDescMap = _qmfDesc1.map_encode()
- print("_qmfDescMap='%s'" % _qmfDescMap)
-
- _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
-
- print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
- print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
- print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())
-
-
- logging.info( "******** Messing around with QmfEvents ********" )
-
-
- _qmfevent1 = QmfEvent( _timestamp = 1111,
- _schema = _sec,
- _values = {"Argument-1": 77,
- "Argument-3": True,
- "Argument-2": "a string"})
- print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
- print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
-
- _qmfevent1Map = _qmfevent1.map_encode()
-
- _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
- print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())
-
-
- logging.info( "******** Messing around with Queries ********" )
-
- _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
- [QmfQuery.AND,
- [QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]],
- [QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"],
- [QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]],
- [QmfQuery.OR,
- [QmfQuery.LE, "temperature", -10],
- [QmfQuery.FALSE],
- [QmfQuery.EXISTS, "namey"]]])
-
- print("_q1.mapEncode() = [%s]" % _q1.map_encode())