summaryrefslogtreecommitdiff
path: root/cpp/bindings/qmf2/ruby
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-02-04 06:38:31 +0000
committerTed Ross <tross@apache.org>2011-02-04 06:38:31 +0000
commitcf47f99d276a50ac32ed9835a9afb818fd90f4ba (patch)
treee7a3b44d64f8be42d7abc8e3191d3090d48ed01c /cpp/bindings/qmf2/ruby
parent8cea22dc96999d3f462a41a36b9803327fb28005 (diff)
downloadqpid-python-cf47f99d276a50ac32ed9835a9afb818fd90f4ba.tar.gz
Merged missing functionality from the QMFv1 Ruby and Python interfaces to the QMFv2 interfaces:
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1067095 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/bindings/qmf2/ruby')
-rw-r--r--cpp/bindings/qmf2/ruby/qmf2.rb344
1 files changed, 323 insertions, 21 deletions
diff --git a/cpp/bindings/qmf2/ruby/qmf2.rb b/cpp/bindings/qmf2/ruby/qmf2.rb
index 86ba72cc0b..642af35eca 100644
--- a/cpp/bindings/qmf2/ruby/qmf2.rb
+++ b/cpp/bindings/qmf2/ruby/qmf2.rb
@@ -78,12 +78,270 @@ module Qmf2
end
##==============================================================================
- ## AGENT HANDLER TODO
+ ## AGENT HANDLER
##==============================================================================
class AgentHandler
- def get_query(context, query, userId); end
- def method_call(context, name, object_id, args, userId); end
+
+ def initialize(session)
+ @_session = session
+ @_running = false
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "start" method to run the handler on a new thread.
+ ##
+ def start
+ @_thread = Thread.new do
+ run
+ end
+ end
+
+ ##
+ ## Request that the running thread complete and exit.
+ ##
+ def cancel
+ @_running = false
+ @_thread.join if @_thread
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "run" method only if you want the handler to run on your own thread.
+ ##
+ def run
+ @_running = true
+ event = Cqmf2::AgentEvent.new
+ while @_running do
+ valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND)
+ if valid and @_running
+ case event.getType
+ when Cqmf2::AGENT_AUTH_QUERY
+ yes = authorize_query(Query.new(event.getQuery()), event.getUserId())
+ if yes == true
+ @_session.impl.authAccept(event)
+ else
+ @_session.impl.authReject(event)
+ end
+
+ when Cqmf2::AGENT_QUERY
+ context = QueryContext.new(@_session, event)
+ get_query(context, Query.new(event.getQuery()), event.getUserId())
+
+ when Cqmf2::AGENT_METHOD
+ context = MethodContext.new(@_session, event)
+ begin
+ method_call(context, event.getMethodName(), event.getDataAddr(), event.getArguments(), event.getUserId())
+ rescue Exception => ex
+ @_session.impl.raiseException(event, "#{ex}")
+ end
+
+ end
+ end
+ end
+ end
+
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # This method will only be invoked if the "allow-queries" option is enabled on the
+ # agent session. When invoked, it provides the query and the authenticated user-id
+ # of the querying client.
+ #
+ # This method must return true if the query is permitted, false otherwise.
+ #
+ def authorize_query(query, user_id); end
+
+ #
+ # This method will only be invoked if the "external" option is "True" on the agent
+ # session. When invoked, the method should begin the process of responding to a data
+ # query. The authenticated user-id of the requestor is provided for informational
+ # purposes. The 'context' variable is used to provide the results back to the requestor.
+ #
+ # For each matching Data object, call context.response(data). When the query is complete,
+ # call context.complete(). After completing the query, you should not use 'context' any
+ # longer.
+ #
+ # Note: It is not necessary to process the query synchronously. If desired, this method
+ # may store the context for asynchronous processing or pass it to another thread for
+ # processing. There is no restriction on the number of contexts that may be in-flight
+ # concurrently.
+ #
+ def get_query(context, query, user_id); end
+
+ #
+ # This method is invoked when a console calls a QMF method on the agent. Supplied are
+ # a context for the response, the method name, the data address of the data object being
+ # called, the input arguments (a dictionary), and the caller's authenticated user-id.
+ #
+ # A method call can end one of two ways: Successful completion, in which the output
+ # arguments (if any) are supplied; and Exceptional completion if there is an error.
+ #
+ # Successful Completion:
+ # For each output argument, assign the value directly to context (context.arg1 = "value")
+ # Once arguments are assigned, call context._success().
+ #
+ # Exceptional Completion:
+ # Method 1: Call context._exception(data) where 'data' is a string or a Data object.
+ # Method 2: Raise an exception (raise "Error Text") synchronously in the method body.
+ #
+ # Note: Like get_query, method_call may process methods synchronously or asynchronously.
+ # This method may store the context for later asynchronous processing. There is no
+ # restriction on the number of contexts that may be in-flight concurrently.
+ #
+ # However, "Method 2" for Exceptional Completion can only be done synchronously.
+ #
+ def method_call(context, method_name, data_addr, args, user_id); end
+ end
+
+ class QueryContext
+ def initialize(agent, context)
+ @agent = agent
+ @context = context
+ end
+
+ def response(data)
+ @agent.impl.response(@context, data.impl)
+ end
+
+ def complete
+ @agent.impl.complete(@context)
+ end
+ end
+
+ class MethodContext
+ def initialize(agent, context)
+ @agent = agent
+ @context = context
+ end
+
+ def _success
+ @agent.impl.methodSuccess(@context)
+ end
+
+ def _exception(ex)
+ if ex.class == Data
+ @agent.impl.raiseException(@context, ex.impl)
+ else
+ @agent.impl.raiseException(@context, ex)
+ end
+ end
+
+ def method_missing(name_in, *args)
+ name = name_in.to_s
+ if name[name.length - 1] == 61
+ name = name[0..name.length - 2]
+ @context.impl.addReturnArgument(name, args[0])
+ else
+ super.method_missing(name_in, args)
+ end
+ end
+ end
+
+ ##==============================================================================
+ ## CONSOLE HANDLER
+ ##==============================================================================
+
+ class ConsoleHandler
+
+ def initialize(session)
+ @_session = session
+ @_running = false
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "start" method to run the handler on a new thread.
+ ##
+ def start
+ @_thread = Thread.new do
+ run
+ end
+ end
+
+ ##
+ ## Request that the running thread complete and exit.
+ ##
+ def cancel
+ @_running = false
+ @_thread.join if @_thread
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "run" method only if you want the handler to run on your own thread.
+ ##
+ def run
+ @_running = true
+ event = Cqmf2::ConsoleEvent.new
+ while @_running do
+ valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND)
+ if valid and @_running
+ case event.getType
+ when Cqmf2::CONSOLE_AGENT_ADD
+ agent_added(Agent.new(event.getAgent))
+
+ when Cqmf2::CONSOLE_AGENT_DEL
+ reason = :filter
+ reason = :aged if event.getAgentDelReason == Cqmf2::AGENT_DEL_AGED
+ agent_deleted(Agent.new(event.getAgent), reason)
+
+ when Cqmf2::CONSOLE_AGENT_RESTART
+ agent_restarted(Agent.new(event.getAgent))
+
+ when Cqmf2::CONSOLE_AGENT_SCHEMA_UPDATE
+ agent_schema_updated(Agent.new(event.getAgent))
+
+ end
+ end
+ end
+ end
+
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # A new agent, whose attributes match the console's agent filter, has been discovered.
+ #
+ def agent_added(agent); end
+
+ #
+ # A known agent has been removed from the agent list. There are two possible reasons
+ # for agent deletion:
+ #
+ # 1) :aged - The agent hasn't been heard from for the maximum age interval and is
+ # presumed dead.
+ # 2) :filter - The agent no longer matches the console's agent-filter and has been
+ # effectively removed from the agent list. Such occurrences are likely
+ # to be seen immediately after setting the filter to a new value.
+ #
+ def agent_deleted(agent, reason); end
+
+ #
+ # An agent-restart was detected. This occurs when the epoch number advertised by the
+ # agent changes. It indicates that the agent in question was shut-down/crashed and
+ # restarted.
+ #
+ def agent_restarted(agent); end
+
+ #
+ # The agent has registered new schema information which can now be queried, if desired.
+ #
+ def agent_schema_updated(agent); end
+
+ #
+ # An agent raised an event. The 'data' argument is a Data object that contains the
+ # content of the event.
+ #
+ def event_raised(agent, data, timestamp, severity); end
end
##==============================================================================
@@ -93,6 +351,16 @@ module Qmf2
class ConsoleSession
attr_reader :impl
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## domain:NAME - QMF Domain to join [default: "default"]
+ ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
+ ## an agent before deleting it [default: 5]
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
def initialize(connection, options="")
@impl = Cqmf2::ConsoleSession.new(connection, options)
end
@@ -124,6 +392,24 @@ module Qmf2
class AgentSession
attr_reader :impl
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## interval:N - Heartbeat interval in seconds [default: 60]
+ ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False]
+ ## allow-queries:{True,False} - If True: automatically allow all queries [default]
+ ## If False: generate an AUTH_QUERY event to allow per-query authorization
+ ## allow-methods:{True,False} - If True: automatically allow all methods [default]
+ ## If False: generate an AUTH_METHOD event to allow per-method authorization
+ ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64]
+ ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000]
+ ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
+ ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
+ ## If False: QMF events are only sent to authorized subscribers
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
def initialize(connection, options="")
@impl = Cqmf2::AgentSession.new(connection, options)
end
@@ -137,25 +423,13 @@ module Qmf2
def close() @impl.close end
def register_schema(cls) @impl.registerSchema(cls.impl) end
- def add_data(data, name="", persistent=:false)
+ def add_data(data, name="", persistent=false)
DataAddr.new(@impl.addData(data.impl, name, persistent))
end
def del_data(addr)
@impl.del_data(addr.impl)
end
-
- def method_success(handle)
- @impl.methodSuccess(handle)
- end
-
- def raise_exception(handle, data)
- if data.class == Data
- @impl.raiseException(handle, data.impl)
- else
- @impl.raiseException(handle, data)
- end
- end
end
##==============================================================================
@@ -228,7 +502,7 @@ module Qmf2
end
##==============================================================================
- ## QUERY TODO
+ ## QUERY
##==============================================================================
class Query
@@ -238,6 +512,16 @@ module Qmf2
@impl = Cqmf2::Query.new(arg1.impl)
end
end
+
+ def addr() DataAddr.new(@impl.getDataAddr()) end
+ def schema_id() SchemaId.new(@impl.getSchemaId()) end
+ def predicate() @impl.getPredicate() end
+
+ def matches?(data)
+ map = data
+ map = data.properties if data.class == Data
+ @impl.matchesPredicate(map)
+ end
end
##==============================================================================
@@ -248,16 +532,17 @@ module Qmf2
attr_reader :impl
def initialize(arg=nil)
+ @schema = nil
if arg == nil
@impl = Cqmf2::Data.new
elsif arg.class == Cqmf2::Data
@impl = arg
elsif arg.class == Schema
- @impl = Cqmf2::Data(arg.impl)
+ @impl = Cqmf2::Data.new(arg.impl)
+ @schema = arg
else
raise "Unsupported initializer for Data"
end
- @schema = nil
end
def to_s
@@ -271,6 +556,10 @@ module Qmf2
return nil
end
+ def set_addr(addr)
+ @impl.setAddr(addr.impl)
+ end
+
def addr
if @impl.hasAddr
return DataAddr.new(@impl.getAddr)
@@ -282,6 +571,17 @@ module Qmf2
return Agent.new(@impl.getAgent)
end
+ def update(timeout=5)
+ dur = Cqpid::Duration.new(Cqpid::Duration.SECOND.getMilliseconds * timeout)
+ agent = @impl.getAgent
+ query = Cqmf2::Query.new(@impl.getAddr)
+ result = agent.query(query, dur)
+ raise "Update query failed" if result.getType != Cqmf2::CONSOLE_QUERY_RESPONSE
+ raise "Object no longer exists on agent" if result.getDataCount == 0
+ @impl = Cqmf2::Data.new(result.getData(0))
+ return nil
+ end
+
def properties
return @impl.getProperties
end
@@ -392,11 +692,13 @@ module Qmf2
class DataAddr
attr_reader :impl
- def initialize(arg)
+ def initialize(arg, agentName="")
if arg.class == Hash
@impl = Cqmf2::DataAddr.new(arg)
- else
+ elsif arg.class == Cqmf2::DataAddr
@impl = arg
+ else
+ @impl = Cqmf2::DataAddr.new(arg, agentName)
end
end