diff options
Diffstat (limited to 'qpid/ruby/tests')
-rw-r--r-- | qpid/ruby/tests/assembler.rb | 78 | ||||
-rw-r--r-- | qpid/ruby/tests/codec010.rb | 122 | ||||
-rw-r--r-- | qpid/ruby/tests/connection.rb | 246 | ||||
-rw-r--r-- | qpid/ruby/tests/datatypes.rb | 224 | ||||
-rw-r--r-- | qpid/ruby/tests/framer.rb | 99 | ||||
-rw-r--r-- | qpid/ruby/tests/qmf.rb | 248 | ||||
-rw-r--r-- | qpid/ruby/tests/queue.rb | 80 | ||||
-rw-r--r-- | qpid/ruby/tests/spec010.rb | 80 | ||||
-rw-r--r-- | qpid/ruby/tests/util.rb | 72 |
9 files changed, 1249 insertions, 0 deletions
diff --git a/qpid/ruby/tests/assembler.rb b/qpid/ruby/tests/assembler.rb new file mode 100644 index 0000000000..1181ece547 --- /dev/null +++ b/qpid/ruby/tests/assembler.rb @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require 'tests/util' + +require 'logger' + +class TestAssembler< Test::Unit::TestCase + + Segment = Qpid::Segment + Assembler = Qpid::Assembler + + def setup + # Qpid::asm_logger = Logger.new(STDOUT) + + @server = Util::ServerThread.new do |socket| + asm = Assembler.new(socket) + begin + header = asm.read_header + asm.write_header(header[-2], header[-1]) + loop do + seg = asm.read_segment + asm.write_segment(seg) + end + rescue Qpid::Closed + nil # Ignore + end + end + end + + def teardown + @server.finish + @server.join + end + + def test_assembler + asm = Assembler.new(@server.client, max_payload = 1) + asm.write_header(0, 10) + asm.write_segment(Segment.new(true, false, 1, 2, 3, "TEST")) + asm.write_segment(Segment.new(false, true, 1, 2, 3, "ING")) + + assert_equal( ["AMQP", 1, 1, 0, 10], asm.read_header) + + seg = asm.read_segment + assert_equal(true, seg.first_segment?) + assert_equal(false, seg.last_segment?) + assert_equal(1, seg.type) + assert_equal(2, seg.track) + assert_equal(3, seg.channel) + assert_equal("TEST", seg.payload) + + seg = asm.read_segment + assert_equal(false, seg.first_segment?) + assert_equal(true, seg.last_segment?) + assert_equal(1, seg.type) + assert_equal(2, seg.track) + assert_equal(3, seg.channel) + assert_equal("ING", seg.payload) + end +end diff --git a/qpid/ruby/tests/codec010.rb b/qpid/ruby/tests/codec010.rb new file mode 100644 index 0000000000..a9a5ca81e0 --- /dev/null +++ b/qpid/ruby/tests/codec010.rb @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require "tests/util" +require "socket" + +class CodecTest < Test::Unit::TestCase + + def setup + @spec = Qpid::Spec010::load + end + + def check(type, value) + t = @spec[type] + sc = Qpid::StringCodec.new(@spec) + t.encode(sc, value) + decoded = t.decode(sc) + assert_equal(value, decoded) + end + + + def testMapString + check("map", {"string" => "this is a test"}) + end + + def testMapInt + check("map", {"int" => 3}) + end + + def testMapLong + check("map", {"long" => 2**32}) + end + + def testMapNone + check("map", {"none" => None}) + end + + def testMapNested + check("map", {"map" => {"string" => "nested test"}}) + end + + def testMapList + check("map", {"list" => [1, "two", 3.0, -4]}) + end + + def testMapAll + check("map", {"string" => "this is a test", + "int" => 3, + "long" => 2**32, + "nil" => nil, + "map" => {"string" => "nested map"}, + "list" => [1, "two", 3.0, -4]}) + end + + def testMapEmpty + check("map", {}) + end + + def testMapNone + check("map", nil) + end + + def testList + check("list", [1, "two", 3.0, -4]) + end + + def testListEmpty + check("list", []) + end + + def testListNone + check("list", nil) + end + + def testArrayInt + check("array", [1, 2, 3, 4]) + end + + def testArrayString + check("array", ["one", "two", "three", "four"]) + end + + def testArrayEmpty + check("array", []) + end + + def testArrayNone + check("array", nil) + end + + def testInt64 + check("int64", 2 ** 40 * -1 + 43) + end + + def testUint64 + check("int64", 2 ** 42) + end + + def testReadNone + sc = Qpid::StringCodec.new(@spec) + # Python behaves this way + assert_equal("", sc.read(nil)) + end +end diff --git a/qpid/ruby/tests/connection.rb b/qpid/ruby/tests/connection.rb new file mode 100644 index 0000000000..c2a851ec0a --- /dev/null +++ b/qpid/ruby/tests/connection.rb @@ -0,0 +1,246 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' +require 'tests/util' + +class MockServer + + def initialize(queue) + @queue = queue + end + + def connection(conn, args={}) + return Qpid::Delegate::Server.new(conn, :delegate => method(:session)) + end + + def session(ssn, args={}) + ssn.auto_sync = false + return MockSession.new(ssn, @queue) + end +end + +class MockSession < Qpid::Session::Delegate + + def initialize(session, queue) + @session = session + @queue = queue + end + + def execution_sync(es) + nil + end + + def queue_query(qq) + return qq.st_type.result.create(qq.queue) + end + + def message_transfer(cmd, headers, body) + if cmd.destination == "echo" + m = Qpid::Message.new(body) + m.headers = headers + @session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elsif cmd.destination == "abort" + @session.channel.connection.sock.close() + else + @queue.put([cmd, headers, body]) + end + end + + def exchange_declare(ed) + # do nothing + end +end + +class TestConnectionTest < Test::Unit::TestCase + + def setup + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + + @queue = Qpid::Queue.new + @running = true + ts = MockServer.new(@queue) + @server = Util::ServerThread.new do |socket| + conn = Qpid::Connection.new(socket, :delegate => ts.method(:connection)) + begin + conn.start(5) + rescue Qpid::Closed + # Ignore + end + end + + class << @server + def finish + @running.lock + client.close + @sockets.each { |sock| sock.close unless sock.closed? } + end + end + + @server[:name] = 'server' + Thread.current[:name] = 'test' + end + + def teardown + @server.finish + @server.join + end + + def connect + sock = @server.client + return Qpid::Connection.new(sock) + end + + def test_basic + c = connect + c.start(10) + + ssn1 = c.session("test1", :timeout => 10) + ssn2 = c.session("test2", :timeout => 10) + + assert_equal(c.sessions["test1"], ssn1) + assert_equal(c.sessions["test2"], ssn2) + assert_not_nil ssn1.channel + assert_not_nil ssn2.channel + assert(c.attached.values.include?(ssn1)) + assert(c.attached.values.include?(ssn2)) + + ssn1.close(5) + + assert_nil(ssn1.channel) + assert(! c.attached.values.include?(ssn1)) + assert(c.sessions.values.include?(ssn2)) + + ssn2.close(5) + + assert_nil(ssn2.channel) + assert(! c.attached.values.include?(ssn2)) + assert(! c.sessions.values.include?(ssn2)) + + ssn = c.session("session", :timeout => 10) + + assert_not_nil(ssn.channel) + assert(c.sessions.values.include?(ssn)) + + destinations = ["one", "two", "three"] + + destinations.each { |d| ssn.message_transfer(d) } + + destinations.each do |d| + cmd, header, body = @queue.get(10) + assert_equal(d, cmd.destination) + assert_nil(header) + assert_nil(body) + end + + msg = Qpid::Message.new("this is a test") + ssn.message_transfer("four", :message => msg) + cmd, header, body = @queue.get(10) + assert_equal("four", cmd.destination) + assert_nil(header) + assert_equal(msg.body, body) + + qq = ssn.queue_query("asdf") + assert_equal("asdf", qq.queue) + c.close(5) + end + + def test_close_get + c = connect + c.start(10) + ssn = c.session("test", :timeout => 10) + echos = ssn.incoming("echo") + + 10.times do |i| + ssn.message_transfer("echo", + :message => Qpid::Message.new("test#{i}")) + end + + ssn.auto_sync=false + ssn.message_transfer("abort") + + 10.times do |i| + m = echos.get(timeout=10) + assert_equal("test#{i}", m.body) + end + + begin + m = echos.get(timeout=10) + flunk("Expected Closed") + rescue Qpid::Closed + # Ignore + end + end + + def test_close_listen + c = connect + c.start(10) + ssn = c.session("test", :timeout => 10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + lock = Monitor.new + condition = lock.new_cond + + echos.exc_listen do |e| + exceptions << e + lock.synchronize { condition.signal } + end + echos.listen do |m| + messages << m + end + + 10.times do |i| + ssn.message_transfer("echo", + :message => Qpid::Message.new("test#{i}")) + end + ssn.auto_sync=false + ssn.message_transfer("abort") + + lock.synchronize { condition.wait(10) } + + 10.times do |i| + m = messages.shift + assert_equal("test#{i}", m.body) + end + + assert_equal(1, exceptions.size) + end + + def test_sync + c = connect + c.start(10) + s = c.session("test") + s.auto_sync = false + s.message_transfer("echo", + :message => Qpid::Message.new("test")) + s.sync(10) + end + + def test_exchange_declare + c = connect + c.start(10) + s = c.session("test") + s.exchange_declare("test-exchange") + end +end diff --git a/qpid/ruby/tests/datatypes.rb b/qpid/ruby/tests/datatypes.rb new file mode 100644 index 0000000000..65b1f9e3f5 --- /dev/null +++ b/qpid/ruby/tests/datatypes.rb @@ -0,0 +1,224 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' +require 'tests/util' + +class TestSerial < Test::Unit::TestCase + + def test_cmp + [0, 0x8FFFFFFF, 0xFFFFFFFF].each do |s| + s = s.to_serial + assert(s + 1 > s) + assert(s - 1 < s) + assert(s < s + 1) + assert(s > s - 1) + end + last = 0xFFFFFFFF.to_serial + zero = 0.to_serial + assert_equal(zero, last + 1) + + assert_equal(last, [last, zero].min) + assert_equal(zero, [last, zero].max) + end + + def test_incr + s = 0.to_serial + s += 1 + assert_equal(1.to_serial, s) + end + + def test_in + l = [1, 2, 3, 4].collect { |i| i.to_serial } + assert(l.include?(1.to_serial)) + assert(l.include?((0xFFFFFFFF + 2).to_serial)) + assert(l.include?(4)) + end + + def test_none + assert_not_equal(nil, 0.to_serial) + end + + def test_hash + zero = 0.to_serial + d = { zero => :zero } + # FIXME: this does not work, since Ruby looks up the key and does + # a 0.eql?(zero), which bypasses the Qpid::Serial::eql? + # assert_equal(:zero, d[0]) + end +end + +class TestRangedSet < Test::Unit::TestCase + + def assert_contains(rset, elts, nonelts = []) + assert_equal(elts, elts.select { |e| rset.include?(e) }) + assert_equal(nonelts, nonelts.select { |e| ! rset.include?(e) }) + end + + def assert_ranges(rs, *ranges) + assert_equal(ranges.size, rs.ranges.size) + assert( ranges.all? { |rng| rs.include?(rng) } ) + end + + def test_simple + rs = Qpid::RangedSet.new + + assert(rs.ranges.empty?) + + rs.add(1) + assert_contains(rs, [1], [0,2]) + assert_ranges(rs, 1..1) + + rs.add(2) + assert_contains(rs, [1,2], [0,3]) + assert_ranges(rs, 1..2) + + rs.add(0) + assert_contains(rs, [0,1,2], [-1, 3]) + assert_ranges(rs, 0..2) + + rs.add(37) + assert_contains(rs, [0,1,2,37], [-1, 3, 36, 38]) + assert_ranges(rs, 0..2, 37..37) + + rs.add(-1) + assert_ranges(rs, -1..2, 37..37) + + rs.add(-3) + assert_ranges(rs, -1..2, 37..37, -3..-3) + + rs.add(1, 20) + assert_contains(rs, [20], [21]) + assert_ranges(rs, -1..20, 37..37, -3..-3) + + rs.add(21,36) + assert_ranges(rs, -1..37, -3..-3) + + rs.add(-3, 5) + assert_ranges(rs, -3..37) + end + + def test_add_self + a = Qpid::RangedSet.new + a.add(0, 8) + assert_ranges(a, 0..8) + + a.add(0, 8) + assert_ranges(a, 0..8) + end +end + +class TestRange < Test::Unit::TestCase + + def test_intersect1 + a = Range.new(0, 10) + b = Range.new(9, 20) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert_equal(9..10, i1) + assert_equal(9..10, i2) + end + + def test_intersect2 + a = Range.new(0, 10) + b = Range.new(11, 20) + assert_equal(nil, a.intersect(b)) + assert_equal(nil, b.intersect(a)) + end + + def test_intersect3 + a = Range.new(0, 10) + b = Range.new(3, 5) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert_equal(3..5, i1) + assert_equal(3..5, i2) + end +end + +class TestUUIDTest < Test::Unit::TestCase + + def test_simple + # this test is kind of lame, but it does excercise the basic + # functionality of the class + u = Qpid::UUID::uuid4 + 1024.times { |i| assert_not_equal(u, Qpid::UUID::uuid4) } + assert_raise NotImplementedError do + u == 0 + end + end +end + +class TestMessage < Test::Unit::TestCase + + def setup + @@spec ||= Qpid::Spec010::load() + @mp = Qpid::struct(@@spec["message_properties"]) + @dp = Qpid::struct(@@spec["delivery_properties"]) + @fp = Qpid::struct(@@spec["fragment_properties"]) + end + + def test_has + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert m.has("message_properties") + assert m.has("delivery_properties") + assert m.has("fragment_properties") + end + + def test_get + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert_same(@mp, m.get("message_properties")) + assert_same(@dp, m.get("delivery_properties")) + assert_same(@fp, m.get("fragment_properties")) + end + + def test_set + m = Qpid::Message.new(@mp, @dp, "body") + assert_nil m.get("fragment_properties") + m.set(@fp) + assert_same(@fp, m.get("fragment_properties"), "4") + end + + def test_set_on_empty + m = Qpid::Message.new("body") + assert_nil m.get("delivery_properties") + m.set(@dp) + assert_same(@dp, m.get("delivery_properties"), "5") + end + + def test_set_replace + m = Qpid::Message.new(@mp, @dp, @fp, "body") + dp = Qpid::struct(@@spec["delivery_properties"]) + assert_same(@dp, m.get("delivery_properties"), "6") + m.set(dp) + assert_same(dp, m.get("delivery_properties"), "7") + end + + def test_clear + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert_same(@mp, m.get("message_properties"), "8") + assert_same(@dp, m.get("delivery_properties"), "9") + assert_same(@fp, m.get("fragment_properties"), "10") + m.clear("fragment_properties") + assert_nil m.get("fragment_properties") + assert_same(@mp, m.get("message_properties"), "11") + assert_same(@dp, m.get("delivery_properties"), "12") + end +end diff --git a/qpid/ruby/tests/framer.rb b/qpid/ruby/tests/framer.rb new file mode 100644 index 0000000000..1d56f2faf1 --- /dev/null +++ b/qpid/ruby/tests/framer.rb @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require 'tests/util' + +require 'logger' + +class TestFramer < Test::Unit::TestCase + + include Test + + def setup + #Qpid::raw_logger = Logger.new(STDOUT) + #Qpid::frm_logger = Logger.new(STDOUT) + + @server = Util::ServerThread.new do |socket| + conn = Qpid::Framer.new(socket) + begin + h = conn.read_header + conn.write_header(h[-2], h[-1]) + loop do + frame = conn.read_frame + conn.write_frame(frame) + conn.flush + end + rescue Qpid::Closed + nil # Ignore + end + end + end + + def teardown + @server.finish + @server.join + end + + Frame = Qpid::Frame + + def test_framer + c = Qpid::Framer.new(@server.client) + + c.write_header(0, 10) + assert_equal( ["AMQP", 1, 1, 0, 10], c.read_header()) + + c.write_frame(Frame.new(Qpid::FIRST_FRM, 1, 2, 3, "THIS")) + c.write_frame(Frame.new(0, 1, 2, 3, "IS")) + c.write_frame(Frame.new(0, 1, 2, 3, "A")) + c.write_frame(Frame.new(Qpid::LAST_FRM, 1, 2, 3, "TEST")) + c.flush() + + f = c.read_frame + assert(f.first_frame?) + assert(! f.last_frame?) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("THIS", f.payload) + + f = c.read_frame + assert_equal(0, f.flags) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("IS", f.payload) + + f = c.read_frame + assert_equal(0, f.flags) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("A", f.payload) + + f = c.read_frame + assert(f.last_frame?) + assert(! f.first_frame?) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("TEST", f.payload) + end +end diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb new file mode 100644 index 0000000000..274e38416e --- /dev/null +++ b/qpid/ruby/tests/qmf.rb @@ -0,0 +1,248 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require "tests/util" +require "socket" +require "monitor.rb" + +class QmfTest < Test::Unit::TestCase + + class Handler < Qpid::Qmf::Console + include MonitorMixin + + def initialize + super() + @xmt_list = {} + @rcv_list = {} + end + + def method_response(broker, seq, response) + synchronize do + @rcv_list[seq] = response + end + end + + def request(broker, count) + @count = count + for idx in 0...count + synchronize do + seq = broker.echo(idx, "Echo Message", :async => true) + @xmt_list[seq] = idx + end + end + end + + def check + return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size + lost = 0 + mismatched = 0 + @xmt_list.each do |seq, value| + if @rcv_list.include?(seq) + result = @rcv_list.delete(seq) + mismatch += 1 unless result.sequence == value + else + lost += 1 + end + end + spurious = @rcv_list.size + if lost == 0 and mismatched == 0 and spurious == 0 + return "pass" + else + return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious] + end + end + end + + def setup() + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + + @host = ENV.fetch("QMF_TEST_HOST", 'localhost') + @port = ENV.fetch("QMF_TEST_PORT", 5672) + + sock = TCPSocket.new(@host, @port) + + @conn = Qpid::Connection.new(sock) + @conn.start() + + @session = @conn.session("test-session") + end + + def teardown + unless @session.error? + @session.close(10) + end + @conn.close(10) + if @qmf + @qmf.del_broker(@qmf_broker) + end + end + + def start_qmf(kwargs = {}) + @qmf = Qpid::Qmf::Session.new(kwargs) + @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port]) + + brokers = @qmf.objects(:class => "broker") + assert_equal(1, brokers.length) + @broker = brokers[0] + end + + def test_methods_sync() + start_qmf + body = "Echo Message Body" + for seq in 1..10 + res = @broker.echo(seq, body, :timeout => 10) + assert_equal(0, res.status) + assert_equal("OK", res.text) + assert_equal(seq, res.sequence) + assert_equal(body, res.body) + end + end + + def test_methods_async() + handler = Handler.new + start_qmf(:console => handler) + handler.request(@broker, 20) + sleep(1) + assert_equal("pass", handler.check) + end + + def test_move_queued_messages() + """ + Test ability to move messages from the head of one queue to another. + Need to test moveing all and N messages. + """ + + "Set up source queue" + start_qmf + @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + for count in 1..20 + body = "Move Message %d" % count + src_msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", :message => src_msg) + end + + "Set up destination queue" + @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct") + + queues = @qmf.objects(:class => "queue") + + "Move 10 messages from src-queue to dest-queue" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0] + dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0] + + assert_equal(10, sq.msgDepth) + assert_equal(10, dq.msgDepth) + + "Move all remaining messages to destination" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(0, sq.msgDepth) + assert_equal(20, dq.msgDepth) + + "Use a bad source queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + assert_equal(4, result.status) + + "Use a bad destination queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + assert_equal(4, result.status) + + " Use a large qty (40) to move from dest-queue back to " + " src-queue- should move all " + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(20, sq.msgDepth) + assert_equal(0, dq.msgDepth) + + "Consume the messages of the queue and check they are all there in order" + @session.message_subscribe(:queue => "src-queue", + :destination => "tag") + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.message, + :value => 0xFFFFFFFF) + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.byte, + :value => 0xFFFFFFFF) + queue = @session.incoming("tag") + for count in 1..20 + consumed_msg = queue.get(timeout=1) + body = "Move Message %d" % count + assert_equal(body, consumed_msg.body) + end + end + + # Test ability to purge messages from the head of a queue. Need to test + # moveing all, 1 (top message) and N messages. + def test_purge_queue + start_qmf + # Set up purge queue" + @session.queue_declare(:queue => "purge-queue", + :exclusive => true, + :auto_delete => true) + @session.exchange_bind(:queue => "purge-queue", + :exchange => "amq.direct", + :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + 20.times do |count| + body = "Purge Message %d" % count + msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", + :message => msg) + end + + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + + "Purge top message from purge-queue" + result = pq.purge(1) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(19, pq.msgDepth) + + "Purge top 9 messages from purge-queue" + result = pq.purge(9) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(10, pq.msgDepth) + + "Purge all messages from purge-queue" + result = pq.purge(0) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(0, pq.msgDepth) + end +end diff --git a/qpid/ruby/tests/queue.rb b/qpid/ruby/tests/queue.rb new file mode 100644 index 0000000000..4ec0e07ffb --- /dev/null +++ b/qpid/ruby/tests/queue.rb @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' + +class TestQueue < Test::Unit::TestCase + + # The qpid queue class just provides sime simple extensions to + # python's standard queue data structure, so we don't need to test + # all the queue functionality. + + def setup + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + end + + def test_listen + values = [] + heard = Qpid::Util::Event.new + + listener = Proc.new do |x| + values << x + heard.set + end + + q = Qpid::Queue.new + q.listen(&listener) + + heard.clear + q.put(1) + heard.wait + assert_equal([1], values) + heard.clear + q.put(2) + heard.wait + assert_equal([1, 2], values) + + q.listen + q.put(3) + assert_equal(3, q.get) + + q.listen(&listener) + heard.clear + q.put(4) + heard.wait + assert_equal([1,2,4], values) + end + + def test_close + q = Qpid::Queue.new + (1..3).each { |i| q.put(i) } + q.close + assert_equal(1, q.get) + assert_equal(2, q.get) + assert_equal(3, q.get) + 10.times do |i| + assert_raises(Qpid::Closed) do + q.get + end + end + end + +end diff --git a/qpid/ruby/tests/spec010.rb b/qpid/ruby/tests/spec010.rb new file mode 100644 index 0000000000..6db1523455 --- /dev/null +++ b/qpid/ruby/tests/spec010.rb @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid/test" +require "qpid/spec010" + +class SpecTest < Test::Unit::TestCase + + def setup() + @spec = Qpid::Spec010.load() + end + + def testSessionHeader() + hdr = @spec[:header] + sc = Qpid::StringCodec.new(@spec) + hdr.encode(sc, Qpid::struct(hdr, :sync=>true)) + assert sc.encoded == "\x01\x01" + + sc = Qpid::StringCodec.new(@spec) + hdr.encode(sc, Qpid::struct(hdr, :sync=>false)) + assert sc.encoded == "\x01\x00" + end + + def encdec(type, value) + sc = Qpid::StringCodec.new(@spec) + type.encode(sc, value) + decoded = type.decode(sc) + return decoded + end + + def testMessageProperties() + mp = @spec[:message_properties] + rt = @spec[:reply_to] + + props = Qpid::struct(mp, + :content_length=>3735928559, + :reply_to=>Qpid::struct(rt, + :exchange=>"the exchange name", + :routing_key=>"the routing key")) + dec = encdec(mp, props) + assert props.content_length == dec.content_length + assert props.reply_to.exchange == dec.reply_to.exchange + assert props.reply_to.routing_key == dec.reply_to.routing_key + end + + def testMessageSubscribe() + ms = @spec[:message_subscribe] + cmd = Qpid::struct(ms, :exclusive=>true, :destination=>"this is a test") + dec = encdec(@spec[:message_subscribe], cmd) + assert cmd.exclusive == dec.exclusive + assert cmd.destination == dec.destination + end + + def testXid() + xid = @spec[:xid] + sc = Qpid::StringCodec.new(@spec) + st = Qpid::struct(xid, :format=>0, :global_id=>"gid", :branch_id=>"bid") + xid.encode(sc, st) + assert sc.encoded == "\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid" + assert xid.decode(sc) == st + end + +end diff --git a/qpid/ruby/tests/util.rb b/qpid/ruby/tests/util.rb new file mode 100644 index 0000000000..b22a6bab2f --- /dev/null +++ b/qpid/ruby/tests/util.rb @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'thread' +require 'socket' + +module Util + + TOPDIR = File::dirname(File::dirname(File::expand_path(__FILE__))) + SPEC = File::join(TOPDIR, "specs", "amqp.0-10-qpid-errata.xml") + + PORT = 1234 + HOST = "0.0.0.0" + + def self.connect(host = HOST, port = PORT) + TCPSocket.new(host, port) + end + + class ServerThread < Thread + def initialize(&block) + @sockets = [] + @running = Mutex.new + started = Qpid::Util::Event.new + super(started, @running) do |started, running| + tcp_srv = TCPServer.new(HOST, PORT) + begin + started.set + while ! running.locked? and (session = tcp_srv.accept) + yield(session) + end + rescue Exception => e + # Exceptions in the server thread are hard to see + # Make sure they apear loudly on the console + $stderr.puts "#{ "*" * 20} Server exception #{ "*" * 20}" + $stderr.puts e.message + $stderr.puts e.backtrace + raise + ensure + tcp_srv.close + end + end + started.wait + end + + def finish + @running.lock + @sockets.each { |sock| sock.close unless sock.closed? } + end + + def client(host = HOST, port = PORT) + sock = Util::connect(host, port) + @sockets << sock + sock + end + end +end |