summaryrefslogtreecommitdiff
path: root/ruby
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
committerTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
commitb264a276f994526ce24062c3f852fdd658857d29 (patch)
tree6aedda9cc832c27d39856a6bb3f9dcc0b64ee146 /ruby
parent2c8be931523ca30352ed01164ff70ac0f60fc02a (diff)
downloadqpid-python-b264a276f994526ce24062c3f852fdd658857d29.tar.gz
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby - Added support for asynchronous method invocation - Added option to override timeout for method request and get request - Added exception handler in delegates.rb to catch Sasl errors - Added tests for the async method features git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ruby')
-rw-r--r--ruby/lib/qpid/delegates.rb14
-rw-r--r--ruby/lib/qpid/qmf.rb61
-rw-r--r--ruby/tests/qmf.rb86
3 files changed, 130 insertions, 31 deletions
diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb
index 171f310e48..8d866e895f 100644
--- a/ruby/lib/qpid/delegates.rb
+++ b/ruby/lib/qpid/delegates.rb
@@ -200,10 +200,16 @@ module Qpid
start.mechanisms.each do |m|
mech_list += m + " "
end
- resp = Sasl.client_start(@saslConn, mech_list)
- ch.connection_start_ok(:client_properties => PROPERTIES,
- :mechanism => resp[2],
- :response => resp[1])
+ begin
+ resp = Sasl.client_start(@saslConn, mech_list)
+ ch.connection_start_ok(:client_properties => PROPERTIES,
+ :mechanism => resp[2],
+ :response => resp[1])
+ rescue exception
+ ch.connection_close(:message => $!.message)
+ @connection.failed = true
+ @connection.signal
+ end
end
def connection_secure(ch, secure)
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb
index ee165305c3..ebca7ee5ab 100644
--- a/ruby/lib/qpid/qmf.rb
+++ b/ruby/lib/qpid/qmf.rb
@@ -58,9 +58,14 @@ module Qpid::Qmf
# Invoked when an event is raised
def event(broker, event); end
+ # Invoked when an agent heartbeat is received.
def heartbeat(agent, timestamp); end
+ # Invoked when the connection sequence reaches the point where broker information is available.
def broker_info(broker); end
+
+ # Invoked when a method response from an asynchronous method call is received.
+ def method_response(broker, seq, response); end
end
class BrokerURL
@@ -105,7 +110,7 @@ module Qpid::Qmf
CONTEXT_STARTUP = 2
CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
include MonitorMixin
@@ -305,11 +310,17 @@ module Qpid::Qmf
# Otherwise, the query will go to all agents.
#
# :agent = <agent> - supply an agent from the list returned by getAgents.
+ #
# If the get query is to be restricted to one broker (as opposed to
# all connected brokers), add the following argument:
#
# :broker = <broker> - supply a broker as returned by addBroker.
#
+ # The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ # use the following argument:
+ #
+ # :_timeout = <time in seconds>
+ #
# If additional arguments are supplied, they are used as property
# selectors, as long as their keys are strings. For example, if
# the argument "name" => "test" is supplied, only objects whose
@@ -389,9 +400,13 @@ module Qpid::Qmf
end
timeout = false
+ if kwargs.include?(:_timeout)
+ wait_time = kwargs[:_timeout]
+ else
+ wait_time = DEFAULT_GET_WAIT_TIME
+ end
synchronize do
- unless @cv.wait_for(GET_WAIT_TIME) {
- @sync_sequence_list.empty? || @error }
+ unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error }
@sync_sequence_list.each do |pending_seq|
@seq_mgr.release(pending_seq)
end
@@ -504,10 +519,11 @@ module Qpid::Qmf
def handle_method_resp(broker, codec, seq)
code = codec.read_uint32
-
text = codec.read_str16
out_args = {}
- method, synchronous = @seq_mgr.release(seq)
+ pair = @seq_mgr.release(seq)
+ return unless pair
+ method, synchronous = pair
if code == 0
method.arguments.each do |arg|
if arg.dir.index(?O)
@@ -1054,7 +1070,7 @@ module Qpid::Qmf
private
- def send_method_request(method, name, args, synchronous = false)
+ def send_method_request(method, name, args, synchronous = false, time_wait = nil)
@schema.methods.each do |schema_method|
if name == schema_method.name
send_codec = Qpid::StringCodec.new(@broker.conn.spec)
@@ -1077,9 +1093,9 @@ module Qpid::Qmf
@session.encode_value(send_codec, actual, formal.type)
end
+ ttl = time_wait ? time_wait * 1000 : nil
smsg = @broker.message(send_codec.encoded,
- "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
-
+ "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl)
@broker.sync_start if synchronous
@broker.emit(smsg)
@@ -1089,8 +1105,25 @@ module Qpid::Qmf
end
def invoke(method, name, args)
- if send_method_request(method, name, args, synchronous = true)
- unless @broker.wait_for_sync_done
+ kwargs = args[args.size - 1]
+ sync = true
+ timeout = nil
+
+ if kwargs.class == Hash
+ if kwargs.include?(:_timeout)
+ timeout = kwargs[:_timeout]
+ end
+
+ if kwargs.include?(:_async)
+ sync = !kwargs[:_async]
+ end
+ args.pop
+ end
+
+ seq = send_method_request(method, name, args, synchronous = sync)
+ if seq
+ return seq unless sync
+ unless @broker.wait_for_sync_done(timeout)
@session.seq_mgr.release(seq)
raise "Timed out waiting for method to respond"
end
@@ -1284,9 +1317,10 @@ module Qpid::Qmf
end
end
- def wait_for_sync_done
+ def wait_for_sync_done(timeout=nil)
+ wait_time = timeout ? timeout : SYNC_TIME
synchronize do
- return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
+ return @cv.wait_for(wait_time) { ! @sync_in_flight || @error }
end
end
@@ -1309,9 +1343,10 @@ module Qpid::Qmf
codec.write_uint32(seq)
end
- def message(body, routing_key="broker")
+ def message(body, routing_key="broker", ttl=nil)
dp = @amqp_session.delivery_properties
dp.routing_key = routing_key
+ dp.ttl = ttl if ttl
mp = @amqp_session.message_properties
mp.content_type = "x-application/qmf"
mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
diff --git a/ruby/tests/qmf.rb b/ruby/tests/qmf.rb
index b0ee19c3d5..06371a5a82 100644
--- a/ruby/tests/qmf.rb
+++ b/ruby/tests/qmf.rb
@@ -21,31 +21,69 @@ require "test/unit"
require "qpid"
require "tests/util"
require "socket"
+require "monitor.rb"
class QmfTest < Test::Unit::TestCase
+ class Handler < Qpid::Qmf::Console
+ include MonitorMixin
+
+ def initialize
+ super()
+ @xmt_list = {}
+ @rcv_list = {}
+ end
+
+ def method_response(broker, seq, response)
+ synchronize do
+ @rcv_list[seq] = response
+ end
+ end
+
+ def request(broker, count)
+ @count = count
+ for idx in 0...count
+ synchronize do
+ seq = broker.echo(idx, "Echo Message", :_async => true)
+ @xmt_list[seq] = idx
+ end
+ end
+ end
+
+ def check
+ return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size
+ lost = 0
+ mismatched = 0
+ @xmt_list.each do |seq, value|
+ if @rcv_list.include?(seq)
+ result = @rcv_list.delete(seq)
+ mismatch += 1 unless result.sequence == value
+ else
+ lost += 1
+ end
+ end
+ spurious = @rcv_list.size
+ if lost == 0 and mismatched == 0 and spurious == 0
+ return "pass"
+ else
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious]
+ end
+ end
+ end
+
def setup()
# Make sure errors in threads lead to a noisy death of the test
Thread.abort_on_exception = true
- host = ENV.fetch("QMF_TEST_HOST", 'localhost')
- port = ENV.fetch("QMF_TEST_PORT", 5672)
+ @host = ENV.fetch("QMF_TEST_HOST", 'localhost')
+ @port = ENV.fetch("QMF_TEST_PORT", 5672)
- sock = TCPSocket.new(host, port)
+ sock = TCPSocket.new(@host, @port)
@conn = Qpid::Connection.new(sock)
@conn.start()
@session = @conn.session("test-session")
-
- # It's a bit odd that we're using two connections but that's the way
- # the python one works afaict.
- @qmf = Qpid::Qmf::Session.new()
- @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [host, port])
-
- brokers = @qmf.objects(:class => "broker")
- assert_equal(1, brokers.length)
- @broker = brokers[0]
end
def teardown
@@ -58,10 +96,20 @@ class QmfTest < Test::Unit::TestCase
end
end
- def test_broker_connectivity()
+ def start_qmf(kwargs = {})
+ @qmf = Qpid::Qmf::Session.new(kwargs)
+ @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port])
+
+ brokers = @qmf.objects(:class => "broker")
+ assert_equal(1, brokers.length)
+ @broker = brokers[0]
+ end
+
+ def test_methods_sync()
+ start_qmf
body = "Echo Message Body"
for seq in 1..10
- res = @broker.echo(seq, body)
+ res = @broker.echo(seq, body, :_timeout => 10)
assert_equal(0, res.status)
assert_equal("OK", res.text)
assert_equal(seq, res.sequence)
@@ -69,6 +117,14 @@ class QmfTest < Test::Unit::TestCase
end
end
+ def test_methods_async()
+ handler = Handler.new
+ start_qmf(:console => handler)
+ handler.request(@broker, 20)
+ sleep(1)
+ assert_equal("pass", handler.check)
+ end
+
def test_move_queued_messages()
"""
Test ability to move messages from the head of one queue to another.
@@ -76,6 +132,7 @@ class QmfTest < Test::Unit::TestCase
"""
"Set up source queue"
+ start_qmf
@session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true)
@session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key")
@@ -151,6 +208,7 @@ class QmfTest < Test::Unit::TestCase
# Test ability to purge messages from the head of a queue. Need to test
# moveing all, 1 (top message) and N messages.
def test_purge_queue
+ start_qmf
# Set up purge queue"
@session.queue_declare(:queue => "purge-queue",
:exclusive => true,