summaryrefslogtreecommitdiff
path: root/cpp/bindings/qmf2/python
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/bindings/qmf2/python')
-rw-r--r--cpp/bindings/qmf2/python/qmf2.py151
1 files changed, 147 insertions, 4 deletions
diff --git a/cpp/bindings/qmf2/python/qmf2.py b/cpp/bindings/qmf2/python/qmf2.py
index 37efb8708b..9bf464e155 100644
--- a/cpp/bindings/qmf2/python/qmf2.py
+++ b/cpp/bindings/qmf2/python/qmf2.py
@@ -137,6 +137,87 @@ class AgentHandler(Thread):
pass
+#===================================================================================================
+# CONSOLE HANDLER
+#===================================================================================================
+class ConsoleHandler(Thread):
+
+ def __init__(self, consoleSession):
+ Thread.__init__(self)
+ self.__session = consoleSession
+ self.__running = True
+
+ def cancel(self):
+ """
+ Stop the handler thread.
+ """
+ self.__running = None
+
+ def run(self):
+ event = cqmf2.ConsoleEvent()
+ while self.__running:
+ valid = self.__session._impl.nextEvent(event, cqpid.Duration.SECOND)
+ if valid and self.__running:
+ if event.getType() == cqmf2.CONSOLE_AGENT_ADD:
+ self.agentAdded(Agent(event.getAgent()))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_DEL:
+ reason = 'filter'
+ if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED:
+ reason = 'aged'
+ self.agentDeleted(Agent(event.getAgent(), reason))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART:
+ self.agentRestarted(Agent(event.getAgent()))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_SCHEMA_UPDATE:
+ self.agentSchemaUpdated(Agent(event.getAgent()))
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # A new agent, whose attributes match the console's agent filter, has been discovered.
+ #
+ def agentAdded(self, agent):
+ pass
+
+ #
+ # A known agent has been removed from the agent list. There are two possible reasons
+ # for agent deletion:
+ #
+ # 1) 'aged' - The agent hasn't been heard from for the maximum age interval and is
+ # presumed dead.
+ # 2) 'filter' - The agent no longer matches the console's agent-filter and has been
+ # effectively removed from the agent list. Such occurrences are likely
+ # to be seen immediately after setting the filter to a new value.
+ #
+ def agentDeleted(self, agent, reason):
+ pass
+
+ #
+ # An agent-restart was detected. This occurs when the epoch number advertised by the
+ # agent changes. It indicates that the agent in question was shut-down/crashed and
+ # restarted.
+ #
+ def agentRestarted(self, agent):
+ pass
+
+ #
+ # The agent has registered new schema information which can now be queried, if desired.
+ #
+ def agentSchemaUpdated(self, agent):
+ pass
+
+ #
+ # An agent raised an event. The 'data' argument is a Data object that contains the
+ # content of the event.
+ #
+ def eventRaised(self, agent, data, timestamp, severity):
+ pass
+
#===================================================================================================
# CONSOLE SESSION
@@ -147,6 +228,16 @@ class ConsoleSession(object):
def __init__(self, connection, options=""):
"""
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## domain:NAME - QMF Domain to join [default: "default"]
+ ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
+ ## an agent before deleting it [default: 5]
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
"""
self._impl = cqmf2.ConsoleSession(connection, options)
@@ -195,6 +286,24 @@ class AgentSession(object):
def __init__(self, connection, options=""):
"""
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## interval:N - Heartbeat interval in seconds [default: 60]
+ ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False]
+ ## allow-queries:{True,False} - If True: automatically allow all queries [default]
+ ## If False: generate an AUTH_QUERY event to allow per-query authorization
+ ## allow-methods:{True,False} - If True: automatically allow all methods [default]
+ ## If False: generate an AUTH_METHOD event to allow per-method authorization
+ ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64]
+ ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000]
+ ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
+ ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
+ ## If False: QMF events are only sent to authorized subscribers
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
"""
self._impl = cqmf2.AgentSession(connection, options)
@@ -261,8 +370,6 @@ class AgentSession(object):
else:
self._impl.raiseException(handle, data)
- ## TODO: async and external operations
-
#===================================================================================================
# AGENT PROXY
@@ -372,6 +479,29 @@ class Query(object):
if arg1.__class__ == DataAddr:
self._impl = cqmf2.Query(arg1._impl)
+ def getAddr(self):
+ """
+ """
+ return DataAddr(self._impl.getDataAddr())
+
+ def getSchemaId(self):
+ """
+ """
+ return SchemaId(self._impl.getSchemaId())
+
+ def getPredicate(self):
+ """
+ """
+ return self._impl.getPredicate()
+
+ def matches(self, data):
+ """
+ """
+ m = data
+ if data.__class__ == Data:
+ m = data.getProperties()
+ return self._impl.matchesPredicate(m)
+
#===================================================================================================
# DATA
#===================================================================================================
@@ -411,6 +541,17 @@ class Data(object):
"""
return Agent(self._impl.getAgent())
+ def update(self, timeout=5):
+ dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ agent = self._impl.getAgent()
+ query = cqmf2.Query(self._impl.getAddr())
+ result = agent.query(query, dur)
+ if result.getType() != cqmf2.CONSOLE_QUERY_RESPONSE:
+ raise "Update query failed"
+ if result.getDataCount == 0:
+ raise "Object no longer exists on agent"
+ self._impl = cqmf2.Data(result.getData(0))
+
def getProperties(self):
"""
"""
@@ -519,11 +660,13 @@ class DataAddr(object):
"""
"""
- def __init__(self, arg):
+ def __init__(self, arg, agentName=""):
if arg.__class__ == dict:
self._impl = cqmf2.DataAddr(arg)
- else:
+ elif arg.__class__ == cqmf2.DataAddr:
self._impl = arg
+ else:
+ self._impl = cqmf2.DataAddr(arg, agentName)
def __repr__(self):
return "%s:%s" % (self.getAgentName(), self.getName())