summaryrefslogtreecommitdiff
path: root/python/qmf2/agent.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qmf2/agent.py')
-rw-r--r--python/qmf2/agent.py231
1 files changed, 123 insertions, 108 deletions
diff --git a/python/qmf2/agent.py b/python/qmf2/agent.py
index c6a518ca31..88aee8034f 100644
--- a/python/qmf2/agent.py
+++ b/python/qmf2/agent.py
@@ -1,4 +1,3 @@
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -25,10 +24,9 @@ import Queue
from threading import Thread, Lock, currentThread
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
- makeSubject, parseSubject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+from common import (makeSubject, parseSubject, OpCode, QmfQuery,
+ SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
@@ -85,6 +83,7 @@ class Agent(Thread):
self.name = str(name)
self._domain = _domain
+ self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
self._heartbeat_interval = _heartbeat_interval
self._max_msg_size = _max_msg_size
@@ -93,9 +92,9 @@ class Agent(Thread):
self._conn = None
self._session = None
self._direct_receiver = None
- self._locate_receiver = None
- self._ind_sender = None
- self._event_sender = None
+ self._topic_receiver = None
+ self._direct_sender = None
+ self._topic_sender = None
self._lock = Lock()
self._packages = {}
@@ -127,8 +126,8 @@ class Agent(Thread):
self._conn = conn
self._session = self._conn.session()
- my_addr = QmfAddress.direct(self.name, self._domain)
- self._direct_receiver = self._session.receiver(str(my_addr) +
+ # for messages directly addressed to me
+ self._direct_receiver = self._session.receiver(str(self._address) +
";{create:always,"
" node-properties:"
" {type:topic,"
@@ -137,28 +136,33 @@ class Agent(Thread):
capacity=self._capacity)
logging.debug("my direct addr=%s" % self._direct_receiver.source)
- locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- self._locate_receiver = self._session.receiver(str(locate_addr) +
+ # for sending directly addressed messages.
+ self._direct_sender = self._session.sender(str(self._address.get_node()) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ logging.debug("my default direct send addr=%s" % self._direct_sender.target)
+
+ # for receiving "broadcast" messages from consoles
+ default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#",
+ self._domain)
+ self._topic_receiver = self._session.receiver(str(default_addr) +
";{create:always,"
" node-properties:"
" {type:topic}}",
capacity=self._capacity)
- logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
-
+ logging.debug("console.ind addr=%s" % self._topic_receiver.source)
- ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- self._ind_sender = self._session.sender(str(ind_addr) +
+ # for sending to topic subscribers
+ ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND,
+ self._domain)
+ self._topic_sender = self._session.sender(str(ind_addr) +
";{create:always,"
" node-properties:"
" {type:topic}}")
- logging.debug("agent.ind addr=%s" % self._ind_sender.target)
-
- my_events = QmfAddress.topic(self.name, self._domain)
- self._event_sender = self._session.sender(str(my_events) +
- ";{create:always,"
- " node-properties:"
- " {type:topic}}")
- logging.debug("my event addr=%s" % self._event_sender.target)
+ logging.debug("agent.ind addr=%s" % self._topic_sender.target)
self._running = True
self.start()
@@ -168,12 +172,15 @@ class Agent(Thread):
self._running = False
if self.isAlive():
# kick my thread to wake it up
- my_addr = QmfAddress.direct(self.name, self._domain)
- logging.debug("Making temp sender for [%s]" % str(my_addr))
- tmp_sender = self._session.sender(str(my_addr))
try:
- msg = Message(subject=makeSubject(OpCode.noop))
- tmp_sender.send( msg, sync=True )
+ msg = Message(properties={"method":"request",
+ "qmf.subject":makeSubject(OpCode.noop)},
+ subject=self.name,
+ content={"noop":"noop"})
+
+ # TRACE
+ #logging.error("!!! sending wakeup to myself: %s" % msg)
+ self._direct_sender.send( msg, sync=True )
except SendError, e:
logging.error(str(e))
logging.debug("waiting for agent receiver thread to exit")
@@ -182,12 +189,12 @@ class Agent(Thread):
logging.error( "Agent thread '%s' is hung..." % self.name)
self._direct_receiver.close()
self._direct_receiver = None
- self._locate_receiver.close()
- self._locate_receiver = None
- self._ind_sender.close()
- self._ind_sender = None
- self._event_sender.close()
- self._event_sender = None
+ self._direct_sender.close()
+ self._direct_sender = None
+ self._topic_receiver.close()
+ self._topic_receiver = None
+ self._topic_sender.close()
+ self._topic_sender = None
self._session.close()
self._session = None
self._conn = None
@@ -224,16 +231,21 @@ class Agent(Thread):
"""
TBD
"""
- if not self._event_sender:
+ if not self._topic_sender:
raise Exception("No connection available")
# @todo: should we validate against the schema?
_map = {"_name": self.get_name(),
"_event": qmfEvent.map_encode()}
- msg = Message(subject=makeSubject(OpCode.event_ind),
- properties={"method":"response"},
+ msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ qmfEvent.get_severity() + "." + self.name,
+ properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.event_ind)},
content={MsgKey.event:_map})
- self._event_sender.send(msg)
+ # TRACE
+ # logging.error("!!! Agent %s sending Event (%s)" %
+ # (self.name, str(msg)))
+ self._topic_sender.send(msg)
def add_object(self, data ):
"""
@@ -279,17 +291,12 @@ class Agent(Thread):
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message(subject=makeSubject(OpCode.response),
- properties={"method":"response"},
- content={MsgKey.method:_map})
+ msg = Message( properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.response)},
+ content={MsgKey.method:_map})
msg.correlation_id = handle.correlation_id
- try:
- tmp_snd = self._session.sender( handle.reply_to )
- tmp_snd.send(msg)
- logging.debug("method-response sent to [%s]" % handle.reply_to)
- except SendError, e:
- logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
+ self._send_reply(msg, handle.reply_to)
def get_workitem_count(self):
"""
@@ -324,7 +331,12 @@ class Agent(Thread):
now = datetime.datetime.utcnow()
# print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
if now >= next_heartbeat:
- self._ind_sender.send(self._makeAgentIndMsg())
+ ind = self._makeAgentIndMsg()
+ ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
@@ -337,19 +349,23 @@ class Agent(Thread):
for i in range(batch_limit):
try:
- msg = self._locate_receiver.fetch(timeout=0)
+ msg = self._topic_receiver.fetch(timeout=0)
except Empty:
break
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=False)
+ # TRACE
+ # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._topic_receiver.source, msg))
+ self._dispatch(msg, _direct=False)
for i in range(batch_limit):
try:
msg = self._direct_receiver.fetch(timeout=0)
except Empty:
break
- if msg and msg.content_type == "amqp/map":
- self._dispatch(msg, _direct=True)
+ # TRACE
+ # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # (self.name, self._direct_receiver.source, msg))
+ self._dispatch(msg, _direct=True)
if self._work_q_put and self._notifier:
# new stuff on work queue, kick the the application...
@@ -369,9 +385,37 @@ class Agent(Thread):
"""
_map = {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message( subject=makeSubject(OpCode.agent_ind),
- properties={"method":"response"},
- content={MsgKey.agent_info: _map})
+ return Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.agent_ind)},
+ content={MsgKey.agent_info: _map})
+
+ def _send_reply(self, msg, reply_to):
+ """
+ Send a reply message to the given reply_to address
+ """
+ if not isinstance(reply_to, QmfAddress):
+ try:
+ reply_to = QmfAddress.from_string(str(reply_to))
+ except ValueError:
+ logging.error("Invalid reply-to address '%s'" %
+ handle.reply_to)
+
+ msg.subject = reply_to.get_subject()
+
+ try:
+ if reply_to.is_direct():
+ # TRACE
+ #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._direct_sender.send(msg)
+ else:
+ # TRACE
+ # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" %
+ # (self.name, str(reply_to), str(msg)))
+ self._topic_sender.send(msg)
+ logging.debug("reply msg sent to [%s]" % str(reply_to))
+ except SendError, e:
+ logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
def _dispatch(self, msg, _direct=False):
@@ -382,9 +426,9 @@ class Agent(Thread):
"""
logging.debug( "Message received from Console! [%s]" % msg )
try:
- version,opcode = parseSubject(msg.subject)
+ version,opcode = parseSubject(msg.properties.get("qmf.subject"))
except:
- logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+ logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
return
cmap = {}; props={}
@@ -425,17 +469,12 @@ class Agent(Thread):
if query is not None:
# fake a QmfData containing my identifier for the query compare
tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
- reply = QmfQuery(query).evaluate(tmpData)
+ reply = QmfQuery.from_map(query).evaluate(tmpData)
if reply:
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- m = self._makeAgentIndMsg()
- m.correlation_id = msg.correlation_id
- tmp_snd.send(m)
- logging.debug("agent-ind sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e)))
+ m = self._makeAgentIndMsg()
+ m.correlation_id = msg.correlation_id
+ self._send_reply(m, msg.reply_to)
else:
logging.debug("agent-locate msg not mine - no reply sent")
@@ -481,13 +520,6 @@ class Agent(Thread):
in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
oid = cmap.get(QmfData.KEY_OBJECT_ID)
-
- print("!!! ci=%s rt=%s mn=%s oid=%s" %
- (msg.correlation_id,
- msg.reply_to,
- mname,
- oid))
-
handle = _MethodCallHandle(msg.correlation_id,
msg.reply_to,
mname,
@@ -509,18 +541,12 @@ class Agent(Thread):
finally:
self._lock.release()
- try:
- tmp_snd = self._session.sender( msg.reply_to )
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content={MsgKey.package_info: pnames} )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
- tmp_snd.send(m)
- logging.debug("package_info sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
-
+ m = Message(properties={"qmf.subject":makeSubject(OpCode.data_ind),
+ "method":"response"},
+ content={MsgKey.package_info: pnames} )
+ if msg.correlation_id != None:
+ m.correlation_id = msg.correlation_id
+ self._send_reply(m, msg.reply_to)
def _querySchema( self, msg, query, _idOnly=False ):
"""
@@ -551,24 +577,18 @@ class Agent(Thread):
finally:
self._lock.release()
-
- tmp_snd = self._session.sender( msg.reply_to )
-
if _idOnly:
content = {MsgKey.schema_id: schemas}
else:
content = {MsgKey.schema:schemas}
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
+ m = Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.data_ind)},
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
- try:
- tmp_snd.send(m)
- logging.debug("schema_id sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+ self._send_reply(m, msg.reply_to)
def _queryData( self, msg, query, _idOnly=False ):
@@ -600,23 +620,18 @@ class Agent(Thread):
finally:
self._lock.release()
- tmp_snd = self._session.sender( msg.reply_to )
-
if _idOnly:
content = {MsgKey.object_id:data_objs}
else:
content = {MsgKey.data_obj:data_objs}
- m = Message( subject=makeSubject(OpCode.data_ind),
- properties={"method":"response"},
- content=content )
+ m = Message(properties={"method":"response",
+ "qmf.subject":makeSubject(OpCode.data_ind)},
+ content=content )
if msg.correlation_id != None:
m.correlation_id = msg.correlation_id
- try:
- tmp_snd.send(m)
- logging.debug("data reply sent to [%s]" % msg.reply_to)
- except SendError, e:
- logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+ self._send_reply(m, msg.reply_to)
##==============================================================================