diff options
Diffstat (limited to 'python/qmf2/agent.py')
| -rw-r--r-- | python/qmf2/agent.py | 231 |
1 files changed, 123 insertions, 108 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py index c6a518ca31..88aee8034f 100644 --- a/python/qmf2/agent.py +++ b/python/qmf2/agent.py @@ -1,4 +1,3 @@ - # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -25,10 +24,9 @@ import Queue from threading import Thread, Lock, currentThread from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION, - makeSubject, parseSubject, OpCode, QmfQuery, - SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod) +from common import (makeSubject, parseSubject, OpCode, QmfQuery, + SchemaObjectClass, MsgKey, QmfData, QmfAddress, + SchemaClass, SchemaClassId, WorkItem, SchemaMethod) # global flag that indicates which thread (if any) is # running the agent notifier callback @@ -85,6 +83,7 @@ class Agent(Thread): self.name = str(name) self._domain = _domain + self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier self._heartbeat_interval = _heartbeat_interval self._max_msg_size = _max_msg_size @@ -93,9 +92,9 @@ class Agent(Thread): self._conn = None self._session = None self._direct_receiver = None - self._locate_receiver = None - self._ind_sender = None - self._event_sender = None + self._topic_receiver = None + self._direct_sender = None + self._topic_sender = None self._lock = Lock() self._packages = {} @@ -127,8 +126,8 @@ class Agent(Thread): self._conn = conn self._session = self._conn.session() - my_addr = QmfAddress.direct(self.name, self._domain) - self._direct_receiver = self._session.receiver(str(my_addr) + + # for messages directly addressed to me + self._direct_receiver = self._session.receiver(str(self._address) + ";{create:always," " node-properties:" " {type:topic," @@ -137,28 +136,33 @@ class Agent(Thread): capacity=self._capacity) logging.debug("my direct addr=%s" % self._direct_receiver.source) - locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) - self._locate_receiver = self._session.receiver(str(locate_addr) + + # for sending directly addressed messages. + self._direct_sender = self._session.sender(str(self._address.get_node()) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + logging.debug("my default direct send addr=%s" % self._direct_sender.target) + + # for receiving "broadcast" messages from consoles + default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#", + self._domain) + self._topic_receiver = self._session.receiver(str(default_addr) + ";{create:always," " node-properties:" " {type:topic}}", capacity=self._capacity) - logging.debug("agent.locate addr=%s" % self._locate_receiver.source) - + logging.debug("console.ind addr=%s" % self._topic_receiver.source) - ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) - self._ind_sender = self._session.sender(str(ind_addr) + + # for sending to topic subscribers + ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND, + self._domain) + self._topic_sender = self._session.sender(str(ind_addr) + ";{create:always," " node-properties:" " {type:topic}}") - logging.debug("agent.ind addr=%s" % self._ind_sender.target) - - my_events = QmfAddress.topic(self.name, self._domain) - self._event_sender = self._session.sender(str(my_events) + - ";{create:always," - " node-properties:" - " {type:topic}}") - logging.debug("my event addr=%s" % self._event_sender.target) + logging.debug("agent.ind addr=%s" % self._topic_sender.target) self._running = True self.start() @@ -168,12 +172,15 @@ class Agent(Thread): self._running = False if self.isAlive(): # kick my thread to wake it up - my_addr = QmfAddress.direct(self.name, self._domain) - logging.debug("Making temp sender for [%s]" % str(my_addr)) - tmp_sender = self._session.sender(str(my_addr)) try: - msg = Message(subject=makeSubject(OpCode.noop)) - tmp_sender.send( msg, sync=True ) + msg = Message(properties={"method":"request", + "qmf.subject":makeSubject(OpCode.noop)}, + subject=self.name, + content={"noop":"noop"}) + + # TRACE + #logging.error("!!! sending wakeup to myself: %s" % msg) + self._direct_sender.send( msg, sync=True ) except SendError, e: logging.error(str(e)) logging.debug("waiting for agent receiver thread to exit") @@ -182,12 +189,12 @@ class Agent(Thread): logging.error( "Agent thread '%s' is hung..." % self.name) self._direct_receiver.close() self._direct_receiver = None - self._locate_receiver.close() - self._locate_receiver = None - self._ind_sender.close() - self._ind_sender = None - self._event_sender.close() - self._event_sender = None + self._direct_sender.close() + self._direct_sender = None + self._topic_receiver.close() + self._topic_receiver = None + self._topic_sender.close() + self._topic_sender = None self._session.close() self._session = None self._conn = None @@ -224,16 +231,21 @@ class Agent(Thread): """ TBD """ - if not self._event_sender: + if not self._topic_sender: raise Exception("No connection available") # @todo: should we validate against the schema? _map = {"_name": self.get_name(), "_event": qmfEvent.map_encode()} - msg = Message(subject=makeSubject(OpCode.event_ind), - properties={"method":"response"}, + msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + + qmfEvent.get_severity() + "." + self.name, + properties={"method":"response", + "qmf.subject":makeSubject(OpCode.event_ind)}, content={MsgKey.event:_map}) - self._event_sender.send(msg) + # TRACE + # logging.error("!!! Agent %s sending Event (%s)" % + # (self.name, str(msg))) + self._topic_sender.send(msg) def add_object(self, data ): """ @@ -279,17 +291,12 @@ class Agent(Thread): raise TypeError("Invalid type for error - must be QmfData") _map[SchemaMethod.KEY_ERROR] = _error.map_encode() - msg = Message(subject=makeSubject(OpCode.response), - properties={"method":"response"}, - content={MsgKey.method:_map}) + msg = Message( properties={"method":"response", + "qmf.subject":makeSubject(OpCode.response)}, + content={MsgKey.method:_map}) msg.correlation_id = handle.correlation_id - try: - tmp_snd = self._session.sender( handle.reply_to ) - tmp_snd.send(msg) - logging.debug("method-response sent to [%s]" % handle.reply_to) - except SendError, e: - logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e))) + self._send_reply(msg, handle.reply_to) def get_workitem_count(self): """ @@ -324,7 +331,12 @@ class Agent(Thread): now = datetime.datetime.utcnow() # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) if now >= next_heartbeat: - self._ind_sender.send(self._makeAgentIndMsg()) + ind = self._makeAgentIndMsg() + ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT + # TRACE + #logging.error("!!! Agent %s sending Heartbeat (%s)" % + # (self.name, str(ind))) + self._topic_sender.send(ind) logging.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) @@ -337,19 +349,23 @@ class Agent(Thread): for i in range(batch_limit): try: - msg = self._locate_receiver.fetch(timeout=0) + msg = self._topic_receiver.fetch(timeout=0) except Empty: break - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=False) + # TRACE + # logging.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._topic_receiver.source, msg)) + self._dispatch(msg, _direct=False) for i in range(batch_limit): try: msg = self._direct_receiver.fetch(timeout=0) except Empty: break - if msg and msg.content_type == "amqp/map": - self._dispatch(msg, _direct=True) + # TRACE + # logging.error("!!! Agent %s: msg on %s [%s]" % + # (self.name, self._direct_receiver.source, msg)) + self._dispatch(msg, _direct=True) if self._work_q_put and self._notifier: # new stuff on work queue, kick the the application... @@ -369,9 +385,37 @@ class Agent(Thread): """ _map = {"_name": self.get_name(), "_schema_timestamp": self._schema_timestamp} - return Message( subject=makeSubject(OpCode.agent_ind), - properties={"method":"response"}, - content={MsgKey.agent_info: _map}) + return Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.agent_ind)}, + content={MsgKey.agent_info: _map}) + + def _send_reply(self, msg, reply_to): + """ + Send a reply message to the given reply_to address + """ + if not isinstance(reply_to, QmfAddress): + try: + reply_to = QmfAddress.from_string(str(reply_to)) + except ValueError: + logging.error("Invalid reply-to address '%s'" % + handle.reply_to) + + msg.subject = reply_to.get_subject() + + try: + if reply_to.is_direct(): + # TRACE + #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._direct_sender.send(msg) + else: + # TRACE + # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" % + # (self.name, str(reply_to), str(msg))) + self._topic_sender.send(msg) + logging.debug("reply msg sent to [%s]" % str(reply_to)) + except SendError, e: + logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) def _dispatch(self, msg, _direct=False): @@ -382,9 +426,9 @@ class Agent(Thread): """ logging.debug( "Message received from Console! [%s]" % msg ) try: - version,opcode = parseSubject(msg.subject) + version,opcode = parseSubject(msg.properties.get("qmf.subject")) except: - logging.debug("Ignoring unrecognized message '%s'" % msg.subject) + logging.warning("Ignoring unrecognized message '%s'" % msg.subject) return cmap = {}; props={} @@ -425,17 +469,12 @@ class Agent(Thread): if query is not None: # fake a QmfData containing my identifier for the query compare tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()}) - reply = QmfQuery(query).evaluate(tmpData) + reply = QmfQuery.from_map(query).evaluate(tmpData) if reply: - try: - tmp_snd = self._session.sender( msg.reply_to ) - m = self._makeAgentIndMsg() - m.correlation_id = msg.correlation_id - tmp_snd.send(m) - logging.debug("agent-ind sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e))) + m = self._makeAgentIndMsg() + m.correlation_id = msg.correlation_id + self._send_reply(m, msg.reply_to) else: logging.debug("agent-locate msg not mine - no reply sent") @@ -481,13 +520,6 @@ class Agent(Thread): in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS) oid = cmap.get(QmfData.KEY_OBJECT_ID) - - print("!!! ci=%s rt=%s mn=%s oid=%s" % - (msg.correlation_id, - msg.reply_to, - mname, - oid)) - handle = _MethodCallHandle(msg.correlation_id, msg.reply_to, mname, @@ -509,18 +541,12 @@ class Agent(Thread): finally: self._lock.release() - try: - tmp_snd = self._session.sender( msg.reply_to ) - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content={MsgKey.package_info: pnames} ) - if msg.correlation_id != None: - m.correlation_id = msg.correlation_id - tmp_snd.send(m) - logging.debug("package_info sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) - + m = Message(properties={"qmf.subject":makeSubject(OpCode.data_ind), + "method":"response"}, + content={MsgKey.package_info: pnames} ) + if msg.correlation_id != None: + m.correlation_id = msg.correlation_id + self._send_reply(m, msg.reply_to) def _querySchema( self, msg, query, _idOnly=False ): """ @@ -551,24 +577,18 @@ class Agent(Thread): finally: self._lock.release() - - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: content = {MsgKey.schema_id: schemas} else: content = {MsgKey.schema:schemas} - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) + m = Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.data_ind)}, + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id - try: - tmp_snd.send(m) - logging.debug("schema_id sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + self._send_reply(m, msg.reply_to) def _queryData( self, msg, query, _idOnly=False ): @@ -600,23 +620,18 @@ class Agent(Thread): finally: self._lock.release() - tmp_snd = self._session.sender( msg.reply_to ) - if _idOnly: content = {MsgKey.object_id:data_objs} else: content = {MsgKey.data_obj:data_objs} - m = Message( subject=makeSubject(OpCode.data_ind), - properties={"method":"response"}, - content=content ) + m = Message(properties={"method":"response", + "qmf.subject":makeSubject(OpCode.data_ind)}, + content=content ) if msg.correlation_id != None: m.correlation_id = msg.correlation_id - try: - tmp_snd.send(m) - logging.debug("data reply sent to [%s]" % msg.reply_to) - except SendError, e: - logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e))) + + self._send_reply(m, msg.reply_to) ##============================================================================== |
