diff options
author | Ted Ross <tross@apache.org> | 2011-02-04 06:38:31 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-02-04 06:38:31 +0000 |
commit | cf47f99d276a50ac32ed9835a9afb818fd90f4ba (patch) | |
tree | e7a3b44d64f8be42d7abc8e3191d3090d48ed01c /cpp | |
parent | 8cea22dc96999d3f462a41a36b9803327fb28005 (diff) | |
download | qpid-python-cf47f99d276a50ac32ed9835a9afb818fd90f4ba.tar.gz |
Merged missing functionality from the QMFv1 Ruby and Python interfaces to the QMFv2 interfaces:
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067095 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/bindings/qmf2/examples/python/find_agents.py | 54 | ||||
-rw-r--r-- | cpp/bindings/qmf2/examples/ruby/agent_external.rb | 84 | ||||
-rw-r--r-- | cpp/bindings/qmf2/examples/ruby/agent_internal.rb | 77 | ||||
-rw-r--r-- | cpp/bindings/qmf2/examples/ruby/find_agents.rb | 59 | ||||
-rw-r--r-- | cpp/bindings/qmf2/python/qmf2.py | 151 | ||||
-rw-r--r-- | cpp/bindings/qmf2/ruby/qmf2.rb | 344 |
6 files changed, 744 insertions, 25 deletions
diff --git a/cpp/bindings/qmf2/examples/python/find_agents.py b/cpp/bindings/qmf2/examples/python/find_agents.py new file mode 100644 index 0000000000..15cc83649c --- /dev/null +++ b/cpp/bindings/qmf2/examples/python/find_agents.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import cqpid +import qmf2 + +class FindAgents(qmf2.ConsoleHandler): + + def __init__(self, session): + qmf2.ConsoleHandler.__init__(self, session) + + def agentAdded(self, agent): + print "Agent Added: %r" % agent + + def agentDeleted(self, agent, reason): + print "Agent Deleted: %r reason: %s" % (agent, reason) + + def agentRestarted(self, agent): + print "Agent Restarted: %r" % agent + + def agentSchemaUpdated(self, agent): + print "Agent Schema Updated: %r" % agent + + + +url = "localhost" +options = "" + +connection = cqpid.Connection(url, options) +connection.open() + +session = qmf2.ConsoleSession(connection) +session.open() +session.setAgentFilter("[]") + +main = FindAgents(session) +main.run() + diff --git a/cpp/bindings/qmf2/examples/ruby/agent_external.rb b/cpp/bindings/qmf2/examples/ruby/agent_external.rb new file mode 100644 index 0000000000..75171931ed --- /dev/null +++ b/cpp/bindings/qmf2/examples/ruby/agent_external.rb @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'cqpid' +require 'qmf2' + +class MyAgent < Qmf2::AgentHandler + + def initialize(session, data) + super(session) + @data = data + end + + def authorize_query(query, user_id) + puts "Authorizing #{user_id}" + return true + end + + def get_query(context, query, user_id) + puts "Get Query" + context.response(@data) + context.complete + end + + def method_call(context, method_name, data_addr, args, user_id) + puts "Method: #{method_name}" + context._success + end + +end + + +class Program + + def initialize(url) + @url = url + @sess_options = "{allow-queries:False, external:True}" + end + + def setup_schema(agent) + @cls_control = Qmf2::Schema.new(Qmf2::SCHEMA_TYPE_DATA, "org.package", "control") + @cls_control.add_property(Qmf2::SchemaProperty.new("state", Qmf2::SCHEMA_DATA_STRING)) + agent.register_schema(@cls_control) + end + + def run + connection = Cqpid::Connection.new(@url) + connection.open + + session = Qmf2::AgentSession.new(connection, @sess_options) + session.set_vendor("package.org") + session.set_product("external_agent") + setup_schema(session) + session.open + + @control = Qmf2::Data.new(@cls_control) + @control.state = "OPERATIONAL-EXTERNAL" + @control.set_addr(Qmf2::DataAddr.new("singleton")) + + main = MyAgent.new(session, @control) + main.run + end +end + +prog = Program.new("localhost") +prog.run + + diff --git a/cpp/bindings/qmf2/examples/ruby/agent_internal.rb b/cpp/bindings/qmf2/examples/ruby/agent_internal.rb new file mode 100644 index 0000000000..fc49a885f7 --- /dev/null +++ b/cpp/bindings/qmf2/examples/ruby/agent_internal.rb @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'cqpid' +require 'qmf2' + +class MyAgent < Qmf2::AgentHandler + + def initialize(session) + super(session) + end + + def authorize_query(query, user_id) + puts "Authorizing #{user_id}" + return true + end + + def method_call(context, method_name, data_addr, args, user_id) + puts "Method: #{method_name}" + context._success + end + +end + + +class Program + + def initialize(url) + @url = url + @sess_options = "{allow-queries:False}" + end + + def setup_schema(agent) + @cls_control = Qmf2::Schema.new(Qmf2::SCHEMA_TYPE_DATA, "org.package", "control") + @cls_control.add_property(Qmf2::SchemaProperty.new("state", Qmf2::SCHEMA_DATA_STRING)) + agent.register_schema(@cls_control) + end + + def run + connection = Cqpid::Connection.new(@url) + connection.open + + session = Qmf2::AgentSession.new(connection, @sess_options) + session.set_vendor("package.org") + session.set_product("internal_agent") + setup_schema(session) + session.open + + control = Qmf2::Data.new(@cls_control) + control.state = "OPERATIONAL" + session.add_data(control) + + main = MyAgent.new(session) + main.run + end +end + +prog = Program.new("localhost") +prog.run + + diff --git a/cpp/bindings/qmf2/examples/ruby/find_agents.rb b/cpp/bindings/qmf2/examples/ruby/find_agents.rb new file mode 100644 index 0000000000..deefd5491a --- /dev/null +++ b/cpp/bindings/qmf2/examples/ruby/find_agents.rb @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'cqpid' +require 'qmf2' + +class FindAgents < Qmf2::ConsoleHandler + + def initialize(session) + super(session) + end + + def agent_added(agent) + puts "Agent Added: #{agent.to_s}" + end + + def agent_deleted(agent, reason) + puts "Agent Deleted: #{agent.to_s} reason: #{reason}" + end + + def agent_restarted(agent) + puts "Agent Restarted: #{agent.to_s} epoch: #{agent.epoch}" + end + + def agent_schema_updated(agent) + puts "Agent with new Schemata: #{agent.to_s}" + end +end + + +url = "localhost" +options = "" + +connection = Cqpid::Connection.new(url, options) +connection.open + +session = Qmf2::ConsoleSession.new(connection) +session.open +session.set_agent_filter("[]") + +main = FindAgents.new(session) +main.run + diff --git a/cpp/bindings/qmf2/python/qmf2.py b/cpp/bindings/qmf2/python/qmf2.py index 37efb8708b..9bf464e155 100644 --- a/cpp/bindings/qmf2/python/qmf2.py +++ b/cpp/bindings/qmf2/python/qmf2.py @@ -137,6 +137,87 @@ class AgentHandler(Thread): pass +#=================================================================================================== +# CONSOLE HANDLER +#=================================================================================================== +class ConsoleHandler(Thread): + + def __init__(self, consoleSession): + Thread.__init__(self) + self.__session = consoleSession + self.__running = True + + def cancel(self): + """ + Stop the handler thread. + """ + self.__running = None + + def run(self): + event = cqmf2.ConsoleEvent() + while self.__running: + valid = self.__session._impl.nextEvent(event, cqpid.Duration.SECOND) + if valid and self.__running: + if event.getType() == cqmf2.CONSOLE_AGENT_ADD: + self.agentAdded(Agent(event.getAgent())) + + elif event.getType() == cqmf2.CONSOLE_AGENT_DEL: + reason = 'filter' + if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED: + reason = 'aged' + self.agentDeleted(Agent(event.getAgent(), reason)) + + elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART: + self.agentRestarted(Agent(event.getAgent())) + + elif event.getType() == cqmf2.CONSOLE_AGENT_SCHEMA_UPDATE: + self.agentSchemaUpdated(Agent(event.getAgent())) + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # A new agent, whose attributes match the console's agent filter, has been discovered. + # + def agentAdded(self, agent): + pass + + # + # A known agent has been removed from the agent list. There are two possible reasons + # for agent deletion: + # + # 1) 'aged' - The agent hasn't been heard from for the maximum age interval and is + # presumed dead. + # 2) 'filter' - The agent no longer matches the console's agent-filter and has been + # effectively removed from the agent list. Such occurrences are likely + # to be seen immediately after setting the filter to a new value. + # + def agentDeleted(self, agent, reason): + pass + + # + # An agent-restart was detected. This occurs when the epoch number advertised by the + # agent changes. It indicates that the agent in question was shut-down/crashed and + # restarted. + # + def agentRestarted(self, agent): + pass + + # + # The agent has registered new schema information which can now be queried, if desired. + # + def agentSchemaUpdated(self, agent): + pass + + # + # An agent raised an event. The 'data' argument is a Data object that contains the + # content of the event. + # + def eventRaised(self, agent, data, timestamp, severity): + pass + #=================================================================================================== # CONSOLE SESSION @@ -147,6 +228,16 @@ class ConsoleSession(object): def __init__(self, connection, options=""): """ + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## domain:NAME - QMF Domain to join [default: "default"] + ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from + ## an agent before deleting it [default: 5] + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## """ self._impl = cqmf2.ConsoleSession(connection, options) @@ -195,6 +286,24 @@ class AgentSession(object): def __init__(self, connection, options=""): """ + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## interval:N - Heartbeat interval in seconds [default: 60] + ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False] + ## allow-queries:{True,False} - If True: automatically allow all queries [default] + ## If False: generate an AUTH_QUERY event to allow per-query authorization + ## allow-methods:{True,False} - If True: automatically allow all methods [default] + ## If False: generate an AUTH_METHOD event to allow per-method authorization + ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64] + ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000] + ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] + ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] + ## If False: QMF events are only sent to authorized subscribers + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## """ self._impl = cqmf2.AgentSession(connection, options) @@ -261,8 +370,6 @@ class AgentSession(object): else: self._impl.raiseException(handle, data) - ## TODO: async and external operations - #=================================================================================================== # AGENT PROXY @@ -372,6 +479,29 @@ class Query(object): if arg1.__class__ == DataAddr: self._impl = cqmf2.Query(arg1._impl) + def getAddr(self): + """ + """ + return DataAddr(self._impl.getDataAddr()) + + def getSchemaId(self): + """ + """ + return SchemaId(self._impl.getSchemaId()) + + def getPredicate(self): + """ + """ + return self._impl.getPredicate() + + def matches(self, data): + """ + """ + m = data + if data.__class__ == Data: + m = data.getProperties() + return self._impl.matchesPredicate(m) + #=================================================================================================== # DATA #=================================================================================================== @@ -411,6 +541,17 @@ class Data(object): """ return Agent(self._impl.getAgent()) + def update(self, timeout=5): + dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout) + agent = self._impl.getAgent() + query = cqmf2.Query(self._impl.getAddr()) + result = agent.query(query, dur) + if result.getType() != cqmf2.CONSOLE_QUERY_RESPONSE: + raise "Update query failed" + if result.getDataCount == 0: + raise "Object no longer exists on agent" + self._impl = cqmf2.Data(result.getData(0)) + def getProperties(self): """ """ @@ -519,11 +660,13 @@ class DataAddr(object): """ """ - def __init__(self, arg): + def __init__(self, arg, agentName=""): if arg.__class__ == dict: self._impl = cqmf2.DataAddr(arg) - else: + elif arg.__class__ == cqmf2.DataAddr: self._impl = arg + else: + self._impl = cqmf2.DataAddr(arg, agentName) def __repr__(self): return "%s:%s" % (self.getAgentName(), self.getName()) diff --git a/cpp/bindings/qmf2/ruby/qmf2.rb b/cpp/bindings/qmf2/ruby/qmf2.rb index 86ba72cc0b..642af35eca 100644 --- a/cpp/bindings/qmf2/ruby/qmf2.rb +++ b/cpp/bindings/qmf2/ruby/qmf2.rb @@ -78,12 +78,270 @@ module Qmf2 end ##============================================================================== - ## AGENT HANDLER TODO + ## AGENT HANDLER ##============================================================================== class AgentHandler - def get_query(context, query, userId); end - def method_call(context, name, object_id, args, userId); end + + def initialize(session) + @_session = session + @_running = false + @_thread = nil + end + + ## + ## Call the "start" method to run the handler on a new thread. + ## + def start + @_thread = Thread.new do + run + end + end + + ## + ## Request that the running thread complete and exit. + ## + def cancel + @_running = false + @_thread.join if @_thread + @_thread = nil + end + + ## + ## Call the "run" method only if you want the handler to run on your own thread. + ## + def run + @_running = true + event = Cqmf2::AgentEvent.new + while @_running do + valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND) + if valid and @_running + case event.getType + when Cqmf2::AGENT_AUTH_QUERY + yes = authorize_query(Query.new(event.getQuery()), event.getUserId()) + if yes == true + @_session.impl.authAccept(event) + else + @_session.impl.authReject(event) + end + + when Cqmf2::AGENT_QUERY + context = QueryContext.new(@_session, event) + get_query(context, Query.new(event.getQuery()), event.getUserId()) + + when Cqmf2::AGENT_METHOD + context = MethodContext.new(@_session, event) + begin + method_call(context, event.getMethodName(), event.getDataAddr(), event.getArguments(), event.getUserId()) + rescue Exception => ex + @_session.impl.raiseException(event, "#{ex}") + end + + end + end + end + end + + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # This method will only be invoked if the "allow-queries" option is enabled on the + # agent session. When invoked, it provides the query and the authenticated user-id + # of the querying client. + # + # This method must return true if the query is permitted, false otherwise. + # + def authorize_query(query, user_id); end + + # + # This method will only be invoked if the "external" option is "True" on the agent + # session. When invoked, the method should begin the process of responding to a data + # query. The authenticated user-id of the requestor is provided for informational + # purposes. The 'context' variable is used to provide the results back to the requestor. + # + # For each matching Data object, call context.response(data). When the query is complete, + # call context.complete(). After completing the query, you should not use 'context' any + # longer. + # + # Note: It is not necessary to process the query synchronously. If desired, this method + # may store the context for asynchronous processing or pass it to another thread for + # processing. There is no restriction on the number of contexts that may be in-flight + # concurrently. + # + def get_query(context, query, user_id); end + + # + # This method is invoked when a console calls a QMF method on the agent. Supplied are + # a context for the response, the method name, the data address of the data object being + # called, the input arguments (a dictionary), and the caller's authenticated user-id. + # + # A method call can end one of two ways: Successful completion, in which the output + # arguments (if any) are supplied; and Exceptional completion if there is an error. + # + # Successful Completion: + # For each output argument, assign the value directly to context (context.arg1 = "value") + # Once arguments are assigned, call context._success(). + # + # Exceptional Completion: + # Method 1: Call context._exception(data) where 'data' is a string or a Data object. + # Method 2: Raise an exception (raise "Error Text") synchronously in the method body. + # + # Note: Like get_query, method_call may process methods synchronously or asynchronously. + # This method may store the context for later asynchronous processing. There is no + # restriction on the number of contexts that may be in-flight concurrently. + # + # However, "Method 2" for Exceptional Completion can only be done synchronously. + # + def method_call(context, method_name, data_addr, args, user_id); end + end + + class QueryContext + def initialize(agent, context) + @agent = agent + @context = context + end + + def response(data) + @agent.impl.response(@context, data.impl) + end + + def complete + @agent.impl.complete(@context) + end + end + + class MethodContext + def initialize(agent, context) + @agent = agent + @context = context + end + + def _success + @agent.impl.methodSuccess(@context) + end + + def _exception(ex) + if ex.class == Data + @agent.impl.raiseException(@context, ex.impl) + else + @agent.impl.raiseException(@context, ex) + end + end + + def method_missing(name_in, *args) + name = name_in.to_s + if name[name.length - 1] == 61 + name = name[0..name.length - 2] + @context.impl.addReturnArgument(name, args[0]) + else + super.method_missing(name_in, args) + end + end + end + + ##============================================================================== + ## CONSOLE HANDLER + ##============================================================================== + + class ConsoleHandler + + def initialize(session) + @_session = session + @_running = false + @_thread = nil + end + + ## + ## Call the "start" method to run the handler on a new thread. + ## + def start + @_thread = Thread.new do + run + end + end + + ## + ## Request that the running thread complete and exit. + ## + def cancel + @_running = false + @_thread.join if @_thread + @_thread = nil + end + + ## + ## Call the "run" method only if you want the handler to run on your own thread. + ## + def run + @_running = true + event = Cqmf2::ConsoleEvent.new + while @_running do + valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND) + if valid and @_running + case event.getType + when Cqmf2::CONSOLE_AGENT_ADD + agent_added(Agent.new(event.getAgent)) + + when Cqmf2::CONSOLE_AGENT_DEL + reason = :filter + reason = :aged if event.getAgentDelReason == Cqmf2::AGENT_DEL_AGED + agent_deleted(Agent.new(event.getAgent), reason) + + when Cqmf2::CONSOLE_AGENT_RESTART + agent_restarted(Agent.new(event.getAgent)) + + when Cqmf2::CONSOLE_AGENT_SCHEMA_UPDATE + agent_schema_updated(Agent.new(event.getAgent)) + + end + end + end + end + + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # A new agent, whose attributes match the console's agent filter, has been discovered. + # + def agent_added(agent); end + + # + # A known agent has been removed from the agent list. There are two possible reasons + # for agent deletion: + # + # 1) :aged - The agent hasn't been heard from for the maximum age interval and is + # presumed dead. + # 2) :filter - The agent no longer matches the console's agent-filter and has been + # effectively removed from the agent list. Such occurrences are likely + # to be seen immediately after setting the filter to a new value. + # + def agent_deleted(agent, reason); end + + # + # An agent-restart was detected. This occurs when the epoch number advertised by the + # agent changes. It indicates that the agent in question was shut-down/crashed and + # restarted. + # + def agent_restarted(agent); end + + # + # The agent has registered new schema information which can now be queried, if desired. + # + def agent_schema_updated(agent); end + + # + # An agent raised an event. The 'data' argument is a Data object that contains the + # content of the event. + # + def event_raised(agent, data, timestamp, severity); end end ##============================================================================== @@ -93,6 +351,16 @@ module Qmf2 class ConsoleSession attr_reader :impl + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## domain:NAME - QMF Domain to join [default: "default"] + ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from + ## an agent before deleting it [default: 5] + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## def initialize(connection, options="") @impl = Cqmf2::ConsoleSession.new(connection, options) end @@ -124,6 +392,24 @@ module Qmf2 class AgentSession attr_reader :impl + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## interval:N - Heartbeat interval in seconds [default: 60] + ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False] + ## allow-queries:{True,False} - If True: automatically allow all queries [default] + ## If False: generate an AUTH_QUERY event to allow per-query authorization + ## allow-methods:{True,False} - If True: automatically allow all methods [default] + ## If False: generate an AUTH_METHOD event to allow per-method authorization + ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64] + ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000] + ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] + ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] + ## If False: QMF events are only sent to authorized subscribers + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## def initialize(connection, options="") @impl = Cqmf2::AgentSession.new(connection, options) end @@ -137,25 +423,13 @@ module Qmf2 def close() @impl.close end def register_schema(cls) @impl.registerSchema(cls.impl) end - def add_data(data, name="", persistent=:false) + def add_data(data, name="", persistent=false) DataAddr.new(@impl.addData(data.impl, name, persistent)) end def del_data(addr) @impl.del_data(addr.impl) end - - def method_success(handle) - @impl.methodSuccess(handle) - end - - def raise_exception(handle, data) - if data.class == Data - @impl.raiseException(handle, data.impl) - else - @impl.raiseException(handle, data) - end - end end ##============================================================================== @@ -228,7 +502,7 @@ module Qmf2 end ##============================================================================== - ## QUERY TODO + ## QUERY ##============================================================================== class Query @@ -238,6 +512,16 @@ module Qmf2 @impl = Cqmf2::Query.new(arg1.impl) end end + + def addr() DataAddr.new(@impl.getDataAddr()) end + def schema_id() SchemaId.new(@impl.getSchemaId()) end + def predicate() @impl.getPredicate() end + + def matches?(data) + map = data + map = data.properties if data.class == Data + @impl.matchesPredicate(map) + end end ##============================================================================== @@ -248,16 +532,17 @@ module Qmf2 attr_reader :impl def initialize(arg=nil) + @schema = nil if arg == nil @impl = Cqmf2::Data.new elsif arg.class == Cqmf2::Data @impl = arg elsif arg.class == Schema - @impl = Cqmf2::Data(arg.impl) + @impl = Cqmf2::Data.new(arg.impl) + @schema = arg else raise "Unsupported initializer for Data" end - @schema = nil end def to_s @@ -271,6 +556,10 @@ module Qmf2 return nil end + def set_addr(addr) + @impl.setAddr(addr.impl) + end + def addr if @impl.hasAddr return DataAddr.new(@impl.getAddr) @@ -282,6 +571,17 @@ module Qmf2 return Agent.new(@impl.getAgent) end + def update(timeout=5) + dur = Cqpid::Duration.new(Cqpid::Duration.SECOND.getMilliseconds * timeout) + agent = @impl.getAgent + query = Cqmf2::Query.new(@impl.getAddr) + result = agent.query(query, dur) + raise "Update query failed" if result.getType != Cqmf2::CONSOLE_QUERY_RESPONSE + raise "Object no longer exists on agent" if result.getDataCount == 0 + @impl = Cqmf2::Data.new(result.getData(0)) + return nil + end + def properties return @impl.getProperties end @@ -392,11 +692,13 @@ module Qmf2 class DataAddr attr_reader :impl - def initialize(arg) + def initialize(arg, agentName="") if arg.class == Hash @impl = Cqmf2::DataAddr.new(arg) - else + elsif arg.class == Cqmf2::DataAddr @impl = arg + else + @impl = Cqmf2::DataAddr.new(arg, agentName) end end |