diff options
author | Ted Ross <tross@apache.org> | 2009-12-14 15:28:34 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-12-14 15:28:34 +0000 |
commit | b9ed41f57178248064be233ea42887e2e9eed497 (patch) | |
tree | 47ad0afbd2932e6e2fbcb0d45fd9d1a7a6f1261a | |
parent | ac3d1986b5673b3de060ec781d206e57b1edd53a (diff) | |
download | qpid-python-b9ed41f57178248064be233ea42887e2e9eed497.tar.gz |
QPID-2261 - Branch patch from Ken Giusti
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@890369 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qmf/qmfAgent.py | 127 | ||||
-rw-r--r-- | qpid/python/qmf/qmfCommon.py | 27 | ||||
-rw-r--r-- | qpid/python/qmf/qmfConsole.py | 362 |
3 files changed, 427 insertions, 89 deletions
diff --git a/qpid/python/qmf/qmfAgent.py b/qpid/python/qmf/qmfAgent.py index 59731a234b..427d3e549d 100644 --- a/qpid/python/qmf/qmfAgent.py +++ b/qpid/python/qmf/qmfAgent.py @@ -25,7 +25,8 @@ from threading import Thread from qpid.messaging import Connection, Message from uuid import uuid4 from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, - AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged) + AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject, + parseSubject, OpCode) @@ -59,53 +60,52 @@ class Agent(Thread): self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address) self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION) self._running = True - self._start() + self.start() - def _dispatch(self, msg): - if msg.subject != "qmf4": - logging.debug("Ignoring non-qmf message '%s'" % msg.subject) + def _dispatch(self, msg, _direct=False): + """ + @param _direct: True if msg directly addressed to this agent. + """ + try: + version,opcode = parseSubject(msg.subject) + except: + logging.debug("Ignoring unrecognized message '%s'" % msg.subject) return - cmap = {} + cmap = {}; props={} if msg.content_type == "amqp/map": cmap = msg.content + if msg.properties: + props = msg.properties - if (not msg.properties or - not "method" in msg.properties or - not "opcode" in msg.properties): - logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties)) - return - - if msg.properties["method"] == "request": - if msg.properties["opcode"] == "agent-locate": + if opcode == OpCode.agent_locate: + reply = False + if "method" in props and props["method"] == "request": if "query" in cmap: - query = cmap["query"] - if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and - "product" in query and (query["product"] == "*" or query["product"] == self.product) and - "name" in query and (query["name"] == "*" or query["name"] == self.name)): - logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name)) - logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id)) - try: - tmp_snd = self.session.sender( msg.reply_to ) - m = Message( subject="qmf4", - properties={"method":"response", - "opcode":"agent"}, - content={"name": {"vendor":"redhat.com", - "product":"agent", - "name":"tross"}}, - correlation_id=msg.correlation_id) - tmp_snd.send(m) - logging.debug("reply-to [%s] sent" % msg.reply_to) - except e: - logging.error("Failed to send reply to msg '%s'" % str(e)) - + if self._doQuery(cmap["query"]): + reply=True + else: + reply=True + + if reply: + try: + tmp_snd = self._session.sender( msg.reply_to ) + m = Message( subject=makeSubject(OpCode.agent_locate), + properties={"method":"response"}, + content={"name": {"vendor":"redhat.com", + "product":"agent", + "name":"tross"}}, + correlation_id=msg.correlation_id) + tmp_snd.send(m) + logging.debug("reply-to [%s] sent" % msg.reply_to) + except e: + logging.error("Failed to send reply to msg '%s'" % str(e)) else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" - % msg.properties["opcode"]) + logging.debug("Ignoring invalid agent-locate msg") else: - logging.warning("Ignoring message with unrecognized 'method' value: '%s'" - % msg.properties["method"] ) + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % opcode) @@ -114,8 +114,9 @@ class Agent(Thread): while self._running: try: msg = self._locate_receiver.fetch(1) + logging.info("Agent Locate Rcvd: '%s'" % msg) if msg.content_type == "amqp/map": - self._dispatch(msg) + self._dispatch(msg, _direct=False) except KeyboardInterrupt: break except: @@ -123,8 +124,9 @@ class Agent(Thread): try: msg = self._direct_receiver.fetch(1) + logging.info("Agent Msg Rcvd: '%s'" % msg) if msg.content_type == "amqp/map": - self._dispatch(msg) + self._dispatch(msg, _direct=True) except KeyboardInterrupt: break except: @@ -133,13 +135,12 @@ class Agent(Thread): count+= 1 if count == 5: count = 0 - m = Message( subject="qmf4", - properties={"method":"indication", - "opcode":"agent"}, + m = Message( subject=makeSubject(OpCode.agent_ind), + properties={"method":"indication"}, content={"name": {"vendor":"redhat.com", "product":"agent", "name":"tross"}} ) - self.ind_sender.send(m) + self._ind_sender.send(m) logging.info("Agent Indication Sent") @@ -177,7 +178,15 @@ class Agent(Thread): """ logging.error("!!!Agent.releaseWorkItem() TBD!!!") - + def _doQuery(self, query): + # query = cmap["query"] + # if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and + # "product" in query and (query["product"] == "*" or query["product"] == self.product) and + # "name" in query and (query["name"] == "*" or query["name"] == self.name)): + # logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name)) + # logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id)) + logging.error("!!!Agent._doQuery() TBD!!!") + return True ##============================================================================== @@ -209,3 +218,31 @@ class QmfAgentData(QmfManaged): def setProperty( self, _name, _value): super(QmfAgentData, self).setProperty(_name, _value) # @todo: publish change + + + +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + import time + #logging.getLogger().setLevel(logging.INFO) + logging.info( "Starting Connection" ) + _c = Connection("localhost") + _c.connect() + #c.start() + + logging.info( "Starting Agent" ) + _agent = Agent("redhat.com", "agent", "tross") + _agent.setConnection(_c) + + logging.info( "Running Agent" ) + + while True: + try: + time.sleep(10) + except KeyboardInterrupt: + break + diff --git a/qpid/python/qmf/qmfCommon.py b/qpid/python/qmf/qmfCommon.py index 66162a18bb..cfdc87806b 100644 --- a/qpid/python/qmf/qmfCommon.py +++ b/qpid/python/qmf/qmfCommon.py @@ -43,6 +43,33 @@ AMQP_QMF_NAME_SEPARATOR = "/" AMQP_QMF_AGENT_LOCATE = "amq.topic/agent.locate" AMQP_QMF_AGENT_INDICATION = "amq.topic/agent.ind" +AMQP_QMF_SUBJECT = "qmf" +AMQP_QMF_VERSION = 4 +AMQP_QMF_SUBJECT_FMT = "%s%d.%s" + +class OpCode(object): + agent_locate = "agent-locate" + agent_ind = "agent" + noop = "noop" + +def makeSubject(_code): + """ + Create a message subject field value. + """ + return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code) + + +def parseSubject(_sub): + """ + Deconstruct a subject field, return version,opcode values + """ + if _sub[:3] != "qmf": + raise Exception("Non-QMF message received") + + return _sub[3:].split('.', 1) + + + ##============================================================================== ## Agent Identification diff --git a/qpid/python/qmf/qmfConsole.py b/qpid/python/qmf/qmfConsole.py index ec22bbbb50..24f263adab 100644 --- a/qpid/python/qmf/qmfConsole.py +++ b/qpid/python/qmf/qmfConsole.py @@ -30,8 +30,7 @@ from threading import Condition from qpid.messaging import * from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, - AMQP_QMF_AGENT_LOCATE) -from qmfCommon import AgentId + AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode) @@ -492,10 +491,7 @@ class Console(Thread): logging.debug("Making temp sender for [%s]" % self._address) tmp_sender = self._session.sender(self._address) try: - msg = Message(subject="qmf4", - properties={"method":"request", - "opcode":"console-ping"}, - content={"data":"ignore"}) + msg = Message(subject=makeSubject(OpCode.noop)) tmp_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) @@ -572,9 +568,8 @@ class Console(Thread): self._agent_map_lock.release() new_agent = self.create_agent(agent_name) - msg = Message(subject="qmf4", - properties={"method":"request", - "opcode":"agent-locate"}, + msg = Message(subject=makeSubject(OpCode.agent_locate), + properties={"method":"request"}, content={"query": {"vendor" : agent_name.vendor(), "product" : agent_name.product(), "name" : agent_name.name()}}) @@ -650,45 +645,37 @@ class Console(Thread): PRIVATE: Process a message received on the announce receiver """ logging.info( "Announce message received!" ) - if msg.subject != "qmf4": - logging.debug("Ignoring non-qmf message '%s'" % msg.subject) + try: + version,opcode = parseSubject(msg.subject) + except: + logging.debug("Ignoring unrecognized broadcast message '%s'" % msg.subject) return - amap = {} + amap = {}; props = {} if msg.content_type == "amqp/map": amap = msg.content + if msg.properties: + props = msg.properties - if (not msg.properties or - not "method" in msg.properties or - not "opcode" in msg.properties): - logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties)) - return - - if msg.properties["method"] == "indication": + if opcode == OpCode.agent_ind: # agent indication - if msg.properties["opcode"] == "agent": - if "name" in amap: - if self._agent_discovery: - ind = amap["name"] - if "vendor" in ind and "product" in ind and "name" in ind: - - agent = self.create_agent(AgentId( ind["vendor"], - ind["product"], - ind["name"] )) - if not agent._exists: - # new agent - agent._exists = True - logging.info("AGENT_ADDED for %s" % agent) - wi = WorkItem(WorkItem.AGENT_ADDED, - {"agent": agent}) - self._work_q.put(wi) - else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" - % msg.properties["opcode"]) + if "name" in amap: + if self._agent_discovery: + ind = amap["name"] + if "vendor" in ind and "product" in ind and "name" in ind: + + agent = self.create_agent(AgentId( ind["vendor"], + ind["product"], + ind["name"] )) + if not agent._exists: + # new agent + agent._exists = True + logging.info("AGENT_ADDED for %s" % agent) + wi = WorkItem(WorkItem.AGENT_ADDED, + {"agent": agent}) + self._work_q.put(wi) else: - logging.warning("Ignoring message with unrecognized 'method' value: '%s'" - % msg.properties["method"] ) - + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) @@ -711,9 +698,8 @@ class Console(Thread): """ if not self._agent_discovery: self._agent_discovery = True - msg = Message(subject="qmf4", - properties={"method":"request", - "opcode":"agent-locate"}, + msg = Message(subject=makeSubject(OpCode.agent_locate), + properties={"method":"request"}, content={"query": {"vendor": "*", "product": "*", "name": "*"}}) @@ -1127,3 +1113,291 @@ class Console(Thread): +################################################################################ +################################################################################ +################################################################################ +################################################################################ +# TEMPORARY TEST CODE - TO BE DELETED +################################################################################ +################################################################################ +################################################################################ +################################################################################ + +if __name__ == '__main__': + # temp test code + import time + from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, + SchemaObjectClassFactory, ObjectIdFactory, QmfData, QmfDescribed, + QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, + QmfEvent) + logging.getLogger().setLevel(logging.INFO) + + logging.info( "Starting Connection" ) + _c = Connection("localhost") + _c.connect() + #c.start() + + logging.info( "Starting Console" ) + _myConsole = Console() + _myConsole.add_connection( _c ) + + logging.info( "Finding Agent" ) + _myAgent = _myConsole.find_agent( AgentId( "redhat.com", "agent", "tross" ), 5 ) + + logging.info( "Agent Found: %s" % _myAgent ) + + logging.info( "Removing connection" ) + _myConsole.remove_connection( _c, 10 ) + + logging.info( "Destroying console:" ) + _myConsole.destroy( 10 ) + + logging.info( "************* Starting Async Console **************" ) + + class MyNotifier(Notifier): + def __init__(self, context): + self._myContext = context + self.WorkAvailable = False + + def console_indication(self): + print("Indication received! context=%d" % self._myContext) + self.WorkAvailable = True + + _noteMe = MyNotifier( 666 ) + + _myConsole = Console(notifier=_noteMe) + _myConsole.add_connection( _c ) + + _myConsole.enable_agent_discovery() + logging.info("Waiting...") + + + while not _noteMe.WorkAvailable: + try: + print("No work yet...sleeping!") + time.sleep(1) + except KeyboardInterrupt: + break + + + print("Work available = %d items!" % _myConsole.get_workitem_count()) + _wi = _myConsole.get_next_workitem(timeout=0) + while _wi: + print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) + _wi = _myConsole.get_next_workitem(timeout=0) + + + logging.info( "Removing connection" ) + _myConsole.remove_connection( _c, 10 ) + + logging.info( "Destroying console:" ) + _myConsole.destroy( 10 ) + + logging.info( "******** Messing around with Schema ********" ) + + _sec = SchemaEventClassFactory( { "schema_id": # SchemaClassId map + {"package_name": "myPackage", + "class_name": "myClass", + "type": "event"}, + "desc": "A typical event schema", + "properties": {"Argument-1": + {"amqp_type": qmfTypes.TYPE_UINT8, + "min": 0, + "max": 100, + "unit": "seconds", + "desc": "sleep value"}, + "Argument-2": + {"amqp_type": qmfTypes.TYPE_LSTR, + "maxlen": 100, + "desc": "a string argument"}}} ) + print("_sec=%s" % _sec.getClassId()) + print("_sec.gePropertyCount()=%d" % _sec.getPropertyCount() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.getProperty('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.getProperty('Argument-2') ) + try: + print("_sec.getProperty('not-found')=%s" % _sec.getProperty('not-found') ) + except: + pass + print("_sec.getProperties()='%s'" % _sec.getProperties()) + + print("Adding another argument") + _arg3 = SchemaPropertyFactory( { "amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "a boolean argument"} ) + _sec.addProperty('Argument-3', _arg3) + print("_sec=%s" % _sec.getClassId()) + print("_sec.getPropertyCount()=%d" % _sec.getPropertyCount() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.getProperty('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.getProperty('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.getProperty('Argument-3') ) + + print("_arg3.mapEncode()='%s'" % _arg3.mapEncode() ) + + _secmap = _sec.mapEncode() + print("_sec.mapEncode()='%s'" % _secmap ) + + _sec2 = SchemaEventClassFactory( _secmap ) + + print("_sec=%s" % _sec.getClassId()) + print("_sec2=%s" % _sec2.getClassId()) + + + + + _soc = SchemaObjectClassFactory( {"schema_id": {"package_name": "myOtherPackage", + "class_name": "myOtherClass", + "type": "data"}, + "desc": "A test data object", + "properties": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}}, + "methods": + {"meth1": {"desc": "A test method", + "arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"desc": "A test method", + "arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": "I"}}}}, + "primary_key": ["prop2", "prop1"]}) + + print("_soc='%s'" % _soc) + + print("_soc.getPrimaryKeyList='%s'" % _soc.getPrimaryKeyList()) + + print("_soc.getPropertyCount='%d'" % _soc.getPropertyCount()) + print("_soc.getProperties='%s'" % _soc.getProperties()) + print("_soc.getProperty('prop2')='%s'" % _soc.getProperty('prop2')) + + print("_soc.getMethodCount='%d'" % _soc.getMethodCount()) + print("_soc.getMethods='%s'" % _soc.getMethods()) + print("_soc.getMethod('meth2')='%s'" % _soc.getMethod('meth2')) + + _socmap = _soc.mapEncode() + print("_socmap='%s'" % _socmap) + _soc2 = SchemaObjectClassFactory( _socmap ) + print("_soc='%s'" % _soc) + print("_soc2='%s'" % _soc2) + + if _soc2.getClassId() == _soc.getClassId(): + print("soc and soc2 are the same schema") + + + logging.info( "******** Messing around with ObjectIds ********" ) + + oid = ObjectIdFactory( {"agent_id": {"vendor": "redhat.com", + "product": "mgmt-tool", + "name": "myAgent1"}, + "primary_key": "key1:key2" }) + + print("oid = %s" % oid) + + oid2 = ObjectIdFactory( oid.mapEncode() ) + + print("oid2 = %s" % oid2) + + if oid == oid2: + print("oid1 == oid2") + else: + print("oid1 != oid2") + + hashme = {oid: "myoid"} + print("oid hash = %s" % hashme[oid2] ) + + + qd = QmfData( {"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + print("qd='%s':" % qd) + + print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) + + print("qd map='%s'" % qd.mapEncode()) + print("qd getProperty('prop4')='%s'" % qd.getProperty("prop4")) + qd.setProperty("prop4", 4) + print("qd setProperty('prop4', 4)='%s'" % qd.getProperty("prop4")) + qd.prop4 = 9 + print("qd.prop4 = 9 ='%s'" % qd.prop4) + qd["prop4"] = 11 + print("qd[prop4] = 11 ='%s'" % qd["prop4"]) + + print("qd.mapEncode()='%s'" % qd.mapEncode()) + _qd2 = QmfDataFactory( qd.mapEncode() ) + print("_qd2.mapEncode()='%s'" % _qd2.mapEncode()) + + _qmfDesc1 = QmfDescribed( _schemaId = _soc.getClassId(), + _props = {"prop1": 1, "statistics": 666, "prop2": 0}) + + print("_qmfDesc1 map='%s'" % _qmfDesc1.mapEncode()) + + _qmfDesc1.setSchema( _soc ) + + print("_qmfDesc1 props{} = '%s'" % _qmfDesc1.getProperties()) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.getPrimaryKey()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.getSchemaClassId()) + + + _qmfDescMap = _qmfDesc1.mapEncode() + print("_qmfDescMap='%s'" % _qmfDescMap) + + _qmfDesc2 = QmfDescribedFactory( _qmfDescMap, _schema=_soc ) + + print("_qmfDesc2 map='%s'" % _qmfDesc2.mapEncode()) + print("_qmfDesc2 props = '%s'" % _qmfDesc2.getProperties()) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.getPrimaryKey()) + + + _qmfMgd1 = QmfManaged( _agentId=AgentId("redhat.com", "anAgent", "tross"), + _schema = _soc, + _schemaId = _soc.getClassId(), + _props = {"prop1": 11, "prop2": 10, "statistics":999}) + + + print("_qmfMgd1 map='%s'" % _qmfMgd1.mapEncode()) + + print("_qmfMgd1.getObjectId()='%s'" % _qmfMgd1.getObjectId()) + print("_qmfMgd1 props = '%s'" % _qmfMgd1.getProperties()) + + _qmfMgd1Map = _qmfMgd1.mapEncode() + print("_qmfMgd1Map='%s'" % _qmfMgd1Map) + + _qmfMgd2 = QmfManagedFactory( param=_qmfMgd1.mapEncode(), _schema=_soc ) + + print("_qmfMgd2 map='%s'" % _qmfMgd2.mapEncode()) + print("_qmfMgd2 getObjectId() = '%s'" % _qmfMgd2.getObjectId()) + print("_qmfMgd2 props = '%s'" % _qmfMgd2.getProperties()) + + + logging.info( "******** Messing around with QmfEvents ********" ) + + + _qmfevent1 = QmfEvent( _timestamp = 1111, + _agentId = AgentId("redhat.com", "whizzbang2000", "ted"), + _schema = _sec, + _props = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.mapEncode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.getTimestamp()) + print("_qmfevent1.getAgentId()='%s'" % _qmfevent1.getAgentId()) + + _qmfevent1Map = _qmfevent1.mapEncode() + + _qmfevent2 = QmfEvent(_map=_qmfevent1Map) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode()) + + |