From cf47f99d276a50ac32ed9835a9afb818fd90f4ba Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 4 Feb 2011 06:38:31 +0000 Subject: Merged missing functionality from the QMFv1 Ruby and Python interfaces to the QMFv2 interfaces: git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067095 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/bindings/qmf2/python/qmf2.py | 151 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 147 insertions(+), 4 deletions(-) (limited to 'cpp/bindings/qmf2/python') diff --git a/cpp/bindings/qmf2/python/qmf2.py b/cpp/bindings/qmf2/python/qmf2.py index 37efb8708b..9bf464e155 100644 --- a/cpp/bindings/qmf2/python/qmf2.py +++ b/cpp/bindings/qmf2/python/qmf2.py @@ -137,6 +137,87 @@ class AgentHandler(Thread): pass +#=================================================================================================== +# CONSOLE HANDLER +#=================================================================================================== +class ConsoleHandler(Thread): + + def __init__(self, consoleSession): + Thread.__init__(self) + self.__session = consoleSession + self.__running = True + + def cancel(self): + """ + Stop the handler thread. + """ + self.__running = None + + def run(self): + event = cqmf2.ConsoleEvent() + while self.__running: + valid = self.__session._impl.nextEvent(event, cqpid.Duration.SECOND) + if valid and self.__running: + if event.getType() == cqmf2.CONSOLE_AGENT_ADD: + self.agentAdded(Agent(event.getAgent())) + + elif event.getType() == cqmf2.CONSOLE_AGENT_DEL: + reason = 'filter' + if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED: + reason = 'aged' + self.agentDeleted(Agent(event.getAgent(), reason)) + + elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART: + self.agentRestarted(Agent(event.getAgent())) + + elif event.getType() == cqmf2.CONSOLE_AGENT_SCHEMA_UPDATE: + self.agentSchemaUpdated(Agent(event.getAgent())) + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # A new agent, whose attributes match the console's agent filter, has been discovered. + # + def agentAdded(self, agent): + pass + + # + # A known agent has been removed from the agent list. There are two possible reasons + # for agent deletion: + # + # 1) 'aged' - The agent hasn't been heard from for the maximum age interval and is + # presumed dead. + # 2) 'filter' - The agent no longer matches the console's agent-filter and has been + # effectively removed from the agent list. Such occurrences are likely + # to be seen immediately after setting the filter to a new value. + # + def agentDeleted(self, agent, reason): + pass + + # + # An agent-restart was detected. This occurs when the epoch number advertised by the + # agent changes. It indicates that the agent in question was shut-down/crashed and + # restarted. + # + def agentRestarted(self, agent): + pass + + # + # The agent has registered new schema information which can now be queried, if desired. + # + def agentSchemaUpdated(self, agent): + pass + + # + # An agent raised an event. The 'data' argument is a Data object that contains the + # content of the event. + # + def eventRaised(self, agent, data, timestamp, severity): + pass + #=================================================================================================== # CONSOLE SESSION @@ -147,6 +228,16 @@ class ConsoleSession(object): def __init__(self, connection, options=""): """ + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## domain:NAME - QMF Domain to join [default: "default"] + ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from + ## an agent before deleting it [default: 5] + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## """ self._impl = cqmf2.ConsoleSession(connection, options) @@ -195,6 +286,24 @@ class AgentSession(object): def __init__(self, connection, options=""): """ + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## interval:N - Heartbeat interval in seconds [default: 60] + ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False] + ## allow-queries:{True,False} - If True: automatically allow all queries [default] + ## If False: generate an AUTH_QUERY event to allow per-query authorization + ## allow-methods:{True,False} - If True: automatically allow all methods [default] + ## If False: generate an AUTH_METHOD event to allow per-method authorization + ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64] + ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000] + ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] + ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] + ## If False: QMF events are only sent to authorized subscribers + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## """ self._impl = cqmf2.AgentSession(connection, options) @@ -261,8 +370,6 @@ class AgentSession(object): else: self._impl.raiseException(handle, data) - ## TODO: async and external operations - #=================================================================================================== # AGENT PROXY @@ -372,6 +479,29 @@ class Query(object): if arg1.__class__ == DataAddr: self._impl = cqmf2.Query(arg1._impl) + def getAddr(self): + """ + """ + return DataAddr(self._impl.getDataAddr()) + + def getSchemaId(self): + """ + """ + return SchemaId(self._impl.getSchemaId()) + + def getPredicate(self): + """ + """ + return self._impl.getPredicate() + + def matches(self, data): + """ + """ + m = data + if data.__class__ == Data: + m = data.getProperties() + return self._impl.matchesPredicate(m) + #=================================================================================================== # DATA #=================================================================================================== @@ -411,6 +541,17 @@ class Data(object): """ return Agent(self._impl.getAgent()) + def update(self, timeout=5): + dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout) + agent = self._impl.getAgent() + query = cqmf2.Query(self._impl.getAddr()) + result = agent.query(query, dur) + if result.getType() != cqmf2.CONSOLE_QUERY_RESPONSE: + raise "Update query failed" + if result.getDataCount == 0: + raise "Object no longer exists on agent" + self._impl = cqmf2.Data(result.getData(0)) + def getProperties(self): """ """ @@ -519,11 +660,13 @@ class DataAddr(object): """ """ - def __init__(self, arg): + def __init__(self, arg, agentName=""): if arg.__class__ == dict: self._impl = cqmf2.DataAddr(arg) - else: + elif arg.__class__ == cqmf2.DataAddr: self._impl = arg + else: + self._impl = cqmf2.DataAddr(arg, agentName) def __repr__(self): return "%s:%s" % (self.getAgentName(), self.getName()) -- cgit v1.2.1