summaryrefslogtreecommitdiff
path: root/qpid/python/qmf2/agent.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-14 14:59:24 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-14 14:59:24 +0000
commitb57cbea2e80030da1ec46b74ed5c09a7329299c9 (patch)
tree01c130d31fc22f2975dcfd7ec40db349f7d98a09 /qpid/python/qmf2/agent.py
parent02336adf2b3ce963fcd6db9ecb1cb6397ed2fc47 (diff)
downloadqpid-python-b57cbea2e80030da1ec46b74ed5c09a7329299c9.tar.gz
moved qpid-* tools out of qpid/python into qpid/tools; moved qmf library into extras/qmf
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@910016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/qmf2/agent.py')
-rw-r--r--qpid/python/qmf2/agent.py961
1 files changed, 0 insertions, 961 deletions
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py
deleted file mode 100644
index a6b3c39ad1..0000000000
--- a/qpid/python/qmf2/agent.py
+++ /dev/null
@@ -1,961 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import sys
-import logging
-import datetime
-import time
-import Queue
-from threading import Thread, Lock, currentThread, Event
-from qpid.messaging import Connection, Message, Empty, SendError
-from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
- timedelta_to_secs)
-
-# global flag that indicates which thread (if any) is
-# running the agent notifier callback
-_callback_thread=None
-
- ##==============================================================================
- ## METHOD CALL
- ##==============================================================================
-
-class _MethodCallHandle(object):
- """
- Private class used to hold context when handing off a method call to the
- application. Given to the app in a WorkItem, provided to the agent when
- method_response() is invoked.
- """
- def __init__(self, correlation_id, reply_to, meth_name, _oid=None,
- _schema_id=None):
- self.correlation_id = correlation_id
- self.reply_to = reply_to
- self.meth_name = meth_name
- self.oid = _oid
- self.schema_id = _schema_id
-
-class MethodCallParams(object):
- """
- """
- def __init__(self, name, _oid=None, _schema_id=None, _in_args=None,
- _user_id=None):
- self._meth_name = name
- self._oid = _oid
- self._schema_id = _schema_id
- self._in_args = _in_args
- self._user_id = _user_id
-
- def get_name(self):
- return self._meth_name
-
- def get_object_id(self):
- return self._oid
-
- def get_schema_id(self):
- return self._schema_id
-
- def get_args(self):
- return self._in_args
-
- def get_user_id(self):
- return self._user_id
-
-
-
- ##==============================================================================
- ## AGENT
- ##==============================================================================
-
-class Agent(Thread):
- def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
- _max_msg_size=0, _capacity=10):
- Thread.__init__(self)
- self._running = False
- self._ready = Event()
-
- self.name = str(name)
- self._domain = _domain
- self._address = QmfAddress.direct(self.name, self._domain)
- self._notifier = _notifier
- self._heartbeat_interval = _heartbeat_interval
- # @todo: currently, max # of objects in a single reply message, would
- # be better if it were max bytesize of per-msg content...
- self._max_msg_size = _max_msg_size
- self._capacity = _capacity
-
- self._conn = None
- self._session = None
- self._direct_receiver = None
- self._topic_receiver = None
- self._direct_sender = None
- self._topic_sender = None
-
- self._lock = Lock()
- self._packages = {}
- self._schema_timestamp = long(0)
- self._schema = {}
- # _described_data holds QmfData objects that are associated with schema
- # it is index by schema_id, object_id
- self._described_data = {}
- # _undescribed_data holds unstructured QmfData objects - these objects
- # have no schema. it is indexed by object_id only.
- self._undescribed_data = {}
- self._work_q = Queue.Queue()
- self._work_q_put = False
-
-
- def destroy(self, timeout=None):
- """
- Must be called before the Agent is deleted.
- Frees up all resources and shuts down all background threads.
-
- @type timeout: float
- @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
- """
- logging.debug("Destroying Agent %s" % self.name)
- if self._conn:
- self.remove_connection(timeout)
- logging.debug("Agent Destroyed")
-
-
- def get_name(self):
- return self.name
-
- def set_connection(self, conn):
- self._conn = conn
- self._session = self._conn.session()
-
- # for messages directly addressed to me
- self._direct_receiver = self._session.receiver(str(self._address) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}",
- capacity=self._capacity)
- logging.debug("my direct addr=%s" % self._direct_receiver.source)
-
- # for sending directly addressed messages.
- self._direct_sender = self._session.sender(str(self._address.get_node()) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
- logging.debug("my default direct send addr=%s" % self._direct_sender.target)
-
- # for receiving "broadcast" messages from consoles
- default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#",
- self._domain)
- self._topic_receiver = self._session.receiver(str(default_addr) +
- ";{create:always,"
- " node-properties:"
- " {type:topic}}",
- capacity=self._capacity)
- logging.debug("console.ind addr=%s" % self._topic_receiver.source)
-
- # for sending to topic subscribers
- ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND,
- self._domain)
- self._topic_sender = self._session.sender(str(ind_addr) +
- ";{create:always,"
- " node-properties:"
- " {type:topic}}")
- logging.debug("agent.ind addr=%s" % self._topic_sender.target)
-
- self._running = True
- self.start()
- self._ready.wait(10)
- if not self._ready.isSet():
- raise Exception("Agent managment thread failed to start.")
-
- def remove_connection(self, timeout=None):
- # tell connection thread to shutdown
- self._running = False
- if self.isAlive():
- # kick my thread to wake it up
- try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
- subject=self.name,
- content={"noop":"noop"})
-
- # TRACE
- #logging.error("!!! sending wakeup to myself: %s" % msg)
- self._direct_sender.send( msg, sync=True )
- except SendError, e:
- logging.error(str(e))
- logging.debug("waiting for agent receiver thread to exit")
- self.join(timeout)
- if self.isAlive():
- logging.error( "Agent thread '%s' is hung..." % self.name)
- self._direct_receiver.close()
- self._direct_receiver = None
- self._direct_sender.close()
- self._direct_sender = None
- self._topic_receiver.close()
- self._topic_receiver = None
- self._topic_sender.close()
- self._topic_sender = None
- self._session.close()
- self._session = None
- self._conn = None
- logging.debug("agent connection removal complete")
-
- def register_object_class(self, schema):
- """
- Register an instance of a SchemaClass with this agent
- """
- # @todo: need to update subscriptions
- # @todo: need to mark schema as "non-const"
- if not isinstance(schema, SchemaClass):
- raise TypeError("SchemaClass instance expected")
-
- classId = schema.get_class_id()
- pname = classId.get_package_name()
- cname = classId.get_class_name()
- hstr = classId.get_hash_string()
- if not hstr:
- raise Exception("Schema hash is not set.")
-
- self._lock.acquire()
- try:
- if pname not in self._packages:
- self._packages[pname] = [cname]
- else:
- if cname not in self._packages[pname]:
- self._packages[pname].append(cname)
- self._schema[classId] = schema
- self._schema_timestamp = long(time.time() * 1000)
- finally:
- self._lock.release()
-
- def register_event_class(self, schema):
- return self.register_object_class(schema)
-
- def raise_event(self, qmfEvent):
- """
- TBD
- """
- if not self._topic_sender:
- raise Exception("No connection available")
-
- # @todo: should we validate against the schema?
- _map = {"_name": self.get_name(),
- "_event": qmfEvent.map_encode()}
- msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
- qmfEvent.get_severity() + "." + self.name,
- properties={"method":"response",
- "qmf.subject":make_subject(OpCode.event_ind)},
- content={MsgKey.event:_map})
- # TRACE
- # logging.error("!!! Agent %s sending Event (%s)" %
- # (self.name, str(msg)))
- self._topic_sender.send(msg)
-
- def add_object(self, data):
- """
- Register an instance of a QmfAgentData object.
- """
- # @todo: need to update subscriptions
- # @todo: need to mark schema as "non-const"
- if not isinstance(data, QmfAgentData):
- raise TypeError("QmfAgentData instance expected")
-
- oid = data.get_object_id()
- if not oid:
- raise TypeError("No identifier assigned to QmfAgentData!")
-
- sid = data.get_schema_class_id()
-
- self._lock.acquire()
- try:
- if sid:
- if sid not in self._described_data:
- self._described_data[sid] = {oid: data}
- else:
- self._described_data[sid][oid] = data
- else:
- self._undescribed_data[oid] = data
- finally:
- self._lock.release()
-
- def get_object(self, oid, schema_id):
- data = None
- self._lock.acquire()
- try:
- if schema_id:
- data = self._described_data.get(schema_id)
- if data:
- data = data.get(oid)
- else:
- data = self._undescribed_data.get(oid)
- finally:
- self._lock.release()
- return data
-
-
- def method_response(self, handle, _out_args=None, _error=None):
- """
- """
- if not isinstance(handle, _MethodCallHandle):
- raise TypeError("Invalid handle passed to method_response!")
-
- _map = {SchemaMethod.KEY_NAME:handle.meth_name}
- if handle.oid is not None:
- _map[QmfData.KEY_OBJECT_ID] = handle.oid
- if handle.schema_id is not None:
- _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode()
- if _out_args is not None:
- _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
- if _error is not None:
- if not isinstance(_error, QmfData):
- raise TypeError("Invalid type for error - must be QmfData")
- _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
-
- msg = Message( properties={"method":"response",
- "qmf.subject":make_subject(OpCode.response)},
- content={MsgKey.method:_map})
- msg.correlation_id = handle.correlation_id
-
- self._send_reply(msg, handle.reply_to)
-
- def get_workitem_count(self):
- """
- Returns the count of pending WorkItems that can be retrieved.
- """
- return self._work_q.qsize()
-
- def get_next_workitem(self, timeout=None):
- """
- Obtains the next pending work item, or None if none available.
- """
- try:
- wi = self._work_q.get(True, timeout)
- except Queue.Empty:
- return None
- return wi
-
- def release_workitem(self, wi):
- """
- Releases a WorkItem instance obtained by getNextWorkItem(). Called when
- the application has finished processing the WorkItem.
- """
- pass
-
-
- def run(self):
- global _callback_thread
- next_heartbeat = datetime.datetime.utcnow()
- batch_limit = 10 # a guess
-
- self._ready.set()
-
- while self._running:
-
- now = datetime.datetime.utcnow()
- # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
- if now >= next_heartbeat:
- ind = self._makeAgentIndMsg()
- ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
- # TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
- # (self.name, str(ind)))
- self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
- next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
- timeout = timedelta_to_secs(next_heartbeat - now)
- # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- try:
- self._session.next_receiver(timeout=timeout)
- except Empty:
- continue
-
- for i in range(batch_limit):
- try:
- msg = self._topic_receiver.fetch(timeout=0)
- except Empty:
- break
- # TRACE
- # logging.error("!!! Agent %s: msg on %s [%s]" %
- # (self.name, self._topic_receiver.source, msg))
- self._dispatch(msg, _direct=False)
-
- for i in range(batch_limit):
- try:
- msg = self._direct_receiver.fetch(timeout=0)
- except Empty:
- break
- # TRACE
- # logging.error("!!! Agent %s: msg on %s [%s]" %
- # (self.name, self._direct_receiver.source, msg))
- self._dispatch(msg, _direct=True)
-
- if self._work_q_put and self._notifier:
- # new stuff on work queue, kick the the application...
- self._work_q_put = False
- _callback_thread = currentThread()
- logging.info("Calling agent notifier.indication")
- self._notifier.indication()
- _callback_thread = None
-
- #
- # Private:
- #
-
- def _makeAgentIndMsg(self):
- """
- Create an agent indication message identifying this agent
- """
- _map = {"_name": self.get_name(),
- "_schema_timestamp": self._schema_timestamp}
- return Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.agent_ind)},
- content={MsgKey.agent_info: _map})
-
- def _send_reply(self, msg, reply_to):
- """
- Send a reply message to the given reply_to address
- """
- if not isinstance(reply_to, QmfAddress):
- try:
- reply_to = QmfAddress.from_string(str(reply_to))
- except ValueError:
- logging.error("Invalid reply-to address '%s'" % reply_to)
-
- msg.subject = reply_to.get_subject()
-
- try:
- if reply_to.is_direct():
- # TRACE
- #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" %
- # (self.name, str(reply_to), str(msg)))
- self._direct_sender.send(msg)
- else:
- # TRACE
- # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" %
- # (self.name, str(reply_to), str(msg)))
- self._topic_sender.send(msg)
- logging.debug("reply msg sent to [%s]" % str(reply_to))
- except SendError, e:
- logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
-
- def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
- """
- Send a response to a query, breaking the result into multiple
- messages based on the agent's _max_msg_size config parameter
- """
-
- total = len(objects)
- if self._max_msg_size:
- max_count = self._max_msg_size
- else:
- max_count = total
-
- start = 0
- end = min(total, max_count)
- while end <= total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey:objects[start:end]})
- self._send_reply(m, reply_to)
- if end == total:
- break;
- start = end
- end = min(total, end + max_count)
-
- # response terminator - last message has empty object array
- if total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey: []} )
- self._send_reply(m, reply_to)
-
- def _dispatch(self, msg, _direct=False):
- """
- Process a message from a console.
-
- @param _direct: True if msg directly addressed to this agent.
- """
- logging.debug( "Message received from Console! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- except:
- logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
- return
-
- cmap = {}; props={}
- if msg.content_type == "amqp/map":
- cmap = msg.content
- if msg.properties:
- props = msg.properties
-
- if opcode == OpCode.agent_locate:
- self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.get_query:
- self._handleQueryMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.method_req:
- self._handleMethodReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.cancel_subscription:
- logging.warning("!!! CANCEL_SUB TBD !!!")
- elif opcode == OpCode.create_subscription:
- logging.warning("!!! CREATE_SUB TBD !!!")
- elif opcode == OpCode.renew_subscription:
- logging.warning("!!! RENEW_SUB TBD !!!")
- elif opcode == OpCode.schema_query:
- logging.warning("!!! SCHEMA_QUERY TBD !!!")
- elif opcode == OpCode.noop:
- logging.debug("No-op msg received.")
- else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
- % opcode)
-
- def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
- """
- Process a received agent-locate message
- """
- logging.debug("_handleAgentLocateMsg")
-
- reply = True
- if "method" in props and props["method"] == "request":
- query = cmap.get(MsgKey.query)
- if query is not None:
- # fake a QmfData containing my identifier for the query compare
- tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
- self.get_name()},
- _object_id="my-name")
- reply = QmfQuery.from_map(query).evaluate(tmpData)
-
- if reply:
- m = self._makeAgentIndMsg()
- m.correlation_id = msg.correlation_id
- self._send_reply(m, msg.reply_to)
- else:
- logging.debug("agent-locate msg not mine - no reply sent")
-
-
- def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
- """
- Handle received query message
- """
- logging.debug("_handleQueryMsg")
-
- if "method" in props and props["method"] == "request":
- qmap = cmap.get(MsgKey.query)
- if qmap:
- query = QmfQuery.from_map(qmap)
- target = query.get_target()
- if target == QmfQuery.TARGET_PACKAGES:
- self._queryPackages( msg, query )
- elif target == QmfQuery.TARGET_SCHEMA_ID:
- self._querySchema( msg, query, _idOnly=True )
- elif target == QmfQuery.TARGET_SCHEMA:
- self._querySchema( msg, query)
- elif target == QmfQuery.TARGET_AGENT:
- logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
- elif target == QmfQuery.TARGET_OBJECT_ID:
- self._queryData(msg, query, _idOnly=True)
- elif target == QmfQuery.TARGET_OBJECT:
- self._queryData(msg, query)
- else:
- logging.warning("Unrecognized query target: '%s'" % str(target))
-
-
-
- def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
- """
- Process received Method Request
- """
- if "method" in props and props["method"] == "request":
- mname = cmap.get(SchemaMethod.KEY_NAME)
- if not mname:
- logging.warning("Invalid method call from '%s': no name"
- % msg.reply_to)
- return
-
- in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
- oid = cmap.get(QmfData.KEY_OBJECT_ID)
- schema_id = cmap.get(QmfData.KEY_SCHEMA_ID)
- if schema_id:
- schema_id = SchemaClassId.from_map(schema_id)
- handle = _MethodCallHandle(msg.correlation_id,
- msg.reply_to,
- mname,
- oid, schema_id)
- param = MethodCallParams( mname, oid, schema_id, in_args,
- msg.user_id)
-
- # @todo: validate the method against the schema:
- # if self._schema:
- # # validate
- # _in_args = _in_args.copy()
- # ms = self._schema.get_method(name)
- # if ms is None:
- # raise ValueError("Method '%s' is undefined." % name)
-
- # for aname,prop in ms.get_arguments().iteritems():
- # if aname not in _in_args:
- # if prop.get_default():
- # _in_args[aname] = prop.get_default()
- # elif not prop.is_optional():
- # raise ValueError("Method '%s' requires argument '%s'"
- # % (name, aname))
- # for aname in _in_args.iterkeys():
- # prop = ms.get_argument(aname)
- # if prop is None:
- # raise ValueError("Method '%s' does not define argument"
- # " '%s'" % (name, aname))
- # if "I" not in prop.get_direction():
- # raise ValueError("Method '%s' argument '%s' is not an"
- # " input." % (name, aname))
-
- # # @todo check if value is correct (type, range, etc)
-
- self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
- self._work_q_put = True
-
- def _queryPackages(self, msg, query):
- """
- Run a query against the list of known packages
- """
- pnames = []
- self._lock.acquire()
- try:
- for name in self._packages.iterkeys():
- qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name},
- _object_id="_package")
- if query.evaluate(qmfData):
- pnames.append(name)
- finally:
- self._lock.release()
-
- self._send_query_response(make_subject(OpCode.data_ind),
- MsgKey.package_info,
- msg.correlation_id,
- msg.reply_to,
- pnames)
-
- def _querySchema( self, msg, query, _idOnly=False ):
- """
- """
- schemas = []
- # if querying for a specific schema, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- found = None
- self._lock.acquire()
- try:
- found = self._schema.get(query.get_id())
- finally:
- self._lock.release()
- if found:
- if _idOnly:
- schemas.append(query.get_id().map_encode())
- else:
- schemas.append(found.map_encode())
- else: # otherwise, evaluate all schema
- self._lock.acquire()
- try:
- for sid,val in self._schema.iteritems():
- if query.evaluate(val):
- if _idOnly:
- schemas.append(sid.map_encode())
- else:
- schemas.append(val.map_encode())
- finally:
- self._lock.release()
-
- if _idOnly:
- msgkey = MsgKey.schema_id
- else:
- msgkey = MsgKey.schema
-
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- schemas)
-
-
- def _queryData( self, msg, query, _idOnly=False ):
- """
- """
- data_objs = []
- # extract optional schema_id from target params
- sid = None
- t_params = query.get_target_param()
- if t_params:
- sid = t_params.get(QmfData.KEY_SCHEMA_ID)
- # if querying for a specific object, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- oid = query.get_id()
- found = None
- self._lock.acquire()
- try:
- if sid and not sid.get_hash_string():
- # wildcard schema_id match, check each schema
- for name,db in self._described_data.iteritems():
- if (name.get_class_name() == sid.get_class_name()
- and name.get_package_name() == sid.get_package_name()):
- found = db.get(oid)
- if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- else:
- if sid:
- db = self._described_data.get(sid)
- if db:
- found = db.get(oid)
- else:
- found = self._undescribed_data.get(oid)
- if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- finally:
- self._lock.release()
- else: # otherwise, evaluate all data
- self._lock.acquire()
- try:
- if sid and not sid.get_hash_string():
- # wildcard schema_id match, check each schema
- for name,db in self._described_data.iteritems():
- if (name.get_class_name() == sid.get_class_name()
- and name.get_package_name() == sid.get_package_name()):
- for oid,data in db.iteritems():
- if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- else:
- if sid:
- db = self._described_data.get(sid)
- else:
- db = self._undescribed_data
-
- if db:
- for oid,data in db.iteritems():
- if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- finally:
- self._lock.release()
-
- if _idOnly:
- msgkey = MsgKey.object_id
- else:
- msgkey = MsgKey.data_obj
-
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- data_objs)
-
-
-
- ##==============================================================================
- ## DATA MODEL
- ##==============================================================================
-
-
-class QmfAgentData(QmfData):
- """
- A managed data object that is owned by an agent.
- """
-
- def __init__(self, agent, _values={}, _subtypes={}, _tag=None,
- _object_id=None, _schema=None):
- schema_id = None
- if _schema:
- schema_id = _schema.get_class_id()
-
- if _object_id is None:
- if not isinstance(_schema, SchemaObjectClass):
- raise Exception("An object_id must be provided if the object"
- "doesn't have an associated schema.")
- ids = _schema.get_id_names()
- if not ids:
- raise Exception("Object must have an Id or a schema that"
- " provides an Id")
- _object_id = u""
- for key in ids:
- value = _values.get(key)
- if value is None:
- raise Exception("Object must have a value for key"
- " attribute '%s'" % str(key))
- try:
- _object_id += unicode(value)
- except:
- raise Exception("Cannot create object_id from key"
- " value '%s'" % str(value))
-
- # timestamp in millisec since epoch UTC
- ctime = long(time.time() * 1000)
- super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
- _tag=_tag, _ctime=ctime,
- _utime=ctime, _object_id=_object_id,
- _schema_id=schema_id, _const=False)
- self._agent = agent
- self._validated = False
-
- def destroy(self):
- self._dtime = long(time.time() * 1000)
- # @todo: publish change
-
- def is_deleted(self):
- return self._dtime == 0
-
- def set_value(self, _name, _value, _subType=None):
- super(QmfAgentData, self).set_value(_name, _value, _subType)
- # @todo: publish change
-
- def inc_value(self, name, delta=1):
- """ add the delta to the property """
- # @todo: need to take write-lock
- val = self.get_value(name)
- try:
- val += delta
- except:
- raise
- self.set_value(name, val)
-
- def dec_value(self, name, delta=1):
- """ subtract the delta from the property """
- # @todo: need to take write-lock
- logging.error(" TBD!!!")
-
- def validate(self):
- """
- Compares this object's data against the associated schema. Throws an
- exception if the data does not conform to the schema.
- """
- props = self._schema.get_properties()
- for name,val in props.iteritems():
- # @todo validate: type compatible with amqp_type?
- # @todo validate: primary keys have values
- if name not in self._values:
- if val._isOptional:
- # ok not to be present, put in dummy value
- # to simplify access
- self._values[name] = None
- else:
- raise Exception("Required property '%s' not present." % name)
- self._validated = True
-
-
-
-################################################################################
-################################################################################
-################################################################################
-################################################################################
-
-if __name__ == '__main__':
- # static test cases - no message passing, just exercise API
- from common import (AgentName, SchemaProperty, qmfTypes, SchemaEventClass)
-
- logging.getLogger().setLevel(logging.INFO)
-
- logging.info( "Create an Agent" )
- _agent_name = AgentName("redhat.com", "agent", "tross")
- _agent = Agent(str(_agent_name))
-
- logging.info( "Get agent name: '%s'" % _agent.get_name())
-
- logging.info( "Create SchemaObjectClass" )
-
- _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
- _desc="A test data schema",
- _object_id_names=["index1", "index2"])
- # add properties
- _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
- _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- # these two properties are statistics
- _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
- # These two properties can be set via the method call
- _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
-
- # add method
- _meth = SchemaMethod(_desc="Method to set string and int in object." )
- _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
- _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
- _schema.add_method( "set_meth", _meth )
-
- # Add schema to Agent
-
- print("Schema Map='%s'" % str(_schema.map_encode()))
-
- _agent.register_object_class(_schema)
-
- # instantiate managed data objects matching the schema
-
- logging.info( "Create QmfAgentData" )
-
- _obj = QmfAgentData( _agent, _schema=_schema )
- _obj.set_value("index1", 100)
- _obj.set_value("index2", "a name" )
- _obj.set_value("set_string", "UNSET")
- _obj.set_value("set_int", 0)
- _obj.set_value("query_count", 0)
- _obj.set_value("method_call_count", 0)
-
- print("Obj1 Map='%s'" % str(_obj.map_encode()))
-
- _agent.add_object( _obj )
-
- _obj = QmfAgentData( _agent,
- _values={"index1":99,
- "index2": "another name",
- "set_string": "UNSET",
- "set_int": 0,
- "query_count": 0,
- "method_call_count": 0},
- _schema=_schema)
-
- print("Obj2 Map='%s'" % str(_obj.map_encode()))
-
- _agent.add_object(_obj)
-
- ##############
-
-
-
- logging.info( "Create SchemaEventClass" )
-
- _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
- stype=SchemaClassId.TYPE_EVENT),
- _desc="A test data schema",
- _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
- _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR))
-
- print("Event Map='%s'" % str(_event.map_encode()))
-
- _agent.register_event_class(_event)