diff options
| author | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
| commit | b264a276f994526ce24062c3f852fdd658857d29 (patch) | |
| tree | 6aedda9cc832c27d39856a6bb3f9dcc0b64ee146 /ruby | |
| parent | 2c8be931523ca30352ed01164ff70ac0f60fc02a (diff) | |
| download | qpid-python-b264a276f994526ce24062c3f852fdd658857d29.tar.gz | |
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby
- Added support for asynchronous method invocation
- Added option to override timeout for method request and get request
- Added exception handler in delegates.rb to catch Sasl errors
- Added tests for the async method features
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ruby')
| -rw-r--r-- | ruby/lib/qpid/delegates.rb | 14 | ||||
| -rw-r--r-- | ruby/lib/qpid/qmf.rb | 61 | ||||
| -rw-r--r-- | ruby/tests/qmf.rb | 86 |
3 files changed, 130 insertions, 31 deletions
diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb index 171f310e48..8d866e895f 100644 --- a/ruby/lib/qpid/delegates.rb +++ b/ruby/lib/qpid/delegates.rb @@ -200,10 +200,16 @@ module Qpid start.mechanisms.each do |m| mech_list += m + " " end - resp = Sasl.client_start(@saslConn, mech_list) - ch.connection_start_ok(:client_properties => PROPERTIES, - :mechanism => resp[2], - :response => resp[1]) + begin + resp = Sasl.client_start(@saslConn, mech_list) + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => resp[2], + :response => resp[1]) + rescue exception + ch.connection_close(:message => $!.message) + @connection.failed = true + @connection.signal + end end def connection_secure(ch, secure) diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb index ee165305c3..ebca7ee5ab 100644 --- a/ruby/lib/qpid/qmf.rb +++ b/ruby/lib/qpid/qmf.rb @@ -58,9 +58,14 @@ module Qpid::Qmf # Invoked when an event is raised def event(broker, event); end + # Invoked when an agent heartbeat is received. def heartbeat(agent, timestamp); end + # Invoked when the connection sequence reaches the point where broker information is available. def broker_info(broker); end + + # Invoked when a method response from an asynchronous method call is received. + def method_response(broker, seq, response); end end class BrokerURL @@ -105,7 +110,7 @@ module Qpid::Qmf CONTEXT_STARTUP = 2 CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 include MonitorMixin @@ -305,11 +310,17 @@ module Qpid::Qmf # Otherwise, the query will go to all agents. # # :agent = <agent> - supply an agent from the list returned by getAgents. + # # If the get query is to be restricted to one broker (as opposed to # all connected brokers), add the following argument: # # :broker = <broker> - supply a broker as returned by addBroker. # + # The default timeout for this synchronous operation is 60 seconds. To change the timeout, + # use the following argument: + # + # :_timeout = <time in seconds> + # # If additional arguments are supplied, they are used as property # selectors, as long as their keys are strings. For example, if # the argument "name" => "test" is supplied, only objects whose @@ -389,9 +400,13 @@ module Qpid::Qmf end timeout = false + if kwargs.include?(:_timeout) + wait_time = kwargs[:_timeout] + else + wait_time = DEFAULT_GET_WAIT_TIME + end synchronize do - unless @cv.wait_for(GET_WAIT_TIME) { - @sync_sequence_list.empty? || @error } + unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error } @sync_sequence_list.each do |pending_seq| @seq_mgr.release(pending_seq) end @@ -504,10 +519,11 @@ module Qpid::Qmf def handle_method_resp(broker, codec, seq) code = codec.read_uint32 - text = codec.read_str16 out_args = {} - method, synchronous = @seq_mgr.release(seq) + pair = @seq_mgr.release(seq) + return unless pair + method, synchronous = pair if code == 0 method.arguments.each do |arg| if arg.dir.index(?O) @@ -1054,7 +1070,7 @@ module Qpid::Qmf private - def send_method_request(method, name, args, synchronous = false) + def send_method_request(method, name, args, synchronous = false, time_wait = nil) @schema.methods.each do |schema_method| if name == schema_method.name send_codec = Qpid::StringCodec.new(@broker.conn.spec) @@ -1077,9 +1093,9 @@ module Qpid::Qmf @session.encode_value(send_codec, actual, formal.type) end + ttl = time_wait ? time_wait * 1000 : nil smsg = @broker.message(send_codec.encoded, - "agent.#{object_id.broker_bank}.#{object_id.agent_bank}") - + "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl) @broker.sync_start if synchronous @broker.emit(smsg) @@ -1089,8 +1105,25 @@ module Qpid::Qmf end def invoke(method, name, args) - if send_method_request(method, name, args, synchronous = true) - unless @broker.wait_for_sync_done + kwargs = args[args.size - 1] + sync = true + timeout = nil + + if kwargs.class == Hash + if kwargs.include?(:_timeout) + timeout = kwargs[:_timeout] + end + + if kwargs.include?(:_async) + sync = !kwargs[:_async] + end + args.pop + end + + seq = send_method_request(method, name, args, synchronous = sync) + if seq + return seq unless sync + unless @broker.wait_for_sync_done(timeout) @session.seq_mgr.release(seq) raise "Timed out waiting for method to respond" end @@ -1284,9 +1317,10 @@ module Qpid::Qmf end end - def wait_for_sync_done + def wait_for_sync_done(timeout=nil) + wait_time = timeout ? timeout : SYNC_TIME synchronize do - return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error } + return @cv.wait_for(wait_time) { ! @sync_in_flight || @error } end end @@ -1309,9 +1343,10 @@ module Qpid::Qmf codec.write_uint32(seq) end - def message(body, routing_key="broker") + def message(body, routing_key="broker", ttl=nil) dp = @amqp_session.delivery_properties dp.routing_key = routing_key + dp.ttl = ttl if ttl mp = @amqp_session.message_properties mp.content_type = "x-application/qmf" mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) diff --git a/ruby/tests/qmf.rb b/ruby/tests/qmf.rb index b0ee19c3d5..06371a5a82 100644 --- a/ruby/tests/qmf.rb +++ b/ruby/tests/qmf.rb @@ -21,31 +21,69 @@ require "test/unit" require "qpid" require "tests/util" require "socket" +require "monitor.rb" class QmfTest < Test::Unit::TestCase + class Handler < Qpid::Qmf::Console + include MonitorMixin + + def initialize + super() + @xmt_list = {} + @rcv_list = {} + end + + def method_response(broker, seq, response) + synchronize do + @rcv_list[seq] = response + end + end + + def request(broker, count) + @count = count + for idx in 0...count + synchronize do + seq = broker.echo(idx, "Echo Message", :_async => true) + @xmt_list[seq] = idx + end + end + end + + def check + return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size + lost = 0 + mismatched = 0 + @xmt_list.each do |seq, value| + if @rcv_list.include?(seq) + result = @rcv_list.delete(seq) + mismatch += 1 unless result.sequence == value + else + lost += 1 + end + end + spurious = @rcv_list.size + if lost == 0 and mismatched == 0 and spurious == 0 + return "pass" + else + return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious] + end + end + end + def setup() # Make sure errors in threads lead to a noisy death of the test Thread.abort_on_exception = true - host = ENV.fetch("QMF_TEST_HOST", 'localhost') - port = ENV.fetch("QMF_TEST_PORT", 5672) + @host = ENV.fetch("QMF_TEST_HOST", 'localhost') + @port = ENV.fetch("QMF_TEST_PORT", 5672) - sock = TCPSocket.new(host, port) + sock = TCPSocket.new(@host, @port) @conn = Qpid::Connection.new(sock) @conn.start() @session = @conn.session("test-session") - - # It's a bit odd that we're using two connections but that's the way - # the python one works afaict. - @qmf = Qpid::Qmf::Session.new() - @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [host, port]) - - brokers = @qmf.objects(:class => "broker") - assert_equal(1, brokers.length) - @broker = brokers[0] end def teardown @@ -58,10 +96,20 @@ class QmfTest < Test::Unit::TestCase end end - def test_broker_connectivity() + def start_qmf(kwargs = {}) + @qmf = Qpid::Qmf::Session.new(kwargs) + @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port]) + + brokers = @qmf.objects(:class => "broker") + assert_equal(1, brokers.length) + @broker = brokers[0] + end + + def test_methods_sync() + start_qmf body = "Echo Message Body" for seq in 1..10 - res = @broker.echo(seq, body) + res = @broker.echo(seq, body, :_timeout => 10) assert_equal(0, res.status) assert_equal("OK", res.text) assert_equal(seq, res.sequence) @@ -69,6 +117,14 @@ class QmfTest < Test::Unit::TestCase end end + def test_methods_async() + handler = Handler.new + start_qmf(:console => handler) + handler.request(@broker, 20) + sleep(1) + assert_equal("pass", handler.check) + end + def test_move_queued_messages() """ Test ability to move messages from the head of one queue to another. @@ -76,6 +132,7 @@ class QmfTest < Test::Unit::TestCase """ "Set up source queue" + start_qmf @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") @@ -151,6 +208,7 @@ class QmfTest < Test::Unit::TestCase # Test ability to purge messages from the head of a queue. Need to test # moveing all, 1 (top message) and N messages. def test_purge_queue + start_qmf # Set up purge queue" @session.queue_declare(:queue => "purge-queue", :exclusive => true, |
