# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # require "test/unit" require "qpid" require "tests/util" require "socket" require "monitor.rb" class QmfTest < Test::Unit::TestCase class Handler < Qpid::Qmf::Console include MonitorMixin def initialize super() @xmt_list = {} @rcv_list = {} end def method_response(broker, seq, response) synchronize do @rcv_list[seq] = response end end def request(broker, count) @count = count for idx in 0...count synchronize do seq = broker.echo(idx, "Echo Message", :async => true) @xmt_list[seq] = idx end end end def check return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size lost = 0 mismatched = 0 @xmt_list.each do |seq, value| if @rcv_list.include?(seq) result = @rcv_list.delete(seq) mismatch += 1 unless result.sequence == value else lost += 1 end end spurious = @rcv_list.size if lost == 0 and mismatched == 0 and spurious == 0 return "pass" else return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious] end end end def setup() # Make sure errors in threads lead to a noisy death of the test Thread.abort_on_exception = true @host = ENV.fetch("QMF_TEST_HOST", 'localhost') @port = ENV.fetch("QMF_TEST_PORT", 5672) sock = TCPSocket.new(@host, @port) @conn = Qpid::Connection.new(sock) @conn.start() @session = @conn.session("test-session") end def teardown unless @session.error? @session.close(10) end @conn.close(10) if @qmf @qmf.del_broker(@qmf_broker) end end def start_qmf(kwargs = {}) @qmf = Qpid::Qmf::Session.new(kwargs) @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port]) brokers = @qmf.objects(:class => "broker") assert_equal(1, brokers.length) @broker = brokers[0] end def test_methods_sync() start_qmf body = "Echo Message Body" for seq in 1..10 res = @broker.echo(seq, body, :timeout => 10) assert_equal(0, res.status) assert_equal("OK", res.text) assert_equal(seq, res.sequence) assert_equal(body, res.body) end end def test_methods_async() handler = Handler.new start_qmf(:console => handler) handler.request(@broker, 20) sleep(1) assert_equal("pass", handler.check) end def test_move_queued_messages() """ Test ability to move messages from the head of one queue to another. Need to test moveing all and N messages. """ "Set up source queue" start_qmf @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") props = @session.delivery_properties(:routing_key => "routing_key") for count in 1..20 body = "Move Message %d" % count src_msg = Qpid::Message.new(props, body) @session.message_transfer(:destination => "amq.direct", :message => src_msg) end "Set up destination queue" @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true) @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct") queues = @qmf.objects(:class => "queue") "Move 10 messages from src-queue to dest-queue" result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) assert_equal(0, result.status) sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0] dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0] assert_equal(10, sq.msgDepth) assert_equal(10, dq.msgDepth) "Move all remaining messages to destination" result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) assert_equal(0, result.status) sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] assert_equal(0, sq.msgDepth) assert_equal(20, dq.msgDepth) "Use a bad source queue name" result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) assert_equal(4, result.status) "Use a bad destination queue name" result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) assert_equal(4, result.status) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) assert_equal(0, result.status) sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] assert_equal(20, sq.msgDepth) assert_equal(0, dq.msgDepth) "Consume the messages of the queue and check they are all there in order" @session.message_subscribe(:queue => "src-queue", :destination => "tag") @session.message_flow(:destination => "tag", :unit => @session.message_credit_unit.message, :value => 0xFFFFFFFF) @session.message_flow(:destination => "tag", :unit => @session.message_credit_unit.byte, :value => 0xFFFFFFFF) queue = @session.incoming("tag") for count in 1..20 consumed_msg = queue.get(timeout=1) body = "Move Message %d" % count assert_equal(body, consumed_msg.body) end end # Test ability to purge messages from the head of a queue. Need to test # moveing all, 1 (top message) and N messages. def test_purge_queue start_qmf # Set up purge queue" @session.queue_declare(:queue => "purge-queue", :exclusive => true, :auto_delete => true) @session.exchange_bind(:queue => "purge-queue", :exchange => "amq.direct", :binding_key => "routing_key") props = @session.delivery_properties(:routing_key => "routing_key") 20.times do |count| body = "Purge Message %d" % count msg = Qpid::Message.new(props, body) @session.message_transfer(:destination => "amq.direct", :message => msg) end pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] "Purge top message from purge-queue" result = pq.purge(1) assert_equal(0, result.status) pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] assert_equal(19, pq.msgDepth) "Purge top 9 messages from purge-queue" result = pq.purge(9) assert_equal(0, result.status) pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] assert_equal(10, pq.msgDepth) "Purge all messages from purge-queue" result = pq.purge(0) assert_equal(0, result.status) pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] assert_equal(0, pq.msgDepth) end end