diff options
| author | Ted Ross <tross@apache.org> | 2011-02-04 06:38:31 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2011-02-04 06:38:31 +0000 |
| commit | cf47f99d276a50ac32ed9835a9afb818fd90f4ba (patch) | |
| tree | e7a3b44d64f8be42d7abc8e3191d3090d48ed01c /cpp/bindings/qmf2/ruby | |
| parent | 8cea22dc96999d3f462a41a36b9803327fb28005 (diff) | |
| download | qpid-python-cf47f99d276a50ac32ed9835a9afb818fd90f4ba.tar.gz | |
Merged missing functionality from the QMFv1 Ruby and Python interfaces to the QMFv2 interfaces:
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067095 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/bindings/qmf2/ruby')
| -rw-r--r-- | cpp/bindings/qmf2/ruby/qmf2.rb | 344 |
1 files changed, 323 insertions, 21 deletions
diff --git a/cpp/bindings/qmf2/ruby/qmf2.rb b/cpp/bindings/qmf2/ruby/qmf2.rb index 86ba72cc0b..642af35eca 100644 --- a/cpp/bindings/qmf2/ruby/qmf2.rb +++ b/cpp/bindings/qmf2/ruby/qmf2.rb @@ -78,12 +78,270 @@ module Qmf2 end ##============================================================================== - ## AGENT HANDLER TODO + ## AGENT HANDLER ##============================================================================== class AgentHandler - def get_query(context, query, userId); end - def method_call(context, name, object_id, args, userId); end + + def initialize(session) + @_session = session + @_running = false + @_thread = nil + end + + ## + ## Call the "start" method to run the handler on a new thread. + ## + def start + @_thread = Thread.new do + run + end + end + + ## + ## Request that the running thread complete and exit. + ## + def cancel + @_running = false + @_thread.join if @_thread + @_thread = nil + end + + ## + ## Call the "run" method only if you want the handler to run on your own thread. + ## + def run + @_running = true + event = Cqmf2::AgentEvent.new + while @_running do + valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND) + if valid and @_running + case event.getType + when Cqmf2::AGENT_AUTH_QUERY + yes = authorize_query(Query.new(event.getQuery()), event.getUserId()) + if yes == true + @_session.impl.authAccept(event) + else + @_session.impl.authReject(event) + end + + when Cqmf2::AGENT_QUERY + context = QueryContext.new(@_session, event) + get_query(context, Query.new(event.getQuery()), event.getUserId()) + + when Cqmf2::AGENT_METHOD + context = MethodContext.new(@_session, event) + begin + method_call(context, event.getMethodName(), event.getDataAddr(), event.getArguments(), event.getUserId()) + rescue Exception => ex + @_session.impl.raiseException(event, "#{ex}") + end + + end + end + end + end + + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # This method will only be invoked if the "allow-queries" option is enabled on the + # agent session. When invoked, it provides the query and the authenticated user-id + # of the querying client. + # + # This method must return true if the query is permitted, false otherwise. + # + def authorize_query(query, user_id); end + + # + # This method will only be invoked if the "external" option is "True" on the agent + # session. When invoked, the method should begin the process of responding to a data + # query. The authenticated user-id of the requestor is provided for informational + # purposes. The 'context' variable is used to provide the results back to the requestor. + # + # For each matching Data object, call context.response(data). When the query is complete, + # call context.complete(). After completing the query, you should not use 'context' any + # longer. + # + # Note: It is not necessary to process the query synchronously. If desired, this method + # may store the context for asynchronous processing or pass it to another thread for + # processing. There is no restriction on the number of contexts that may be in-flight + # concurrently. + # + def get_query(context, query, user_id); end + + # + # This method is invoked when a console calls a QMF method on the agent. Supplied are + # a context for the response, the method name, the data address of the data object being + # called, the input arguments (a dictionary), and the caller's authenticated user-id. + # + # A method call can end one of two ways: Successful completion, in which the output + # arguments (if any) are supplied; and Exceptional completion if there is an error. + # + # Successful Completion: + # For each output argument, assign the value directly to context (context.arg1 = "value") + # Once arguments are assigned, call context._success(). + # + # Exceptional Completion: + # Method 1: Call context._exception(data) where 'data' is a string or a Data object. + # Method 2: Raise an exception (raise "Error Text") synchronously in the method body. + # + # Note: Like get_query, method_call may process methods synchronously or asynchronously. + # This method may store the context for later asynchronous processing. There is no + # restriction on the number of contexts that may be in-flight concurrently. + # + # However, "Method 2" for Exceptional Completion can only be done synchronously. + # + def method_call(context, method_name, data_addr, args, user_id); end + end + + class QueryContext + def initialize(agent, context) + @agent = agent + @context = context + end + + def response(data) + @agent.impl.response(@context, data.impl) + end + + def complete + @agent.impl.complete(@context) + end + end + + class MethodContext + def initialize(agent, context) + @agent = agent + @context = context + end + + def _success + @agent.impl.methodSuccess(@context) + end + + def _exception(ex) + if ex.class == Data + @agent.impl.raiseException(@context, ex.impl) + else + @agent.impl.raiseException(@context, ex) + end + end + + def method_missing(name_in, *args) + name = name_in.to_s + if name[name.length - 1] == 61 + name = name[0..name.length - 2] + @context.impl.addReturnArgument(name, args[0]) + else + super.method_missing(name_in, args) + end + end + end + + ##============================================================================== + ## CONSOLE HANDLER + ##============================================================================== + + class ConsoleHandler + + def initialize(session) + @_session = session + @_running = false + @_thread = nil + end + + ## + ## Call the "start" method to run the handler on a new thread. + ## + def start + @_thread = Thread.new do + run + end + end + + ## + ## Request that the running thread complete and exit. + ## + def cancel + @_running = false + @_thread.join if @_thread + @_thread = nil + end + + ## + ## Call the "run" method only if you want the handler to run on your own thread. + ## + def run + @_running = true + event = Cqmf2::ConsoleEvent.new + while @_running do + valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND) + if valid and @_running + case event.getType + when Cqmf2::CONSOLE_AGENT_ADD + agent_added(Agent.new(event.getAgent)) + + when Cqmf2::CONSOLE_AGENT_DEL + reason = :filter + reason = :aged if event.getAgentDelReason == Cqmf2::AGENT_DEL_AGED + agent_deleted(Agent.new(event.getAgent), reason) + + when Cqmf2::CONSOLE_AGENT_RESTART + agent_restarted(Agent.new(event.getAgent)) + + when Cqmf2::CONSOLE_AGENT_SCHEMA_UPDATE + agent_schema_updated(Agent.new(event.getAgent)) + + end + end + end + end + + + ## + ## The following methods are intended to be overridden in a sub-class. They are + ## handlers for events that occur on QMF consoles. + ## + + # + # A new agent, whose attributes match the console's agent filter, has been discovered. + # + def agent_added(agent); end + + # + # A known agent has been removed from the agent list. There are two possible reasons + # for agent deletion: + # + # 1) :aged - The agent hasn't been heard from for the maximum age interval and is + # presumed dead. + # 2) :filter - The agent no longer matches the console's agent-filter and has been + # effectively removed from the agent list. Such occurrences are likely + # to be seen immediately after setting the filter to a new value. + # + def agent_deleted(agent, reason); end + + # + # An agent-restart was detected. This occurs when the epoch number advertised by the + # agent changes. It indicates that the agent in question was shut-down/crashed and + # restarted. + # + def agent_restarted(agent); end + + # + # The agent has registered new schema information which can now be queried, if desired. + # + def agent_schema_updated(agent); end + + # + # An agent raised an event. The 'data' argument is a Data object that contains the + # content of the event. + # + def event_raised(agent, data, timestamp, severity); end end ##============================================================================== @@ -93,6 +351,16 @@ module Qmf2 class ConsoleSession attr_reader :impl + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## domain:NAME - QMF Domain to join [default: "default"] + ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from + ## an agent before deleting it [default: 5] + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## def initialize(connection, options="") @impl = Cqmf2::ConsoleSession.new(connection, options) end @@ -124,6 +392,24 @@ module Qmf2 class AgentSession attr_reader :impl + ## The options string is of the form "{key:value,key:value}". The following keys are supported: + ## + ## interval:N - Heartbeat interval in seconds [default: 60] + ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False] + ## allow-queries:{True,False} - If True: automatically allow all queries [default] + ## If False: generate an AUTH_QUERY event to allow per-query authorization + ## allow-methods:{True,False} - If True: automatically allow all methods [default] + ## If False: generate an AUTH_METHOD event to allow per-method authorization + ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64] + ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000] + ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300] + ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default] + ## If False: QMF events are only sent to authorized subscribers + ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default] + ## If False: Listen only on the routable direct address + ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network + ## - If False: Operate more flexibly with regard to use of messaging facilities [default] + ## def initialize(connection, options="") @impl = Cqmf2::AgentSession.new(connection, options) end @@ -137,25 +423,13 @@ module Qmf2 def close() @impl.close end def register_schema(cls) @impl.registerSchema(cls.impl) end - def add_data(data, name="", persistent=:false) + def add_data(data, name="", persistent=false) DataAddr.new(@impl.addData(data.impl, name, persistent)) end def del_data(addr) @impl.del_data(addr.impl) end - - def method_success(handle) - @impl.methodSuccess(handle) - end - - def raise_exception(handle, data) - if data.class == Data - @impl.raiseException(handle, data.impl) - else - @impl.raiseException(handle, data) - end - end end ##============================================================================== @@ -228,7 +502,7 @@ module Qmf2 end ##============================================================================== - ## QUERY TODO + ## QUERY ##============================================================================== class Query @@ -238,6 +512,16 @@ module Qmf2 @impl = Cqmf2::Query.new(arg1.impl) end end + + def addr() DataAddr.new(@impl.getDataAddr()) end + def schema_id() SchemaId.new(@impl.getSchemaId()) end + def predicate() @impl.getPredicate() end + + def matches?(data) + map = data + map = data.properties if data.class == Data + @impl.matchesPredicate(map) + end end ##============================================================================== @@ -248,16 +532,17 @@ module Qmf2 attr_reader :impl def initialize(arg=nil) + @schema = nil if arg == nil @impl = Cqmf2::Data.new elsif arg.class == Cqmf2::Data @impl = arg elsif arg.class == Schema - @impl = Cqmf2::Data(arg.impl) + @impl = Cqmf2::Data.new(arg.impl) + @schema = arg else raise "Unsupported initializer for Data" end - @schema = nil end def to_s @@ -271,6 +556,10 @@ module Qmf2 return nil end + def set_addr(addr) + @impl.setAddr(addr.impl) + end + def addr if @impl.hasAddr return DataAddr.new(@impl.getAddr) @@ -282,6 +571,17 @@ module Qmf2 return Agent.new(@impl.getAgent) end + def update(timeout=5) + dur = Cqpid::Duration.new(Cqpid::Duration.SECOND.getMilliseconds * timeout) + agent = @impl.getAgent + query = Cqmf2::Query.new(@impl.getAddr) + result = agent.query(query, dur) + raise "Update query failed" if result.getType != Cqmf2::CONSOLE_QUERY_RESPONSE + raise "Object no longer exists on agent" if result.getDataCount == 0 + @impl = Cqmf2::Data.new(result.getData(0)) + return nil + end + def properties return @impl.getProperties end @@ -392,11 +692,13 @@ module Qmf2 class DataAddr attr_reader :impl - def initialize(arg) + def initialize(arg, agentName="") if arg.class == Hash @impl = Cqmf2::DataAddr.new(arg) - else + elsif arg.class == Cqmf2::DataAddr @impl = arg + else + @impl = Cqmf2::DataAddr.new(arg, agentName) end end |
