diff options
Diffstat (limited to 'RC9/qpid/ruby/lib')
24 files changed, 5933 insertions, 0 deletions
diff --git a/RC9/qpid/ruby/lib/qpid.rb b/RC9/qpid/ruby/lib/qpid.rb new file mode 100644 index 0000000000..1c719e9b1d --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid.rb @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + def self.logger + @logger ||= {} + @logger + end +end + +require "qpid/util" +require "qpid/queue" +require "qpid/packer" +require "qpid/framer" +require "qpid/codec" +require 'qpid/datatypes' +require 'qpid/spec010' +require 'qpid/delegates' +require 'qpid/invoker' +require "qpid/assembler" +require 'qpid/session' +require "qpid/connection" +require "qpid/spec" +require 'qpid/queue' +require 'qpid/qmf' diff --git a/RC9/qpid/ruby/lib/qpid/assembler.rb b/RC9/qpid/ruby/lib/qpid/assembler.rb new file mode 100644 index 0000000000..b768c3f195 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/assembler.rb @@ -0,0 +1,148 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + class << self + attr_accessor :asm_logger + end + + class Segment + + attr_reader :type, :payload, :track, :channel + attr_accessor :id, :offset + + def initialize(first, last, type, track, channel, payload) + @id = nil + @offset = nil + @first = first + @last = last + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; @first ; end + + def last_segment? ; @last ; end + + def decode(spec) + segs = spec[:segment_type] + choice = segs.enum.choices[type] + return method("decode_#{choice.name}").call(spec) + end + + def decode_control(spec) + sc = StringCodec.new(spec, payload) + return sc.read_control() + end + + def decode_command(spec) + sc = StringCodec.new(spec, payload) + hdr, cmd = sc.read_command() + cmd.id = id + return hdr, cmd + end + + def decode_header(spec) + sc = StringCodec.new(spec, payload) + values = [] + until sc.encoded.empty? + values << sc.read_struct32() + end + return values + end + + def decode_body(spec) + payload + end + + def append(frame) + @payload += frame.payload + end + + def to_s + f = first_segment? ? 'F' : '.' + l = last_segment? ? 'L' : '.' + return "%s%s %s %s %s %s" % [f, l, @type, + @track, @channel, @payload.inspect] + end + + end + + class Assembler < Framer + + def logger; Qpid::asm_logger; end + + def initialize(sock, max_payload = Frame::MAX_PAYLOAD) + super(sock) + @max_payload = max_payload + @fragments = {} + end + + def read_segment + loop do + frame = read_frame + key = [frame.channel, frame.track] + seg = @fragments[key] + unless seg + seg = Segment.new(frame.first_segment?, + frame.last_segment?, + frame.type, frame.track, + frame.channel, "") + @fragments[key] = seg + end + + seg.append(frame) + + if frame.last_frame? + @fragments.delete(key) + logger.debug("RECV #{seg}") if logger + return seg + end + end + end + + def write_segment(segment) + remaining = segment.payload + + first = true + while first or remaining + payload = remaining[0, @max_payload] + remaining = remaining[@max_payload, remaining.size] + + flags = 0 + + flags |= FIRST_FRM if first + flags |= LAST_FRM unless remaining + flags |= FIRST_SEG if segment.first_segment? + flags |= LAST_SEG if segment.last_segment? + + frame = Frame.new(flags, segment.type, segment.track, + segment.channel, payload) + write_frame(frame) + + first = false + end + + logger.debug("SENT #{segment}") if logger + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/client.rb b/RC9/qpid/ruby/lib/qpid/client.rb new file mode 100644 index 0000000000..ec3d100a9c --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/client.rb @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "thread" +require "qpid/peer" +require "qpid/queue" + +module Qpid08 + + class Client + def initialize(host, port, spec, vhost = "/") + @host = host + @port = port + @spec = spec + @vhost = vhost + + @mechanism = nil + @response = nil + @locale = nil + + @queues = {} + @mutex = Mutex.new() + + @closed = false + @code = nil + @started = ConditionVariable.new() + + @conn = Connection.new(@host, @port, @spec) + @peer = Peer.new(@conn, ClientDelegate.new(self)) + end + + attr_reader :mechanism, :response, :locale + + def closed?; @closed end + def closed=(value); @closed = value end + def code; @code end + + def wait() + @mutex.synchronize do + @started.wait(@mutex) + end + raise EOFError.new() if closed? + end + + def signal_start() + @started.broadcast() + end + + def queue(key) + @mutex.synchronize do + q = @queues[key] + if q.nil? + q = Queue.new() + @queues[key] = q + end + return q + end + end + + def start(response, mechanism="AMQPLAIN", locale="en_US") + @response = response + @mechanism = mechanism + @locale = locale + + @conn.connect() + @conn.init() + @peer.start() + wait() + channel(0).connection_open(@vhost) + end + + def channel(id) + return @peer.channel(id) + end + + def close(msg = nil) + @closed = true + @code = msg + @peer.close() + end + end + + class ClientDelegate + + include Delegate + + def initialize(client) + @client = client + end + + def connection_start(ch, msg) + ch.connection_start_ok(:mechanism => @client.mechanism, + :response => @client.response, + :locale => @client.locale) + end + + def connection_tune(ch, msg) + ch.connection_tune_ok(*msg.fields) + @client.signal_start() + end + + def connection_close(ch, msg) + puts "CONNECTION CLOSED: #{msg.args.join(", ")}" + @client.close(msg) + end + + def channel_close(ch, msg) + puts "CHANNEL[#{ch.id}] CLOSED: #{msg.args.join(", ")}" + ch.channel_close_ok() + ch.close() + end + + def basic_deliver(ch, msg) + queue = @client.queue(msg.consumer_tag) + queue << msg + end + + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/codec.rb b/RC9/qpid/ruby/lib/qpid/codec.rb new file mode 100644 index 0000000000..009b1eef53 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/codec.rb @@ -0,0 +1,455 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'qpid/packer.rb' +require 'iconv' + +module Qpid + + class Codec + + include Qpid::Packer + + def initialize(spec = "") + @spec = spec + end + + def write_void(v) + unless v.nil? + raise Exception.new("void not nil: #{v}") + end + end + + def read_void + return nil + end + + def write_bit(b) + unless b + raise Exception.new("bit is nil: #{b}") + end + end + + def read_bit + return true + end + + def read_uint8 + return unpack("C", 1) + end + + def write_uint8(n) + return pack("C", n) + end + + def read_int8 + return unpack("c", 1) + end + + def write_int8(n) + pack("c", n) + end + + def read_char + return unpack("c", 1) + end + + def write_char(c) + pack("c") + end + + def read_boolean + return read_uint8 != 0 + end + + def write_boolean(b) + n = 0 + n = 1 if b != 0 + write_uint8(n) + end + + def read_uint16 + return unpack("n", 2) + end + + def write_uint16(n) + pack("n", n) + end + + def read_int16 + # XXX: holy moly.. pack/unpack doesn't have signed network byte order. Crazy hackery. + val = unpack("n", 2) + val -= 2 ** 16 if val >= 2 ** 15 + return val + end + + def write_int16(n) + # XXX: Magically this one works even though it's not signed. + pack("n", n) + end + + def read_uint32 + return unpack("N", 4) + end + + def write_uint32(n) + pack("N", n) + end + + def read_int32 + # Again no pack/unpack for signed int + return unpack("N", 4) + end + + def write_int32(n) + # FIXME + pack("N", n) + end + + def read_float + return unpack("g", 4) + end + + def write_float(n) + pack("g", n) + end + + def read_sequence_no + return read_uint32.to_serial + end + + def write_sequence_no(n) + write_uint32(n.value) + end + + def encode_64bit(num, signed = false) + b = [] + + if num < 0 && signed + num += 2 ** 64 + end + + (0..7).each do |c| + d = 7 - c + b[c] = (num & (0xff << d * 8)) >> d * 8 + end + pack('C8', *b) + end + + + def decode_64bit(signed = false) + # Silly ruby pack/unpack does not implement 64 bit network byte order + # encode/decode. + a = unpack('C8', 8) + num = 0 + (0..7).each do |c| + d = 7 - c + num |= a[c] << 8 * d + end + + if signed && num >= 2 ** 63 + num -= 2 ** 64 + end + return num + end + + def read_uint64 + return decode_64bit + end + + def write_uint64(n) + encode_64bit(n) + end + + def read_int64 + return decode_64bit(signed = true) + end + + def write_int64(n) + encode_64bit(n, signed = true) + end + + def read_datetime + return read_uint64 + end + + def write_datetime(n) + write_uint64(n) + end + + def read_double + return unpack("G", 8) + end + + def write_double(n) + pack("G", n) + end + + def read_vbin8 + # XXX + return read(read_uint8) + end + + def write_vbin8(b) + # XXX + write_uint8(b.length) + write(b) + end + + def read_str8 + # FIXME: Check iconv.. I think this will throw if there are odd characters. + return Iconv.conv("ASCII", "UTF-8", read_vbin8) + end + + def write_str8(s) + write_vbin8(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_str16 + return Iconv.conv("ASCII", "UTF-8", read_vbin16) + end + + def write_str16(s) + write_vbin16(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_vbin16 + # XXX: Using read method? + return read(read_uint16) + end + + def write_vbin16(b) + write_uint16(b.length) + write(b) + end + + def read_sequence_set + # FIXME: Need datatypes + result = RangedSet.new + size = read_uint16 + nranges = size / 8 + nranges.times do |i| + lower = read_sequence_no + upper = read_sequence_no + result.add(lower, upper) + end + return result + end + + def write_sequence_set(ss) + size = 8 * ss.ranges.length + write_uint16(size) + ss.ranges.each do |range| + write_sequence_no(range.lower) + write_sequence_no(range.upper) + end + end + + def read_vbin32 + return read(read_uint32) + end + + def write_vbin32(b) + write_uint32(b.length) + write(b) + end + + def write_map(m) + sc = StringCodec.new(@spec) + unless m.nil? + sc.write_uint32(m.size) + m.each do |k, v| + unless type = @spec.encoding(v.class) + raise Exception.new("no encoding for: #{v.class}") + end + sc.write_str8(k) + sc.write_uint8(type.code) + type.encode(sc, v) + end + end + write_vbin32(sc.encoded) + end + + def read_map + sc = StringCodec.new(@spec, read_vbin32) + return nil unless sc.encoded + count = sc.read_uint32 + result = nil + if count + result = {} + until sc.encoded.empty? + k = sc.read_str8 + code = sc.read_uint8 + type = @spec.types[code] + v = type.decode(sc) + result[k] = v + end + end + return result + end + + def write_array(a) + sc = StringCodec.new(@spec) + unless a.nil? + if a.length > 0 + type = @spec.encoding(a[0].class) + else + type = @spec.encoding(nil.class) + end + sc.write_uint8(type.code) + sc.write_uint32(a.size) + a.each { |o| type.encode(sc, o) } + end + write_vbin32(sc.encoded) + end + + def read_array + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + type = @spec.types[sc.read_uint8] + count = sc.read_uint32 + result = nil + if count + result = [] + count.times { |i| result << (type.decode(sc)) } + end + return result + end + + def write_list(l) + sc = StringCodec.new(@spec) + unless l.nil? + sc.write_uint32(l.length) + l.each do |o| + type = @spec.encoding(o.class) + sc.write_uint8(type.code) + type.encode(sc, o) + end + end + write_vbin32(sc.encoded) + end + + def read_list + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + count = sc.read_uint32 + result = nil + if count + result = [] + count.times do |i| + type = @spec.types[sc.read_uint8] + result << type.decode(sc) + end + end + return result + end + + def read_struct32 + size = read_uint32 + code = read_uint16 + type = @spec.structs[code] + # XXX: BLEH! + fields = type.decode_fields(self) + return Qpid::struct(type, fields) + end + + def write_struct32(value) + type = value.type + sc = StringCodec.new(@spec) + sc.write_uint16(type.code) + type.encode_fields(sc, value) + write_vbin32(sc.encoded) + end + + def read_control + cntrl = @spec.controls[read_uint16] + return Qpid::struct(cntrl, cntrl.decode_fields(self)) + end + + def write_control(ctrl) + type = ctrl.type + write_uint16(type.code) + type.encode_fields(self, ctrl) + end + + def read_command + type = @spec.commands[read_uint16] + hdr = @spec[:header].decode(self) + cmd = Qpid::struct(type, type.decode_fields(self)) + return hdr, cmd + end + + def write_command(hdr, cmd) + type = cmd.type + write_uint16(type.code) + hdr.type.encode(self, hdr) + type.encode_fields(self, cmd) + end + + def read_size(width) + if width > 0 + return send(:"read_uint#{width * 8}") + end + end + + def write_size(width, n) + if width > 0 + send(:"write_uint#{width * 8}", n) + end + end + + def read_uuid + return unpack("A16", 16) + end + + def write_uuid(s) + pack("A16", s) + end + + def read_bin128 + return unpack("A16", 16) + end + + def write_bin128(b) + pack("A16", b) + end + + end + + class StringCodec < Codec + + def initialize(spec, encoded = "") + @spec = spec + @encoded = encoded + end + + attr_reader :encoded + + def write(s) + @encoded += s + end + + def read(n) + return "" if n.nil? + result = @encoded[0...n] + @encoded = @encoded[n...@encoded.size] || "" + return result + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/codec08.rb b/RC9/qpid/ruby/lib/qpid/codec08.rb new file mode 100644 index 0000000000..148dee07bb --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/codec08.rb @@ -0,0 +1,265 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid08 + # is there a better way to do this? + class StringWriter + + def initialize(str = "") + @str = str + end + + def write(value) + @str << value + end + + def to_s() + return @str + end + + end + + class EOF < Exception; end + + class Encoder + + def initialize(out) + @out = out + @bits = [] + end + + attr_reader(:out) + + def encode(type, value) + send(type, value) + end + + def bit(b) + @bits << b + end + + def octet(o) + pack("C", o) + end + + def short(s) + pack("n", s) + end + + def long(l) + pack("N", l) + end + + def longlong(l) + lower = l & 0xffffffff + upper = (l & ~0xffffffff) >> 32 + long(upper) + long(lower) + end + + def timestamp(l) + longlong(l) + end + + def shortstr(s) + # shortstr is actually octetstr + octet(s.length) + write(s) + end + + def longstr(s) + case s + when Hash + table(s) + else + long(s.length) + write(s) + end + end + + def table(t) + t = {} if t.nil? + enc = Encoder.new(StringWriter.new()) + t.each {|key, value| + enc.shortstr(key) + # I offer this chicken to the gods of polymorphism. May they + # choke on it. + case value + when String + type = :longstr + desc = "S" + when Numeric + type = :long + desc = "I" + else + raise Exception.new("unknown table value: #{value.class}") + end + enc.write(desc) + enc.encode(type, value) + } + longstr(enc.out.to_s()) + end + + def write(str) + flushbits() + @out.write(str) + # puts "OUT #{str.inspect()}" + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + + def flush() + flushbits() + end + + private + + def flushbits() + if @bits.empty? then return end + + bytes = [] + index = 0 + @bits.each {|b| + bytes << 0 if index == 0 + if b then bytes[-1] |= 1 << index end + index = (index + 1) % 8 + } + @bits.clear() + bytes.each {|b| + octet(b) + } + end + + end + + class StringReader + + def initialize(str) + @str = str + @index = 0 + end + + def read(n) + result = @str[@index, n] + @index += result.length + return result + end + + end + + class Decoder + + def initialize(_in) + @in = _in + @bits = [] + end + + def decode(type) + return send(type) + end + + def bit() + if @bits.empty? + byte = octet() + 7.downto(0) {|i| + @bits << (byte[i] == 1) + } + end + return @bits.pop() + end + + def octet() + return unpack("C", 1) + end + + def short() + return unpack("n", 2) + end + + def long() + return unpack("N", 4) + end + + def longlong() + upper = long() + lower = long() + return upper << 32 | lower + end + + def timestamp() + return longlong() + end + + def shortstr() + # shortstr is actually octetstr + return read(octet()) + end + + def longstr() + return read(long()) + end + + def table() + dec = Decoder.new(StringReader.new(longstr())) + result = {} + while true + begin + key = dec.shortstr() + rescue EOF + break + end + desc = dec.read(1) + case desc + when "S" + value = dec.longstr() + when "I" + value = dec.long() + else + raise Exception.new("unrecognized descriminator: #{desc.inspect()}") + end + result[key] = value + end + return result + end + + def read(n) + return "" if n == 0 + result = @in.read(n) + if result.nil? or result.empty? + raise EOF.new() + else + # puts " IN #{result.inspect()}" + return result + end + end + + def unpack(fmt, size) + result = read(size).unpack(fmt) + if result.length == 1 + return result[0] + else + return result + end + end + + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/config.rb b/RC9/qpid/ruby/lib/qpid/config.rb new file mode 100644 index 0000000000..1a0942d5d5 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/config.rb @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + module Config + + def self.amqp_spec + dirs = [File::expand_path(File::join(File::dirname(__FILE__), "../../../specs")), + "/usr/share/amqp"] + dirs.each do |d| + spec = File::join(d, "amqp.0-10-qpid-errata.xml") + return spec if File::exists? spec + end + end + + end +end diff --git a/RC9/qpid/ruby/lib/qpid/connection.rb b/RC9/qpid/ruby/lib/qpid/connection.rb new file mode 100644 index 0000000000..59d88196a3 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/connection.rb @@ -0,0 +1,221 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class ChannelBusy< Exception ; end + + class ChannelsBusy < Exception ; end + + class SessionBusy < Exception ; end + + class ConnectionFailed < Exception ; end + + class Timeout < Exception ; end + + class Connection < Assembler + + include MonitorMixin + + attr_reader :spec, :attached, :sessions, :thread + attr_accessor :opened, :failed, :close_code + + def initialize(sock, args={}) + super(sock) + + delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + spec = args[:spec] || nil + + @spec = Qpid::Spec010::load(spec) + @track = @spec["track"] + + @attached = {} + @sessions = {} + + @condition = new_cond + @opened = false + @failed = false + @close_code = [nil, "connection aborted"] + + @thread = nil + + @channel_max = 65535 + + @delegate = delegate.call(self, args) + end + + def attach(name, ch, delegate, force=false) + synchronize do + ssn = @attached[ch.id] + if ssn + raise ChannelBusy.new(ch, ssn) unless ssn.name == name + else + ssn = @sessions[name] + if ssn.nil? + ssn = Session.new(name, @spec, :delegate => delegate) + @sessions[name] = ssn + elsif ssn.channel + if force + @attached.delete(ssn.channel.id) + ssn.channel = nil + else + raise SessionBusy.new(ssn) + end + end + @attached[ch.id] = ssn + ssn.channel = ch + end + ch.session = ssn + return ssn + end + end + + def detach(name, ch) + synchronize do + @attached.delete(ch.id) + ssn = @sessions.delete(name) + if ssn + ssn.channel = nil + ssn.closed + return ssn + end + end + end + + def session(name, kwargs = {}) + timeout = kwargs[:timeout] + delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) + + # FIXME: Python has cryptic comment about 'ch 0 ?' + channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } + raise ChannelsBusy unless channel + + synchronize do + ch = Channel.new(self, channel) + ssn = attach(name, ch, delegate) + ssn.channel.session_attach(name) + if ssn.wait_for(timeout) { ssn.channel } + return ssn + else + detach(name, ch) + raise Timeout + end + end + end + + def detach_all + synchronize do + attached.values.each do |ssn| + ssn.exceptions << @close_code unless @close_code[0] == 200 + detach(ssn.name, ssn.channel) + end + end + end + + def start(timeout=nil) + @delegate.start + @thread = Thread.new { run } + @thread[:name] = 'conn' + synchronize do + unless @condition.wait_for(timeout) { @opened || @failed } + raise Timeout + end + end + if @failed + raise ConnectionFailed.new(@close_code) + end + end + + def run + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + loop do + begin + seg = read_segment + rescue Qpid::Closed => e + detach_all + break + end + @delegate.received(seg) + end + end + + def close(timeout=nil) + return unless @opened + Channel.new(self, 0).connection_close(200) + synchronize do + unless @condition.wait_for(timeout) { ! @opened } + raise Timeout + end + end + @thread.join(timeout) + @thread = nil + end + + def signal + synchronize { @condition.signal } + end + + def to_s + # FIXME: We'd like to report something like HOST:PORT + return @sock.to_s + end + + class Channel < Invoker + + attr_reader :id, :connection + attr_accessor :session + + def initialize(connection, id) + @connection = connection + @id = id + @session = nil + end + + def resolve_method(name) + inst = @connection.spec[name] + if inst.is_a?(Qpid::Spec010::Control) + return invocation(:method, inst) + else + return invocation(:error, nil) + end + end + + def invoke(type, args) + ctl = type.create(*args) + sc = StringCodec.new(@connection.spec) + sc.write_control(ctl) + @connection.write_segment(Segment.new(true, true, type.segment_type, + type.track, self.id, sc.encoded)) + + log = Qpid::logger["qpid.io.ctl"] + log.debug("SENT %s", ctl) if log + end + + def to_s + return "#{@connection}[#{@id}]" + end + + end + + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/connection08.rb b/RC9/qpid/ruby/lib/qpid/connection08.rb new file mode 100644 index 0000000000..09a4888cc4 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/connection08.rb @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "socket" +require "qpid/codec08" + +module Qpid08 + + class Connection + + def initialize(host, port, spec) + @host = host + @port = port + @spec = spec + end + + attr_reader(:host, :port, :spec) + + def connect() + @sock = TCPSocket.open(@host, @port) + @out = Encoder.new(@sock) + @in = Decoder.new(@sock) + end + + def init() + @out.write("AMQP") + [1, 1, @spec.major, @spec.minor].each {|o| + @out.octet(o) + } + end + + def write(frame) + # puts "OUT #{frame.inspect()}" + @out.octet(@spec.constants[frame.payload.type].id) + @out.short(frame.channel) + frame.payload.encode(@out) + @out.octet(frame_end) + end + + def read() + type = @spec.constants[@in.octet()].name + channel = @in.short() + payload = Payload.decode(type, @spec, @in) + oct = @in.octet() + if oct != frame_end + raise Exception.new("framing error: expected #{frame_end}, got #{oct}") + end + frame = Frame.new(channel, payload) + # puts " IN #{frame.inspect}" + return frame + end + + private + + def frame_end + @spec.constants[:"frame_end"].id + end + + end + + class Frame + + def initialize(channel, payload) + @channel = channel + @payload = payload + end + + attr_reader(:channel, :payload) + + end + + class Payload + + TYPES = {} + + def Payload.singleton_method_added(name) + if name == :type + TYPES[type] = self + end + end + + def Payload.decode(type, spec, dec) + klass = TYPES[type] + klass.decode(spec, dec) + end + + end + + class Method < Payload + + def initialize(method, args) + if args.size != method.fields.size + raise ArgumentError.new("argument mismatch #{method} #{args}") + end + @method = method + @args = args + end + + attr_reader(:method, :args) + + def Method.type; :frame_method end + + def type; Method.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@method.parent.id) + enc.short(@method.id) + @method.fields.zip(self.args).each {|f, a| + if a.nil?; a = f.default end + enc.encode(f.type, a) + } + enc.flush() + encoder.longstr(buf.to_s) + end + + def Method.decode(spec, decoder) + buf = decoder.longstr() + dec = Decoder.new(StringReader.new(buf)) + klass = spec.classes[dec.short()] + meth = klass.methods[dec.short()] + args = meth.fields.map {|f| dec.decode(f.type)} + return Method.new(meth, args) + end + + def inspect(); "#{method.qname}(#{args.join(", ")})" end + + end + + class Header < Payload + + def Header.type; :frame_header end + + def initialize(klass, weight, size, properties) + @klass = klass + @weight = weight + @size = size + @properties = properties + end + + attr_reader :weight, :size, :properties + + def type; Header.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@klass.id) + enc.short(@weight) + enc.longlong(@size) + + # property flags + nprops = @klass.fields.size + flags = 0 + 0.upto(nprops - 1) do |i| + f = @klass.fields[i] + flags <<= 1 + flags |= 1 unless @properties[f.name].nil? + # the last bit indicates more flags + if i > 0 and (i % 15) == 0 + flags <<= 1 + if nprops > (i + 1) + flags |= 1 + enc.short(flags) + flags = 0 + end + end + end + flags <<= ((16 - (nprops % 15)) % 16) + enc.short(flags) + + # properties + @klass.fields.each do |f| + v = @properties[f.name] + enc.encode(f.type, v) unless v.nil? + end + enc.flush() + encoder.longstr(buf.to_s) + end + + def Header.decode(spec, decoder) + dec = Decoder.new(StringReader.new(decoder.longstr())) + klass = spec.classes[dec.short()] + weight = dec.short() + size = dec.longlong() + + # property flags + bits = [] + while true + flags = dec.short() + 15.downto(1) do |i| + if flags >> i & 0x1 != 0 + bits << true + else + bits << false + end + end + break if flags & 0x1 == 0 + end + + # properties + properties = {} + bits.zip(klass.fields).each do |b, f| + properties[f.name] = dec.decode(f.type) if b + end + return Header.new(klass, weight, size, properties) + end + + def inspect(); "#{@klass.name}(#{@properties.inspect()})" end + + end + + class Body < Payload + + def Body.type; :frame_body end + + def type; Body.type end + + def initialize(content) + @content = content + end + + attr_reader :content + + def encode(enc) + enc.longstr(@content) + end + + def Body.decode(spec, dec) + return Body.new(dec.longstr()) + end + + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/datatypes.rb b/RC9/qpid/ruby/lib/qpid/datatypes.rb new file mode 100644 index 0000000000..96afe58dee --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/datatypes.rb @@ -0,0 +1,353 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + def self.struct(type, *args) + # FIXME: This is fragile; the last arg could be a hash, + # without being hte keywords + kwargs = {} + kwargs = args.pop if args.any? && args[-1].is_a?(Hash) + + if args.size > type.fields.size + raise TypeError, + "%s() takes at most %d arguments (%d given)" % + [type.name, type.fields.size, args.size] + end + + attrs = type.fields.inject({}) do |attrs, field| + if args.any? + attrs[field.name] = args.shift + if kwargs.key?(field.name) + raise TypeError, + "%s() got multiple values for keyword argument '%s'" % + [type.name, field.name] + end + elsif kwargs.key?(field.name) + attrs[field.name] = kwargs.delete(field.name) + else + attrs[field.name] = field.default + end + attrs + end + + unless kwargs.empty? + unexpected = kwargs.keys[0] + raise TypeError, + "%s() got an unexpected keyword argument '%s'" % + [type.name, unexpected] + end + + attrs[:type] = type + attrs[:id] = nil + + name = "Qpid_" + type.name.to_s.capitalize + unless ::Struct.const_defined?(name) + vars = type.fields.collect { |f| f.name } << :type << :id + ::Struct.new(name, *vars) + end + st = ::Struct.const_get(name) + + result = st.new + attrs.each { |k, v| result[k] = v } + return result + end + + class Message + + attr_accessor :headers, :body, :id + + def initialize(*args) + @body = nil + @headers = nil + + @body = args.pop unless args.empty? + @headers = args unless args.empty? + + @id = nil + end + + def has(name) + return ! get(name).nil? + end + + def get(name) + if @headers + name = name.to_sym + @headers.find { |h| h.type.name == name } + end + end + + def set(header) + @headers ||= [] + if h = @headers.find { |h| h.type == header.type } + ind = @headers.index(h) + @headers[ind] = header + else + @headers << header + end + end + + def clear(name) + if @headers + name = name.to_sym + @headers.delete_if { |h| h.type.name == name } + end + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # args = [] + # if self.headers: + # args.extend(map(repr, self.headers)) + # if self.body: + # args.append(repr(self.body)) + # if self.id is not None: + # args.append("id=%s" % self.id) + # return "Message(%s)" % ", ".join(args) + # end + end + + class ::Object + + def to_serial + Qpid::Serial.new(self) + end + end + + class Serial + + include Comparable + + attr_accessor :value + + def initialize(value) + @value = value & 0xFFFFFFFF + end + + def hash + @value.hash + end + + def to_serial + self + end + + def eql?(other) + other = other.to_serial + value.eql?(other.value) + end + + def <=>(other) + return 1 if other.nil? + + other = other.to_serial + + delta = (value - other.value) & 0xFFFFFFFF + neg = delta & 0x80000000 + mag = delta & 0x7FFFFFFF + + return (neg>0) ? -mag : mag + end + + def +(other) + result = other.to_serial + result.value += value + return result + end + + def -(other) + result = other.to_serial + result.value = value - result.value + return result + end + + def succ + Serial.new(value + 1) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "serial(%s)" % self.value + # end + + def to_s + value.to_s + end + + end + + # The Python class datatypes.Range is emulated by the standard + # Range class with a few additions + class ::Range + + alias :lower :begin + alias :upper :end + + def touches(r) + # XXX: are we doing more checks than we need? + return (r.include?(lower - 1) || + r.include?(upper + 1) || + include?(r.lower - 1) || + include?(r.upper + 1) || + r.include?(lower) || + r.include?(upper) || + include?(r.lower) || + include?(r.upper)) + end + + def span(r) + Range.new([lower, r.lower].min, [upper, r.upper].max) + end + + def intersect(r) + l = [lower, r.lower].max + u = [upper, r.upper].min + return l > u ? nil : Range.new(l, u) + end + + end + + class RangedSet + + include Enumerable + + attr_accessor :ranges + + def initialize(*args) + @ranges = [] + args.each { |n| add(n) } + end + + def each(&block) + ranges.each { |r| yield(r) } + end + + def include?(n) + if (n.is_a?(Range)) + super(n) + else + ranges.find { |r| r.include?(n) } + end + end + + def add_range(range) + ranges.delete_if do |r| + if range.touches(r) + range = range.span(r) + true + else + false + end + end + ranges << range + end + + def add(lower, upper = nil) + upper = lower if upper.nil? + add_range(Range.new(lower, upper)) + end + + def to_s + repr = ranges.sort { |a,b| b.lower <=> a.lower }. + map { |r| r.to_s }.join(",") + "<RangedSet: {#{repr}}" + end + end + + class Future + def initialize(initial=nil, exception=Exception) + @value = initial + @error = nil + @set = Util::Event.new + @exception = exception + end + + def error(error) + @error = error + @set.set + end + + def set(value) + @value = value + @set.set + end + + def get(timeout=nil) + @set.wait(timeout) + unless @error.nil? + raise @exception.new(@error) + end + @value + end + end + + class UUID + include Comparable + + attr_accessor :bytes + + def initialize(bytes) + @bytes = bytes + end + + def <=>(other) + if other.respond_to?(:bytes) + return bytes <=> other.bytes + else + raise NotImplementedError + end + end + + def to_s + UUID::format(bytes) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "UUID(%r)" % str(self) + # end + + def self.random_uuid + bytes = (1..16).collect { |i| rand(256) } + + # From RFC4122, the version bits are set to 0100 + bytes[7] &= 0x0F + bytes[7] |= 0x40 + + # From RFC4122, the top two bits of byte 8 get set to 01 + bytes[8] &= 0x3F + bytes[8] |= 0x80 + return bytes.pack("C16") + end + + def self.uuid4 + UUID.new(random_uuid) + end + + def self.format(s) + # Python format !LHHHHL + # big-endian, ulong, ushort x 4, ulong + "%08x-%04x-%04x-%04x-%04x%08x" % s.unpack("NnnnnN") + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/delegates.rb b/RC9/qpid/ruby/lib/qpid/delegates.rb new file mode 100644 index 0000000000..21513fc677 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/delegates.rb @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'rbconfig' + +module Qpid + + class Delegate + + def initialize(connection, args={}) + @connection = connection + @spec = connection.spec + @delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + @control = @spec[:track].enum[:control].value + end + + def log ; Qpid::logger["qpid.io.ctl"]; end + + def received(seg) + ssn = @connection.attached[seg.channel] + unless ssn + ch = Qpid::Connection::Channel.new(@connection, seg.channel) + else + ch = ssn.channel + end + + if seg.track == @control + ctl = seg.decode(@spec) + log.debug("RECV %s", ctl) if log + attr = ctl.type.name + method(attr).call(ch, ctl) + elsif ssn.nil? + ch.session_detached + else + ssn.received(seg) + end + end + + def connection_close(ch, close) + @connection.close_code = [close.reply_code, close.reply_text] + ch.connection_close_ok + @connection.sock.close_write() + unless @connection.opened + @connection.failed = true + @connection.signal + end + end + + def connection_close_ok(ch, close_ok) + @connection.opened = false + @connection.signal + end + + def session_attach(ch, a) + begin + @connection.attach(a.name, ch, @delegate, a.force) + ch.session_attached(a.name) + rescue Qpid::ChannelBusy + ch.session_detached(a.name) + rescue Qpid::SessionBusy + ch.session_detached(a.name) + end + end + + def session_attached(ch, a) + ch.session.signal + end + + def session_detach(ch, d) + #send back the confirmation of detachment before removing the + #channel from the attached set; this avoids needing to hold the + #connection lock during the sending of this control and ensures + #that if the channel is immediately reused for a new session the + #attach request will follow the detached notification. + ch.session_detached(d.name) + ssn = @connection.detach(d.name, ch) + end + + def session_detached(ch, d) + @connection.detach(d.name, ch) + end + + def session_request_timeout(ch, rt) + ch.session_timeout(rt.timeout) + end + + def session_command_point(ch, cp) + ssn = ch.session + ssn.receiver.next_id = cp.command_id + ssn.receiver.next_offset = cp.command_offset + end + + def session_completed(ch, cmp) + ch.session.sender.has_completed(cmp.commands) + if cmp.timely_reply + ch.session_known_completed(cmp.commands) + end + ch.session.signal + end + + def session_known_completed(ch, kn_cmp) + ch.session.receiver.known_completed(kn_cmp.commands) + end + + def session_flush(ch, f) + rcv = ch.session.receiver + if f.expected + if rcv.next_id + exp = Qpid::RangedSet.new(rcv.next_id) + else + exp = nil + end + ch.session_expected(exp) + end + if f.confirmed + ch.session_confirmed(rcv.completed) + end + if f.completed + ch.session_completed(rcv.completed) + end + end + + class Server < Delegate + + def start + @connection.read_header() + @connection.write_header(@spec.major, @spec.minor) + ch = Qpid::Connection::Channel.new(@connection, 0) + ch.connection_start(:mechanisms => ["ANONYMOUS"]) + ch + end + + def connection_start_ok(ch, start_ok) + ch.connection_tune(:channel_max => 65535) + end + + def connection_tune_ok(ch, tune_ok) + nil + end + + def connection_open(ch, open) + @connection.opened = true + ch.connection_open_ok() + @connection.signal + end + end + + class Client < Delegate + + # FIXME: Python uses os.name for platform - we don't have an exact + # analog in Ruby + PROPERTIES = {"product" => "qpid python client", + "version" => "development", + "platform" => Config::CONFIG["build_os"]} + + + def initialize(connection, args) + super(connection) + + @username = args[:username] || "guest" + @password = args[:password] || "guest" + @mechanism= args[:mechanism] || "PLAIN" + end + + def start + @connection.write_header(@spec.major, @spec.minor) + @connection.read_header + end + + def connection_start(ch, start) + r = "\0%s\0%s" % [@username, @password] + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => @mechanism, + :response => r) + end + + def connection_tune(ch, tune) + ch.connection_tune_ok() + ch.connection_open() + end + + def connection_open_ok(ch, open_ok) + @connection.opened = true + @connection.signal + end + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/fields.rb b/RC9/qpid/ruby/lib/qpid/fields.rb new file mode 100644 index 0000000000..cc87d07529 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/fields.rb @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Class + def fields(*fields) + module_eval { + def initialize(*args, &block) + args = init_fields(*args) + + if respond_to? :init + init(*args) {|*a| yield(*a)} + elsif args.any? + raise ArgumentError, "extra arguments: #{args.inspect}" + end + end + } + + vars = fields.map {|f| :"@#{f.to_s().chomp("?")}"} + + define_method(:init_fields) {|*args| + vars.each {|v| + instance_variable_set(v, args.shift()) + } + args + } + + vars.each_index {|i| + define_method(fields[i]) { + instance_variable_get(vars[i]) + } + } + end +end diff --git a/RC9/qpid/ruby/lib/qpid/framer.rb b/RC9/qpid/ruby/lib/qpid/framer.rb new file mode 100644 index 0000000000..2a565a69a8 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/framer.rb @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' +require 'logger' + +module Qpid + + FIRST_SEG = 0x08 + LAST_SEG = 0x04 + FIRST_FRM = 0x02 + LAST_FRM = 0x01 + + class << self + attr_accessor :raw_logger, :frm_logger + end + + def self.packed_size(format) + # FIXME: This is a total copout to simulate Python's + # struct.calcsize + ([0]*256).pack(format).size + end + + class Frame + attr_reader :payload, :track, :flags, :type, :channel + + # HEADER = "!2BHxBH4x" + # Python Meaning Ruby + # ! big endian (implied by format char) + # 2B 2 uchar C2 + # H unsigned short n + # x pad byte x + # B uchar C + # H unsigned short n + # 4x pad byte x4 + HEADER = "C2nxCnx4" + HEADER_SIZE = Qpid::packed_size(HEADER) + MAX_PAYLOAD = 65535 - HEADER_SIZE + + def initialize(flags, type, track, channel, payload) + if payload.size > MAX_PAYLOAD + raise ArgumentError, "max payload size exceeded: #{payload.size}" + end + + @flags = flags + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; FIRST_SEG & @flags > 0 ; end + + def last_segment? ; LAST_SEG & @flags > 0 ; end + + def first_frame? ; FIRST_FRM & @flags > 0 ; end + + def last_frame? ; LAST_FRM & @flags > 0 ; end + + def to_s + fs = first_segment? ? 'S' : '.' + ls = last_segment? ? 's' : '.' + ff = first_frame? ? 'F' : '.' + lf = last_frame? ? 'f' : '.' + + return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, + @type, + @track, + @channel, + @payload.inspect] + end + end + + class FramingError < Exception ; end + + class Closed < Exception ; end + + class Framer + include Packer + + # Python: "!4s4B" + HEADER = "a4C4" + HEADER_SIZE = 8 + + def raw + Qpid::raw_logger + end + + def frm + Qpid::frm_logger + end + + def initialize(sock) + @sock = sock + @sock.extend(MonitorMixin) + @buf = "" + end + + attr_reader :sock + + def aborted? ; false ; end + + def write(buf) + @buf += buf + end + + def flush + @sock.synchronize do + _write(@buf) + @buf = "" + frm.debug("FLUSHED") if frm + end + end + + def _write(buf) + while buf && buf.size > 0 + # FIXME: Catch errors + n = @sock.write(buf) + raw.debug("SENT #{buf[0, n].inspect}") if raw + buf[0,n] = "" + @sock.flush + end + end + + def read(n) + data = "" + while data.size < n + begin + s = @sock.read(n - data.size) + rescue IOError => e + raise e if data != "" + @sock.close unless @sock.closed? + raise Closed + end + # FIXME: Catch errors + if s.nil? or s.size == 0 + @sock.close unless @sock.closed? + raise Closed + end + data += s + raw.debug("RECV #{n}/#{data.size} #{s.inspect}") if raw + end + return data + end + + def read_header + unpack(Framer::HEADER, Framer::HEADER_SIZE) + end + + def write_header(major, minor) + @sock.synchronize do + pack(Framer::HEADER, "AMQP", 1, 1, major, minor) + flush() + end + end + + def write_frame(frame) + @sock.synchronize do + size = frame.payload.size + Frame::HEADER_SIZE + track = frame.track & 0x0F + pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel) + write(frame.payload) + if frame.last_segment? and frame.last_frame? + flush() + frm.debug("SENT #{frame}") if frm + end + end + end + + def read_frame + flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE) + raise FramingError if (flags & 0xF0 > 0) + payload = read(size - Frame::HEADER_SIZE) + frame = Frame.new(flags, type, track, channel, payload) + frm.debug("RECV #{frame}") if frm + return frame + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/invoker.rb b/RC9/qpid/ruby/lib/qpid/invoker.rb new file mode 100644 index 0000000000..39716ac6c2 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/invoker.rb @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Qpid::Invoker + + # Requires that client defines a invoke method and overrides + # resolve_method + + # FIXME: Is it really worth defining methods in method_missing ? We + # could just dispatch there directly + + def invc_method(name, resolved) + define_singleton_method(name) { |*args| invoke(resolved, args) } + # FIXME: the Python code also attaches docs from resolved.pydoc + end + + def invc_value(name, resolved) + define_singleton_method(name) { | | resolved } + end + + def invc_error(name, resolved) + msg = "%s instance has no attribute '%s'" % [self.class.name, name] + if resolved + msg += "\n%s" % resolved + end + raise NameError, msg + end + + def resolve_method(name) + invocation(:error, nil) + end + + def method_missing(name, *args) + disp, resolved = resolve_method(name) + disp.call(name, resolved) + send(name, *args) + end + + def invocation(kind, name = nil) + [ method("invc_#{kind}"), name ] + end + + private + def define_singleton_method(name, &body) + singleton_class = class << self; self; end + singleton_class.send(:define_method, name, &body) + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/packer.rb b/RC9/qpid/ruby/lib/qpid/packer.rb new file mode 100644 index 0000000000..ae1be37faf --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/packer.rb @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + module Packer + def unpack(fmt, len) + raw = read(len) + values = raw.unpack(fmt) + values = values[0] if values.size == 1 + return values + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/peer.rb b/RC9/qpid/ruby/lib/qpid/peer.rb new file mode 100644 index 0000000000..cdb962169b --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/peer.rb @@ -0,0 +1,289 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "thread" +require "qpid/queue" +require "qpid/connection08" +require "qpid/fields" + +module Qpid08 + + Queue = Qpid::Queue + + class Peer + + def initialize(conn, delegate) + @conn = conn + @delegate = delegate + @outgoing = Queue.new() + @work = Queue.new() + @channels = {} + @mutex = Mutex.new() + end + + def channel(id) + @mutex.synchronize do + ch = @channels[id] + if ch.nil? + ch = Channel.new(id, self, @outgoing, @conn.spec) + @channels[id] = ch + end + return ch + end + end + + def channel_delete(id) + @channels.delete(id) + end + + def start() + spawn(:writer) + spawn(:reader) + spawn(:worker) + end + + def close() + @mutex.synchronize do + @channels.each_value do |ch| + ch.close() + end + @outgoing.close() + @work.close() + end + end + + private + + def spawn(method, *args) + Thread.new do + begin + send(method, *args) + # is this the standard way to catch any exception? + rescue Closed => e + puts "#{method} #{e}" + rescue Object => e + print e + e.backtrace.each do |line| + print "\n ", line + end + print "\n" + end + end + end + + def reader() + while true + frame = @conn.read() + ch = channel(frame.channel) + ch.dispatch(frame, @work) + end + end + + def writer() + while true + @conn.write(@outgoing.get()) + end + end + + def worker() + while true + dispatch(@work.get()) + end + end + + def dispatch(queue) + frame = queue.get() + ch = channel(frame.channel) + payload = frame.payload + if payload.method.content? + content = Qpid08::read_content(queue) + else + content = nil + end + + message = Message.new(payload.method, payload.args, content) + @delegate.dispatch(ch, message) + end + + end + + class Channel + def initialize(id, peer, outgoing, spec) + @id = id + @peer = peer + @outgoing = outgoing + @spec = spec + @incoming = Queue.new() + @responses = Queue.new() + @queue = nil + @closed = false + end + + attr_reader :id + + def closed?; @closed end + + def close() + return if closed? + @peer.channel_delete(@id) + @closed = true + @incoming.close() + @responses.close() + end + + def dispatch(frame, work) + payload = frame.payload + case payload + when Method + if payload.method.response? + @queue = @responses + else + @queue = @incoming + work << @incoming + end + end + @queue << frame + end + + def method_missing(name, *args) + method = @spec.find_method(name) + if method.nil? + raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") + end + + if args.size == 1 and args[0].instance_of? Hash + kwargs = args[0] + invoke_args = method.fields.map do |f| + kwargs[f.name] + end + content = kwargs[:content] + else + invoke_args = [] + method.fields.each do |f| + if args.any? + invoke_args << args.shift() + else + invoke_args << f.default + end + end + if method.content? and args.any? + content = args.shift() + else + content = nil + end + if args.any? then raise ArgumentError.new("#{args.size} extr arguments") end + end + return invoke(method, invoke_args, content) + end + + def invoke(method, args, content = nil) + raise Closed() if closed? + frame = Frame.new(@id, Method.new(method, args)) + @outgoing << frame + + if method.content? + content = Content.new() if content.nil? + write_content(method.parent, content, @outgoing) + end + + nowait = false + f = method.fields[:"nowait"] + nowait = args[method.fields.index(f)] unless f.nil? + + unless nowait or method.responses.empty? + resp = @responses.get().payload + if resp.method.content? + content = read_content(@responses) + else + content = nil + end + if method.responses.include? resp.method + return Message.new(resp.method, resp.args, content) + else + # XXX: ValueError doesn't actually exist + raise ValueError.new(resp) + end + end + end + + def write_content(klass, content, queue) + size = content.size + header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers)) + queue << header + content.children.each {|child| write_content(klass, child, queue)} + queue << Frame.new(@id, Body.new(content.body)) if size > 0 + end + + end + + def Qpid08.read_content(queue) + frame = queue.get() + header = frame.payload + children = [] + 1.upto(header.weight) { children << read_content(queue) } + size = header.size + read = 0 + buf = "" + while read < size + body = queue.get() + content = body.payload.content + buf << content + read += content.size + end + buf.freeze() + return Content.new(header.properties.clone(), buf, children) + end + + class Content + def initialize(headers = {}, body = "", children = []) + @headers = headers + @body = body + @children = children + end + + attr_reader :headers, :body, :children + + def size; body.size end + def weight; children.size end + + def [](key); @headers[key] end + def []=(key, value); @headers[key] = value end + end + + class Message + fields(:method, :args, :content) + + alias fields args + + def method_missing(name) + return args[@method.fields[name].id] + end + + def inspect() + "#{method.qname}(#{args.join(", ")})" + end + end + + module Delegate + def dispatch(ch, msg) + send(msg.method.qname, ch, msg) + end + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/qmf.rb b/RC9/qpid/ruby/lib/qpid/qmf.rb new file mode 100644 index 0000000000..d2e2651653 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/qmf.rb @@ -0,0 +1,1603 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Console API for Qpid Management Framework + +require 'socket' +require 'monitor' +require 'thread' +require 'uri' +require 'time' + +module Qpid::Qmf + + # To access the asynchronous operations, a class must be derived from + # Console with overrides of any combination of the available methods. + class Console + + # Invoked when a connection is established to a broker + def broker_connected(broker); end + + # Invoked when the connection to a broker is lost + def broker_disconnected(broker); end + + # Invoked when a QMF package is discovered + def new_package(name); end + + # Invoked when a new class is discovered. Session.getSchema can be + # used to obtain details about the class + def new_class(kind, klass_key); end + + # Invoked when a QMF agent is discovered + def new_agent(agent); end + + # Invoked when a QMF agent disconects + def del_agent(agent); end + + # Invoked when an object is updated + def object_props(broker, record); end + + # Invoked when an object is updated + def object_stats(broker, record); end + + # Invoked when an event is raised + def event(broker, event); end + + def heartbeat(agent, timestamp); end + + def broker_info(broker); end + end + + class BrokerURL + + attr_reader :host, :port, :auth_name, :auth_pass, :auth_mech + + def initialize(text) + uri = URI.parse(text) + + @host = uri.host + @port = uri.port ? uri.port : 5672 + @auth_name = uri.user ? uri.user : "guest" + @auth_pass = uri.password ? uri.password: "guest" + @auth_mech = "PLAIN" + + return uri + end + + def name + "#{@host}:#{@port}" + end + + def match(host, port) + # FIXME: Unlcear what the Python code is actually checking for + # here, especially since HOST can resolve to multiple IP's + @port == port && + (host == @host || ipaddr(host, port) == ipaddr(@host, @port)) + end + + private + def ipaddr(host, port) + s = Socket::getaddrinfo(host, port, + Socket::AF_INET, Socket::SOCK_STREAM) + s[0][2] + end + end + + # An instance of the Session class represents a console session running + # against one or more QMF brokers. A single instance of Session is + # needed to interact with the management framework as a console. + class Session + CONTEXT_SYNC = 1 + CONTEXT_STARTUP = 2 + CONTEXT_MULTIGET = 3 + + GET_WAIT_TIME = 60 + + include MonitorMixin + + attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages + + # Initialize a session. If the console argument is provided, the + # more advanced asynchronous features are available. If console is + # defaulted, the session will operate in a simpler, synchronous + # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments + # are meaningful only if 'console' is provided. They control + # whether object updates, events, and agent-heartbeats are + # subscribed to. If the console is not interested in receiving one + # or more of the above, setting the argument to False will reduce + # tha bandwidth used by the API. If manageConnections is set to + # True, the Session object will manage connections to the brokers. + # This means that if a broker is unreachable, it will retry until a + # connection can be established. If a connection is lost, the + # Session will attempt to reconnect. + # + # If manageConnections is set to False, the user is responsible for + # handing failures. In this case, an unreachable broker will cause + # addBroker to raise an exception. If userBindings is set to False + # (the default) and rcvObjects is True, the console will receive + # data for all object classes. If userBindings is set to True, the + # user must select which classes the console shall receive by + # invoking the bindPackage or bindClass methods. This allows the + # console to be configured to receive only information that is + # relavant to a particular application. If rcvObjects id False, + # userBindings has no meaning. + # + # Accept a hash of parameters, where keys can be :console, + # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections, + # and :user_bindings + def initialize(kwargs = {}) + super() + @console = kwargs[:console] || nil + @brokers = [] + @packages = {} + @seq_mgr = SequenceManager.new + @cv = new_cond + @sync_sequence_list = [] + @result = [] + @select = [] + @error = nil + @rcv_objects = kwargs[:rcv_objects] == nil ? true : kwargs[:rcv_objects] + @rcv_events = kwargs[:rcv_events] == nil ? true : kwargs[:rcv_events] + @rcv_heartbeats = kwargs[:rcv_heartbeats] == nil ? true : kwargs[:rcv_heartbeats] + @user_bindings = kwargs[:user_bindings] == nil ? false : kwargs[:user_bindings] + unless @console + @rcv_objects = false + @rcv_events = false + @rcv_heartbeats = false + end + @binding_key_list = binding_keys + @manage_connections = kwargs[:manage_connections] || false + + if @user_bindings && ! @rcv_objects + raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided" + end + + end + + def to_s + "QMF Console Session Manager (brokers: #{@brokers.size})" + end + + def managedConnections? + return @manage_connections + end + + # Connect to a Qpid broker. Returns an object of type Broker + def add_broker(target="amqp://localhost") + url = BrokerURL.new(target) + broker = Broker.new(self, url.host, url.port, url.auth_mech, url.auth_name, url.auth_pass) + unless broker.connected? || @manage_connections + raise broker.error + end + + @brokers << broker + objects(:broker => broker, :class => "agent") unless @manage_connections + return broker + end + + # Disconnect from a broker. The 'broker' argument is the object + # returned from the addBroker call + def del_broker(broker) + broker.shutdown + @brokers.delete(broker) + end + + # Get the list of known classes within a QMF package + def classes(package_name) + @brokers.each { |broker| broker.wait_for_stable } + if @packages.include?(package_name) + # FIXME What's the actual structure of @packages[package_name] + @packages[package_name].inject([]) do |list, cname, hash| + list << [ package_name, cname, hash] + end + end + end + + # Get the schema for a QMF class + def schema(klass_key) + @brokers.each { |broker| broker.wait_for_stable } + pname, cname, hash = klass_key + if @packages.include?(pname) + @packages[pname][ [cname, hash] ] + end + end + + def bind_package(package_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key => "console.obj.*.*.#{package_name}.#" } + broker.amqpSession.exchange_bind(args) + end + end + + def bind_class(package_name, class_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" } + broker.amqpSession.exchange_bind(args) + end + end + + def bind_class_key(klass_key) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + pname, cname, hash = klass_key + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" } + broker.amqpSession.exchange_bind(args) + end + end + + # Get a list of currently known agents + def agents(broker=nil) + broker_list = [] + if broker.nil? + broker_list = @brokers.dup + else + broker_list << broker + end + broker_list.each { |b| b.wait_for_stable } + agent_list = [] + broker_list.each { |b| agent_list += b.agents } + return agent_list + end + + # Get a list of objects from QMF agents. + # All arguments are passed by name(keyword). + # + # The class for queried objects may be specified in one of the + # following ways: + # :schema => <schema> - supply a schema object returned from getSchema. + # :key => <key> - supply a klass_key from the list returned by getClasses. + # :class => <name> - supply a class name as a string. If the class name exists + # in multiple packages, a _package argument may also be supplied. + # :object_id = <id> - get the object referenced by the object-id + # + # If objects should be obtained from only one agent, use the following argument. + # Otherwise, the query will go to all agents. + # + # :agent = <agent> - supply an agent from the list returned by getAgents. + # If the get query is to be restricted to one broker (as opposed to + # all connected brokers), add the following argument: + # + # :broker = <broker> - supply a broker as returned by addBroker. + # + # If additional arguments are supplied, they are used as property + # selectors, as long as their keys are strings. For example, if + # the argument "name" => "test" is supplied, only objects whose + # "name" property is "test" will be returned in the result. + def objects(kwargs) + if kwargs.include?(:broker) + broker_list = [] + broker_list << kwargs[:broker] + else + broker_list = @brokers + end + broker_list.each { |broker| broker.wait_for_stable } + + agent_list = [] + if kwargs.include?(:agent) + agent = kwargs[:agent] + unless broker_list.include?(agent.broker) + raise ArgumentError, "Supplied agent is not accessible through the supplied broker" + end + agent_list << agent + else + if kwargs.include?(:object_id) + oid = kwargs[:object_id] + broker_list.each { |broker| + broker.agents.each { |agent| + if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank + agent_list << agent + end + } + } + else + broker_list.each { |broker| agent_list += broker.agents } + end + end + + cname = nil + if kwargs.include?(:schema) + # FIXME: What kind of object is kwargs[:schema] + pname, cname, hash = kwargs[:schema].getKey() + elsif kwargs.include?(:key) + pname, cname, hash = kwargs[:key] + elsif kwargs.include?(:class) + pname, cname, hash = [kwargs[:package], kwargs[:class], nil] + end + if cname.nil? && ! kwargs.include?(:object_id) + raise ArgumentError, + "No class supplied, use :schema, :key, :class, or :object_id' argument" + end + + map = {} + @select = [] + if kwargs.include?(:object_id) + map["_objectid"] = kwargs[:object_id].to_s + else + map["_class"] = cname + map["_package"] = pname if pname + map["_hash"] = hash if hash + kwargs.each do |k,v| + @select << [k, v] if k.is_a?(String) + end + end + + @result = [] + agent_list.each do |agent| + broker = agent.broker + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = nil + synchronize do + seq = @seq_mgr.reserve(CONTEXT_MULTIGET) + @sync_sequence_list << seq + end + broker.set_header(send_codec, ?G, seq) + send_codec.write_map(map) + bank_key = "%d.%d" % [broker.broker_bank, agent.agent_bank] + smsg = broker.message(send_codec.encoded, "agent.#{bank_key}") + broker.emit(smsg) + end + + timeout = false + synchronize do + unless @cv.wait_for(GET_WAIT_TIME) { + @sync_sequence_list.empty? || @error } + @sync_sequence_list.each do |pending_seq| + @seq_mgr.release(pending_seq) + end + @sync_sequence_list = [] + timeout = true + end + end + + if @error + errorText = @error + @error = nil + raise errorText + end + + if @result.empty? && timeout + raise "No agent responded within timeout period" + end + @result + end + + def set_event_filter(kwargs); end + + def handle_broker_connect(broker); end + + def handle_broker_resp(broker, codec, seq) + broker.broker_id = codec.read_uuid + @console.broker_info(broker) if @console + + # Send a package request + # (effectively inc and dec outstanding by not doing anything) + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?P, seq) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + + def handle_package_ind(broker, codec, seq) + pname = codec.read_str8 + new_package = false + synchronize do + new_package = ! @packages.include?(pname) + @packages[pname] = {} if new_package + end + @console.new_package(pname) if @console + + # Send a class request + broker.inc_outstanding + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?Q, seq) + send_codec.write_str8(pname) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + + def handle_command_complete(broker, codec, seq) + code = codec.read_uint32 + text = codec.read_str8 + context = @seq_mgr.release(seq) + if context == CONTEXT_STARTUP + broker.dec_outstanding + elsif context == CONTEXT_SYNC && seq == broker.sync_sequence + broker.sync_done + elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq) + synchronize do + @sync_sequence_list.delete(seq) + @cv.signal if @sync_sequence_list.empty? + end + end + end + + def handle_class_ind(broker, codec, seq) + kind = codec.read_uint8 + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + unknown = false + + synchronize do + return unless @packages.include?(pname) + unknown = true unless @packages[pname].include?([cname, hash]) + end + + if unknown + # Send a schema request for the unknown class + broker.inc_outstanding + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?S, seq) + send_codec.write_str8(pname) + send_codec.write_str8(cname) + send_codec.write_bin128(hash) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + end + + def handle_method_resp(broker, codec, seq) + code = codec.read_uint32 + + text = codec.read_str16 + out_args = {} + method, synchronous = @seq_mgr.release(seq) + if code == 0 + method.arguments.each do |arg| + if arg.dir.index(?O) + out_args[arg.name] = decode_value(codec, arg.type) + end + end + end + result = MethodResult.new(code, text, out_args) + if synchronous: + broker.synchronize do + broker.sync_result = MethodResult.new(code, text, out_args) + broker.sync_done + end + else + @console.method_response(broker, seq, result) if @console + end + end + + def handle_heartbeat_ind(broker, codec, seq, msg) + if @console + broker_bank = 1 + agent_bank = 0 + dp = msg.get("delivery_properties") + if dp + key = dp["routing_key"] + key_elements = key.split(".") + if key_elements.length == 4 + broker_bank = key_elements[2].to_i + agent_bank = key_elements[3].to_i + end + end + agent = broker.agent(broker_bank, agent_bank) + timestamp = codec.read_uint64 + @console.heartbeat(agent, timestamp) if agent + end + end + + def handle_event_ind(broker, codec, seq) + if @console + event = Event.new(self, broker, codec) + @console.event(broker, event) + end + end + + def handle_schema_resp(broker, codec, seq) + kind = codec.read_uint8 + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + klass_key = [pname, cname, hash] + klass = SchemaClass.new(kind, klass_key, codec) + synchronize { @packages[pname][ [cname, hash] ] = klass } + + @seq_mgr.release(seq) + broker.dec_outstanding + @console.new_class(kind, klass_key) if @console + end + + def handle_content_ind(broker, codec, seq, prop=false, stat=false) + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + klass_key = [pname, cname, hash] + + schema = nil + synchronize do + return unless @packages.include?(pname) + return unless @packages[pname].include?([cname, hash]) + schema = @packages[pname][ [cname, hash] ] + end + + object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat) + if pname == "org.apache.qpid.broker" && cname == "agent" && prop + broker.update_agent(object) + end + + synchronize do + if @sync_sequence_list.include?(seq) + if object.timestamps()[2] == 0 && select_match(object) + @result << object + end + return + end + end + + @console.object_props(broker, object) if @console && @rcv_objects && prop + @console.object_stats(broker, object) if @console && @rcv_objects && stat + end + + def handle_broker_disconnect(broker); end + + def handle_error(error) + @error = error + synchronize do + @sync_sequence_list = [] + @cv.signal + end + end + + # Decode, from the codec, a value based on its typecode + def decode_value(codec, typecode) + case typecode + when 1: data = codec.read_uint8 # U8 + when 2: data = codec.read_uint16 # U16 + when 3: data = codec.read_uint32 # U32 + when 4: data = codec.read_uint64 # U64 + when 6: data = codec.read_str8 # SSTR + when 7: data = codec.read_str16 # LSTR + when 8: data = codec.read_int64 # ABSTIME + when 9: data = codec.read_uint64 # DELTATIME + when 10: data = ObjectId.new(codec) # REF + when 11: data = codec.read_uint8 != 0 # BOOL + when 12: data = codec.read_float # FLOAT + when 13: data = codec.read_double # DOUBLE + when 14: data = codec.read_uuid # UUID + when 15: data = codec.read_map # FTABLE + when 16: data = codec.read_int8 # S8 + when 17: data = codec.read_int16 # S16 + when 18: data = codec.read_int32 # S32 + when 19: data = codec.read_int64 # S64 + else + raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}" + end + return data + end + + # Encode, into the codec, a value based on its typecode + def encode_value(codec, value, typecode) + # FIXME: Python does a lot of magic type conversions + # We just assume that value has the right type; this is safer + # than coercing explicitly, since Array::pack will complain + # loudly about various type errors + case typecode + when 1: codec.write_uint8(value) # U8 + when 2: codec.write_uint16(value) # U16 + when 3: codec.write_uint32(value) # U32 + when 4: codec.write_uint64(value) # U64 + when 6: codec.write_str8(value) # SSTR + when 7: codec.write_str16(value) # LSTR + when 8: codec.write_int64(value) # ABSTIME + when 9: codec.write_uint64(value) # DELTATIME + when 10: value.encode(codec) # REF + when 11: codec.write_uint8(value ? 1 : 0) # BOOL + when 12: codec.write_float(value) # FLOAT + when 13: codec.write_double(value) # DOUBLE + when 14: codec.write_uuid(value) # UUID + when 15: codec.write_map(value) # FTABLE + when 16: codec.write_int8(value) # S8 + when 17: codec.write_int16(value) # S16 + when 18: codec.write_int32(value) # S32 + when 19: codec.write_int64(value) # S64 + else + raise ValueError, "Invalid type code: %d" % typecode + end + end + + def display_value(value, typecode) + case typecode + when 1: return value.to_s + when 2: return value.to_s + when 3: return value.to_s + when 4: return value.to_s + when 6: return value.to_s + when 7: return value.to_s + when 8: return strftime("%c", gmtime(value / 1000000000)) + when 9: return value.to_s + when 10: return value.to_s + when 11: return value ? 'T' : 'F' + when 12: return value.to_s + when 13: return value.to_s + when 14: return Qpid::UUID::format(value) + when 15: return value.to_s + when 16: return value.to_s + when 17: return value.to_s + when 18: return value.to_s + when 19: return value.to_s + else + raise ValueError, "Invalid type code: %d" % typecode + end + end + + private + + def binding_keys + key_list = [] + key_list << "schema.#" + if @rcv_objects && @rcv_events && @rcv_heartbeats && + ! @user_bindings + key_list << "console.#" + else + if @rcv_objects && ! @user_bindings + key_list << "console.obj.#" + else + key_list << "console.obj.*.*.org.apache.qpid.broker.agent" + end + key_list << "console.event.#" if @rcv_events + key_list << "console.heartbeat.#" if @rcv_heartbeats + end + return key_list + end + + # Check the object against select to check for a match + def select_match(object) + select.each do |key, value| + object.properties.each do |prop, propval| + return false if key == prop.name && value != propval + end + end + return true + end + + end + + class Package + attr_reader :name + + def initialize(name) + @name = name + end + end + + # A ClassKey uniquely identifies a class from the schema. + class ClassKey + attr_reader :package, :klass_name, :hash + + def initialize(package, klass_name, hash) + @package = package + @klass_name = klass_name + @hash = hash + end + end + + class SchemaClass + + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + attr_reader :klass_key, :properties, :statistics, :methods, :arguments + + def initialize(kind, key, codec) + @kind = kind + @klass_key = key + @properties = [] + @statistics = [] + @methods = [] + @arguments = [] + + if @kind == CLASS_KIND_TABLE + prop_count = codec.read_uint16 + stat_count = codec.read_uint16 + method_count = codec.read_uint16 + prop_count.times { |idx| + @properties << SchemaProperty.new(codec) } + stat_count.times { |idx| + @statistics << SchemaStatistic.new(codec) } + method_count.times { |idx| + @methods<< SchemaMethod.new(codec) } + elsif @kind == CLASS_KIND_EVENT + arg_count = codec.read_uint16 + arg_count.times { |idx| + sa = SchemaArgument.new(codec, false) + @arguments << sa + } + end + end + + def to_s + pname, cname, hash = @klass_key + if @kind == CLASS_KIND_TABLE + kind_str = "Table" + elsif @kind == CLASS_KIND_EVENT + kind_str = "Event" + else + kind_str = "Unsupported" + end + result = "%s Class: %s:%s " % [kind_str, pname, cname] + result += Qpid::UUID::format(hash) + return result + end + end + + class SchemaProperty + + attr_reader :name, :type, :access, :index, :optional, + :unit, :min, :max, :maxlan, :desc + + def initialize(codec) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @access = map["access"] + @index = map["index"] != 0 + @optional = map["optional"] != 0 + @unit = map["unit"] + @min = map["min"] + @max = map["max"] + @maxlan = map["maxlen"] + @desc = map["desc"] + end + + def to_s + @name + end + end + + class SchemaStatistic + + attr_reader :name, :type, :unit, :desc + + def initialize(codec) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @unit = map["unit"] + @desc = map["desc"] + end + + def to_s + @name + end + end + + class SchemaMethod + + attr_reader :name, :desc, :arguments + + def initialize(codec) + map = codec.read_map + @name = map["name"] + arg_count = map["argCount"] + @desc = map["desc"] + @arguments = [] + arg_count.times { |idx| + @arguments << SchemaArgument.new(codec, true) + } + end + + def to_s + result = @name + "(" + first = true + result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ") + result += ")" + return result + end + end + + class SchemaArgument + + attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen + attr_reader :desc, :default + + def initialize(codec, method_arg) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @dir = map["dir"].upcase if method_arg + @unit = map["unit"] + @min = map["min"] + @max = map["max"] + @maxlen = map["maxlen"] + @desc = map["desc"] + @default = map["default"] + end + end + + # Object that represents QMF object identifiers + class ObjectId + + include Comparable + + attr_reader :first, :second + + def initialize(codec, first=0, second=0) + if codec + @first = codec.read_uint64 + @second = codec.read_uint64 + else + @first = first + @second = second + end + end + + def <=>(other) + return 1 unless other.is_a?(ObjectId) + return -1 if first < other.first + return 1 if first > other.first + return second <=> other.second + end + + def to_s + "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object] + end + + def index + [first, second] + end + + def flags + (first & 0xF000000000000000) >> 60 + end + + def sequence + (first & 0x0FFF000000000000) >> 48 + end + + def broker_bank + (first & 0x0000FFFFF0000000) >> 28 + end + + def agent_bank + first & 0x000000000FFFFFFF + end + + def object + second + end + + def durable? + sequence == 0 + end + + def encode(codec) + codec.write_uint64(first) + codec.write_uint64(second) + end + end + + class Object + + attr_reader :object_id, :schema, :properties, :statistics, + :current_time, :create_time, :delete_time, :broker + + def initialize(session, broker, schema, codec, prop, stat) + @session = session + @broker = broker + @schema = schema + @current_time = codec.read_uint64 + @create_time = codec.read_uint64 + @delete_time = codec.read_uint64 + @object_id = ObjectId.new(codec) + @properties = [] + @statistics = [] + if prop + missing = parse_presence_masks(codec, schema) + schema.properties.each do |property| + v = nil + unless missing.include?(property.name) + v = @session.decode_value(codec, property.type) + end + @properties << [property, v] + end + end + + if stat + schema.statistics.each do |statistic| + s = @session.decode_value(codec, statistic.type) + @statistics << [statistic, s] + end + end + end + + def klass_key + @schema.klass_key + end + + + def methods + @schema.methods + end + + # Return the current, creation, and deletion times for this object + def timestamps + return [@current_time, @create_time, @delete_time] + end + + # Return a string describing this object's primary key + def index + @properties.select { |property, value| + property.index + }.collect { |property,value| + @session.display_value(value, property.type) }.join(":") + end + + # Replace properties and/or statistics with a newly received update + def merge_update(newer) + unless object_id == newer.object_id + raise "Objects with different object-ids" + end + @properties = newer.getProperties unless newer.properties.empty? + @statistics = newer.getStatistics unless newer.statistics.empty? + end + + def to_s + key = klass_key + key[0] + ":" + key[1] + "[" + @object_id.to_s() + "] " + index + end + + # This must be defined because ruby has this (deprecated) method built in. + def id + method_missing(:id) + end + + # Same here.. + def type + method_missing(:type) + end + + def name + method_missing(:name) + end + + def method_missing(name, *args) + name = name.to_s + + if method = @schema.methods.find { |method| name == method.name } + return invoke(method, name, args) + end + + @properties.each do |property, value| + return value if name == property.name + if name == "_#{property.name}_" && property.type == 10 + # Dereference references + deref = @session.objects(:object_id => value, :broker => @broker) + return nil unless deref.size == 1 + return deref[0] + end + end + @statistics.each do |statistic, value| + if name == statistic.name + return value + end + end + raise "Type Object has no attribute '#{name}'" + end + + private + + def send_method_request(method, name, args, synchronous = false) + @schema.methods.each do |schema_method| + if name == schema_method.name + send_codec = Qpid::StringCodec.new(@broker.conn.spec) + seq = @session.seq_mgr.reserve([schema_method, synchronous]) + @broker.set_header(send_codec, ?M, seq) + @object_id.encode(send_codec) + pname, cname, hash = @schema.klass_key + send_codec.write_str8(pname) + send_codec.write_str8(cname) + send_codec.write_bin128(hash) + send_codec.write_str8(name) + + formals = method.arguments.select { |arg| arg.dir.index(?I) } + count = method.arguments.select { |arg| arg.dir.index(?I) }.size + unless formals.size == args.size + raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}" + end + + formals.zip(args).each do |formal, actual| + @session.encode_value(send_codec, actual, formal.type) + end + + smsg = @broker.message(send_codec.encoded, + "agent.#{object_id.broker_bank}.#{object_id.agent_bank}") + + @broker.sync_start if synchronous + @broker.emit(smsg) + + return seq + end + end + end + + def invoke(method, name, args) + if send_method_request(method, name, args, synchronous = true) + unless @broker.wait_for_sync_done + @session.seq_mgr.release(seq) + raise "Timed out waiting for method to respond" + end + + if @broker.error + error_text = @broker.error + @broker.error = nil + raise error_text + end + + return @broker.sync_result + end + raise "Invalid Method (software defect) [#{name}]" + end + + def parse_presence_masks(codec, schema) + exclude_list = [] + bit = 0 + schema.properties.each do |property| + if property.optional + if bit == 0 + mask = codec.read_uint8 + bit = 1 + end + if (mask & bit) == 0 + exclude_list << property.name + end + bit *= 2 + bit = 0 if bit == 256 + end + end + return exclude_list + end + end + + class MethodResult + + attr_reader :status, :text + + def initialize(status, text, out_args) + @status = status + @text = text + @out_args = out_args + end + + def method_missing(name) + name = name.to_s() + if @out_args.include?(name) + return @out_args[name] + else + raise "Unknown method result arg #{name}" + end + end + + def to_s + "#{text} (#{status}) - #{out_args.inspect}" + end + end + + class ManagedConnection + + DELAY_MIN = 1 + DELAY_MAX = 128 + DELAY_FACTOR = 2 + include MonitorMixin + + def initialize(broker) + super() + @broker = broker + @cv = new_cond + @is_cancelled = false + end + + # Main body of the running thread. + def start + @thread = Thread.new { + delay = DELAY_MIN + while true + begin + @broker.try_to_connect + synchronize do + while !@is_cancelled and @broker.connected? + @cv.wait + Thread.exit if @is_cancelled + delay = DELAY_MIN + end + end + + rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError + delay *= DELAY_FACTOR if delay < DELAY_MAX + end + + synchronize do + @cv.wait(delay) + Thread.exit if @is_cancelled + end + end + } + end + + # Tell this thread to stop running and return. + def stop + synchronize do + @is_cancelled = true + @cv.signal + end + end + + # Notify the thread that the connection was lost. + def disconnected + synchronize do + @cv.signal + end + end + + def join + @thread.join + end + end + + class Broker + + SYNC_TIME = 60 + + include MonitorMixin + + attr_accessor :error + + attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name + + attr_accessor :broker_id, :sync_result + + def initialize(session, host, port, auth_mech, auth_name, auth_pass) + super() + + # For debugging.. + Thread.abort_on_exception = true + + @session = session + @host = host + @port = port + @auth_name = auth_name + @auth_pass = auth_pass + @broker_bank = 1 + @agents = {} + @agents["1.0"] = Agent.new(self, 0, "BrokerAgent") + @topic_bound = false + @cv = new_cond + @sync_in_flight = false + @sync_request = 0 + @sync_result = nil + @reqs_outstanding = 1 + @error = nil + @broker_id = nil + @is_connected = false + @conn = nil + if @session.managedConnections? + @thread = ManagedConnection.new(self) + @thread.start + else + @thread = nil + try_to_connect + end + end + + def connected? + @is_connected + end + + def agent(broker_bank, agent_bank) + bank_key = "%d.%d" % [broker_bank, agent_bank] + return @agents[bank_key] + end + + # Get the list of agents reachable via this broker + def agents + @agents.values + end + + def url + "#{@host}:#{@port}" + end + + def to_s + if connected? + "Broker connected at: #{url}" + else + "Disconnected Broker" + end + end + + def wait_for_sync_done + synchronize do + return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error } + end + end + + def wait_for_stable + synchronize do + return if @reqs_outstanding == 0 + @sync_in_flight = true + unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 } + raise "Timed out waiting for broker to synchronize" + end + end + end + + # Compose the header of a management message + def set_header(codec, opcode, seq=0) + codec.write_uint8(?A) + codec.write_uint8(?M) + codec.write_uint8(?2) + codec.write_uint8(opcode) + codec.write_uint32(seq) + end + + def message(body, routing_key="broker") + dp = @amqp_session.delivery_properties + dp.routing_key = routing_key + mp = @amqp_session.message_properties + mp.content_type = "x-application/qmf" + mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) + return Qpid::Message.new(dp, mp, body) + end + + def emit(msg, dest="qpid.management") + @amqp_session.message_transfer(:destination => dest, + :message => msg) + end + + def inc_outstanding + synchronize { @reqs_outstanding += 1 } + end + + def dec_outstanding + synchronize do + @reqs_outstanding -= 1 + if @reqs_outstanding == 0 && ! @topic_bound + @topic_bound = true + @session.binding_key_list.each do |key| + args = { + :exchange => "qpid.management", + :queue => @topic_name, + :binding_key => key } + @amqp_session.exchange_bind(args) + end + end + if @reqs_outstanding == 0 && @sync_in_flight + sync_done + end + end + end + + def sync_start + synchronize { @sync_in_flight = true } + end + + def sync_done + synchronize do + @sync_in_flight = false + @cv.signal + end + end + + def update_agent(obj) + bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank] + if obj.delete_time == 0 + unless @agents.include?(bank_key) + agent = Agent.new(self, obj.agentBank, obj.label) + @agents[bank_key] = agent + @session.console.new_agent(agent) if @session.console + end + else + agent = @agents.delete(bank_key) + @session.console.del_agent(agent) if agent && @session.console + end + end + + def shutdown + if @thread + @thread.stop + @thread.join + end + if connected? + @amqp_session.incoming("rdest").stop + if @session.console + @amqp_session.incoming("tdest").stop + end + @amqp_session.close + @is_connected = false + end + end + + def try_to_connect + #begin + @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid] + # FIXME: Need sth for Qpid::Util::connect + + @conn = Qpid::Connection.new(TCPSocket.new(@host, @port), + :username => @auth_name, + :password => @auth_pass) + @conn.start + @reply_name = "reply-%s" % amqp_session_id + @amqp_session = @conn.session(@amqp_session_id) + @amqp_session.auto_sync = true + + @amqp_session.queue_declare(:queue => @reply_name, + :exclusive => true, + :auto_delete => true) + + @amqp_session.exchange_bind(:exchange => "amq.direct", + :queue => @reply_name, + :binding_key => @reply_name) + @amqp_session.message_subscribe(:queue => @reply_name, + :destination => "rdest", + :accept_mode => @amqp_session.message_accept_mode.none, + :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) + q = @amqp_session.incoming("rdest") + q.exc_listen(& method(:exception_cb)) + q.listen(& method(:reply_cb)) + @amqp_session.message_set_flow_mode(:destination => "rdest", + :flow_mode => 1) + @amqp_session.message_flow(:destination => "rdest", + :unit => 0, + :value => 0xFFFFFFFF) + @amqp_session.message_flow(:destination => "rdest", + :unit => 1, + :value => 0xFFFFFFFF) + + @topic_name = "topic-#{@amqp_session_id}" + @amqp_session.queue_declare(:queue => @topic_name, + :exclusive => true, + :auto_delete => true) + @amqp_session.message_subscribe(:queue => @topic_name, + :destination => "tdest", + :accept_mode => @amqp_session.message_accept_mode.none, + :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) + @amqp_session.incoming("tdest").listen(& method(:reply_cb)) + @amqp_session.message_set_flow_mode(:destination => "tdest", + :flow_mode => 1) + @amqp_session.message_flow(:destination => "tdest", + :unit => 0, + :value => 0xFFFFFFFF) + @amqp_session.message_flow(:destination => "tdest", + :unit => 1, + :value => 0xFFFFFFFF) + + @is_connected = true + @session.handle_broker_connect(self) + + codec = Qpid::StringCodec.new(@conn.spec) + set_header(codec, ?B) + msg = message(codec.encoded) + emit(msg) + end + + private + + # Check the header of a management message and extract the opcode and + # class + def check_header(codec) + begin + return [nil, nil] unless codec.read_uint8 == ?A + return [nil, nil] unless codec.read_uint8 == ?M + return [nil, nil] unless codec.read_uint8 == ?2 + opcode = codec.read_uint8 + seq = codec.read_uint32 + return [opcode, seq] + rescue + return [nil, nil] + end + end + + def reply_cb(msg) + codec = Qpid::StringCodec.new(@conn.spec, msg.body) + loop do + opcode, seq = check_header(codec) + return unless opcode + case opcode + when ?b: @session.handle_broker_resp(self, codec, seq) + when ?p: @session.handle_package_ind(self, codec, seq) + when ?z: @session.handle_command_complete(self, codec, seq) + when ?q: @session.handle_class_ind(self, codec, seq) + when ?m: @session.handle_method_resp(self, codec, seq) + when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg) + when ?e: @session.handle_event_ind(self, codec, seq) + when ?s: @session.handle_schema_resp(self, codec, seq) + when ?c: @session.handle_content_ind(self, codec, seq, true, false) + when ?i: @session.handle_content_ind(self, codec, seq, false, true) + when ?g: @session.handle_content_ind(self, codec, seq, true, true) + else + raise "Unexpected opcode #{opcode.inspect}" + end + end + end + + def exception_cb(data) + @is_connected = false + @error = data + synchronize { @cv.signal if @sync_in_flight } + @session.handle_error(@error) + @session.handle_broker_disconnect(self) + @thread.disconnected if @thread + end + end + + class Agent + attr_reader :broker, :agent_bank, :label + + def initialize(broker, agent_bank, label) + @broker = broker + @agent_bank = agent_bank + @label = label + end + + def broker_bank + @broker.broker_bank + end + + def to_s + "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label] + end + end + + class Event + + attr_reader :klass_key, :arguments, :timestamp, :name, :schema + + def initialize(session, broker, codec) + @session = session + @broker = broker + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + @klass_key = [pname, cname, hash] + @timestamp = codec.read_int64 + @severity = codec.read_uint8 + @schema = nil + session.packages.keys.each do |pname| + k = [cname, hash] + if session.packages[pname].include?(k) + @schema = session.packages[pname][k] + @arguments = {} + @schema.arguments.each do |arg| + v = session.decode_value(codec, arg.type) + @arguments[arg.name] = v + end + end + end + end + + def to_s + return "<uninterpretable>" unless @schema + t = Time.at(self.timestamp / 1000000000) + out = t.strftime("%c") + out += " " + sev_name + " " + @klass_key[0] + ":" + klass_key[1] + out += " broker=" + @broker.url + @schema.arguments.each do |arg| + out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type) + end + return out + end + + def sev_name + case @severity + when 0 : return "EMER " + when 1 : return "ALERT" + when 2 : return "CRIT " + when 3 : return "ERROR" + when 4 : return "WARN " + when 5 : return "NOTIC" + when 6 : return "INFO " + when 7 : return "DEBUG" + else + return "INV-%d" % @severity + end + end + + end + + # Manage sequence numbers for asynchronous method calls + class SequenceManager + include MonitorMixin + + def initialize + super() + @sequence = 0 + @pending = {} + end + + # Reserve a unique sequence number + def reserve (data) + synchronize do + result = @sequence + @sequence += 1 + @pending[result] = data + return result + end + end + + # Release a reserved sequence number + def release (seq) + synchronize { @pending.delete(seq) } + end + end + + class DebugConsole < Console + + def broker_connected(broker) + puts "brokerConnected #{broker}" + end + + def broker_disconnected(broker) + puts "brokerDisconnected #{broker}" + end + + def new_package(name) + puts "newPackage #{name}" + end + + def new_class(kind, klass_key) + puts "newClass #{kind} #{klass_key}" + end + + def new_agent(agent) + puts "new_agent #{agent}" + end + + def del_agent(agent) + puts "delAgent #{agent}" + end + + def object_props(broker, record) + puts "objectProps #{record}" + end + + def object_stats(broker, record) + puts "objectStats #{record}" + end + + def event(broker, event) + puts "event #{event}" + end + + def heartbeat(agent, timestamp) + puts "heartbeat #{agent}" + end + + def broker_info(broker) + puts "brokerInfo #{broker}" + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/queue.rb b/RC9/qpid/ruby/lib/qpid/queue.rb new file mode 100644 index 0000000000..4150173b53 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/queue.rb @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Augment the standard python multithreaded Queue implementation to add a +# close() method so that threads blocking on the content of a queue can be +# notified if the queue is no longer in use. + +require 'thread' + +# Python nominally uses a bounded queue, but the code never establishes +# a maximum size; we therefore use Ruby's unbounded queue +class Qpid::Queue < ::Queue + + DONE = Object.new + STOP = Object.new + + def initialize + super + @error = nil + @listener = nil + @exc_listener = nil + @exc_listener_lock = Monitor.new + @thread = nil + end + + def close(error = nil) + @error = error + put(DONE) + unless @thread.nil? + @thread.join() + @thread = nil + end + end + + def get(block = true, timeout = nil) + unless timeout.nil? + raise NotImplementedError + end + result = pop(! block) + if result == DONE + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Qpid::Closed exception + put(DONE) + raise Qpid::Closed.new(@error) + else + return result + end + end + + alias :put :push + + def exc_listen(&block) + @exc_listener_lock.synchronize do + @exc_listener = block + end + end + + def listen(&block) + if ! block_given? && @thread + put(STOP) + @thread.join() + @thread = nil + end + + # FIXME: There is a potential race since we could be changing one + # non-nil listener to another + @listener = block + + if block_given? && @thread.nil? + @thread = Thread.new do + loop do + begin + o = get() + break if o == STOP + @listener.call(o) + rescue Qpid::Closed => e + @exc_listener.call(e) if @exc_listener + break + end + end + end + end + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/session.rb b/RC9/qpid/ruby/lib/qpid/session.rb new file mode 100644 index 0000000000..43a664d285 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/session.rb @@ -0,0 +1,458 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class Session < Invoker + + def log; Qpid::logger["qpid.io.cmd"]; end + def msg; Qpid::logger["qpid.io.msg"]; end + + + class Exception < RuntimeError; end + class Closed < Qpid::Session::Exception; end + class Detached < Qpid::Session::Exception; end + + + INCOMPLETE = Object.new + + def self.client(*args) + return Qpid::Client(*args) + end + + def self.server(*args) + return Server(*args) + end + + attr_reader :name, :spec, :auto_sync, :timeout, :channel + attr_reader :results, :exceptions + attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender + + # FIXME: Pass delegate through a block ? + def initialize(name, spec, kwargs = {}) + auto_sync = true + auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync) + timeout = kwargs[:timeout] || 10 + delegate = kwargs[:delegate] + + @name = name + @spec = spec + @auto_sync = auto_sync + @timeout = timeout + @invoke_lock = Monitor.new + @closing = false + @closed = false + + @cond_lock = Monitor.new + @condition = @cond_lock.new_cond + + @send_id = true + @receiver = Receiver.new(self) + @sender = Sender.new(self) + + @lock = Monitor.new + @incoming = {} + @results = {} + @exceptions = [] + + @assembly = nil + + @delegate = delegate.call(self) if delegate + + @ctl_seg = spec[:segment_type].enum[:control].value + @cmd_seg = spec[:segment_type].enum[:command].value + @hdr_seg = spec[:segment_type].enum[:header].value + @body_seg = spec[:segment_type].enum[:body].value + end + + def incoming(destination) + @lock.synchronize do + queue = @incoming[destination] + unless queue + queue = Incoming.new(self, destination) + @incoming[destination] = queue + end + return queue + end + end + + def error? + @exceptions.size > 0 + end + + def sync(timeout=nil) + if channel && Thread.current == channel.connection.thread + raise Qpid::Session::Exception, "deadlock detected" + end + unless @auto_sync + execution_sync(:sync => true) + end + last = @sender.next_id - 1 + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { + @sender.completed.include?(last) || error? + } + raise Qpid::Timeout + end + end + if error? + raise Qpid::Session::Exception, exceptions + end + end + + def close(timeout=nil) + @invoke_lock.synchronize do + @closing = true + channel.session_detach(name) + end + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { @closed } + raise Qpid::Timeout + end + end + end + + def closed + @lock.synchronize do + return if @closed + + @results.each { |id, f| f.error(exceptions) } + @results.clear + + @incoming.values.each { |q| q.close(exceptions) } + @closed = true + @cond_lock.synchronize { @condition.signal } + end + end + + def resolve_method(name) + o = @spec.children[name] + case o + when Qpid::Spec010::Command + return invocation(:method, o) + when Qpid::Spec010::Struct + return invocation(:method, o) + when Qpid::Spec010::Domain + return invocation(:value, o.enum) unless o.enum.nil? + end + + matches = @spec.children.select { |x| + x.name.to_s.include?(name.to_s) + }.collect { |x| x.name.to_s }.sort + if matches.size == 0 + msg = nil + elsif matches.size == 1 + msg = "Did you mean #{matches[0]} ? " + else + msg = "Did you mean one of #{matches.join(",")} ? " + end + return invocation(:error, msg) + end + + def invoke(type, args) + # XXX + unless type.respond_to?(:track) + return type.create(*args) + end + @invoke_lock.synchronize do + return do_invoke(type, args) + end + end + + def do_invoke(type, args) + raise Qpid::Session::Closed if @closing + raise Qpid::Session::Detached unless channel + + # Clumsy simulation of Python's keyword args + kwargs = {} + if args.size > 0 && args[-1].is_a?(Hash) + if args.size > type.fields.size + kwargs = args.pop + elsif type.fields[args.size - 1].type != @spec[:map] + kwargs = args.pop + end + end + + if type.payload + if args.size == type.fields.size + 1 + message = args.pop + else + message = kwargs.delete(:message) # XXX Really ? + end + else + message = nil + end + + hdr = Qpid::struct(@spec[:header]) + hdr.sync = @auto_sync || kwargs.delete(:sync) + + cmd = type.create(*args.push(kwargs)) + sc = Qpid::StringCodec.new(@spec) + sc.write_command(hdr, cmd) + + seg = Segment.new(true, (message.nil? || + (message.headers.nil? && message.body.nil?)), + type.segment_type, type.track, @channel.id, sc.encoded) + + unless type.result.nil? + result = Future.new(exception=Exception) + @results[@sender.next_id] = result + end + emit(seg) + + log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log + + unless message.nil? + unless message.headers.nil? + sc = Qpid::StringCodec.new(@spec) + message.headers.each { |st| sc.write_struct32(st) } + + seg = Segment.new(false, message.body.nil?, @hdr_seg, + type.track, @channel.id, sc.encoded) + emit(seg) + end + unless message.body.nil? + seg = Segment.new(false, true, @body_seg, type.track, + @channel.id, message.body) + emit(seg) + end + msg.debug("SENT %s" % message) if msg + end + + if !type.result.nil? + return @auto_sync ? result.get(@timeout) : result + elsif @auto_sync + sync(@timeout) + end + end + + def received(seg) + @receiver.received(seg) + if seg.first_segment? + raise Qpid::Session::Exception unless @assembly.nil? + @assembly = [] + end + @assembly << seg + if seg.last_segment? + dispatch(@assembly) + @assembly = nil + end + end + + def dispatch(assembly) + hdr = nil + cmd = nil + header = nil + body = nil + assembly.each do |seg| + d = seg.decode(@spec) + case seg.type + when @cmd_seg + hdr, cmd = d + when @hdr_seg + header = d + when @body_seg + body = d + else + raise Qpid::Session::Exception + end + end + log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log + + if cmd.type.payload + result = @delegate.send(cmd.type.name, cmd, header, body) + else + result = @delegate.send(cmd.type.name, cmd) + end + + unless cmd.type.result.nil? + execution_result(cmd.id, result) + end + + if result != INCOMPLETE + assembly.each do |seg| + @receiver.has_completed(seg) + # XXX: don't forget to obey sync for manual completion as well + if hdr.sync + @channel.session_completed(@receiver.completed) + end + end + end + end + + # Python calls this 'send', but that has a special meaning + # in Ruby, so we call it 'emit' + def emit(seg) + @sender.emit(seg) + end + + def signal + @cond_lock.synchronize { @condition.signal } + end + + def wait_for(timeout = nil, &block) + @cond_lock.synchronize { @condition.wait_for(timeout, &block) } + end + + def to_s + "<Session: #{name}, #{channel}>" + end + + class Receiver + + attr_reader :completed + attr_accessor :next_id, :next_offset + + def initialize(session) + @session = session + @next_id = nil + @next_offset = nil + @completed = Qpid::RangedSet.new() + end + + def received(seg) + if @next_id.nil? || @next_offset.nil? + raise Exception, "todo" + end + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + end + + def has_completed(seg) + if seg.id.nil? + raise ArgumentError, "cannot complete unidentified segment" + end + if seg.last_segment? + @completed.add(seg.id) + end + end + + def known_completed(commands) + completed = Qpid::RangedSet.new() + @completed.ranges.each do |c| + unless commands.ranges.find { |kc| + kc.contains(c.lower) && kc.contains(c.upper) + } + completed.add_range(c) + end + end + @completed = completed + end + end + + class Sender + + def initialize(session) + @session = session + @next_id = 0.to_serial + @next_offset = 0 + @segments = [] + @completed = RangedSet.new() + end + + attr_reader :next_id, :completed + + def emit(seg) + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + @segments << seg + if @session.send_id + @session.send_id = false + @session.channel.session_command_point(seg.id, seg.offset) + end + @session.channel.connection.write_segment(seg) + end + + def has_completed(commands) + @segments = @segments.reject { |seg| commands.include?(seg.id) } + commands.ranges.each do |range| + @completed.add(range.lower, range.upper) + end + end + end + + class Incoming < Qpid::Queue + + def initialize(session, destination) + super() + @session = session + @destination = destination + end + + def start + @session.message_credit_unit.choices.each do |unit| + @session.message_flow(@destination, unit.value, 0xFFFFFFFF) + end + end + + def stop + @session.message_cancel(@destination) + listen # Kill the listener + end + end + + class Delegate + + def initialize(session) + @session = session + end + + #XXX: do something with incoming accepts + def message_accept(ma) nil; end + + def execution_result(er) + future = @session.results.delete(er.command_id) + future.set(er.value) + end + + def execution_exception(ex) + @session.exceptions.append(ex) + end + end + + class Client < Delegate + + def log ; Qpid::logger["qpid.io.msg"]; end + + def message_transfer(cmd, headers, body) + m = Qpid::Message.new(body) + m.headers = headers + m.id = cmd.id + messages = @session.incoming(cmd.destination) + messages.put(m) + log.debug("RECV %s" % m) if log + return INCOMPLETE + end + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/spec.rb b/RC9/qpid/ruby/lib/qpid/spec.rb new file mode 100644 index 0000000000..b3d70d019d --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/spec.rb @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "set" +require "rexml/document" +require "qpid/fields" +require "qpid/traverse" + +module Qpid + module Spec + + include REXML + + class Container < Array + + def initialize() + @cache = {} + end + + def [](key) + return @cache[key] if @cache.include?(key) + value = do_lookup(key) + @cache[key] = value + return value + end + + def do_lookup(key) + case key + when String + return find {|x| x.name == key.intern()} + when Symbol + return find {|x| x.name == key} + else + return slice(key) + end + end + + def +(other) + copy = clone() + copy.concat(other) + return copy + end + + end + + class Reference + + fields(:name) + + def init(&block) + @resolver = block + end + + def resolve(*args) + @resolver.call(*args) + end + + end + + class Loader + + def initialize() + @stack = [] + end + + def container() + return Container.new() + end + + def load(obj) + case obj + when String + elem = @stack[-1] + result = container() + elem.elements.each(obj) {|e| + @index = result.size + result << load(e) + } + @index = nil + return result + else + elem = obj + @stack << elem + begin + result = send(:"load_#{elem.name}") + ensure + @stack.pop() + end + return result + end + end + + def element + @stack[-1] + end + + def text + element.text + end + + def attr(name, type = :string, default = nil, path = nil) + if path.nil? + elem = element + else + elem = nil + element.elements.each(path) {|elem|} + if elem.nil? + return default + end + end + + value = elem.attributes[name] + value = value.strip() unless value.nil? + if value.nil? + default + else + send(:"parse_#{type}", value) + end + end + + def parse_int(value) + if value.nil? + return nil + else + value.to_i(0) + end + end + + TRUE = ["yes", "true", "1"].to_set + FALSE = ["no", "false", "0", nil].to_set + + def parse_bool(value) + if TRUE.include?(value) + true + elsif FALSE.include?(value) + false + else + raise Exception.new("parse error, expecting boolean: #{value}") + end + end + + def parse_string(value) + value.to_s + end + + def parse_symbol(value) + value.intern() unless value.nil? + end + + REPLACE = {" " => "_", "-" => "_"} + KEYWORDS = {"global" => "global_", "return" => "return_"} + + def parse_name(value) + return if value.nil? + + REPLACE.each do |k, v| + value = value.gsub(k, v) + end + + value = KEYWORDS[value] if KEYWORDS.has_key? value + return value.intern() + end + + end + + end +end diff --git a/RC9/qpid/ruby/lib/qpid/spec010.rb b/RC9/qpid/ruby/lib/qpid/spec010.rb new file mode 100644 index 0000000000..4c1e46b910 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/spec010.rb @@ -0,0 +1,485 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid/spec" +require 'pathname' +require 'fileutils' + +module Qpid::Spec010 + + include Qpid::Spec + + # XXX: workaround for ruby bug/missfeature + Reference = Reference + Loader = Loader + + class Spec + + ENCODINGS = { + String => "vbin16", + Fixnum => "int64", + Bignum => "int64", + Float => "float", + NilClass => "void", + Array => "list", + Hash => "map" + } + + fields(:major, :minor, :port, :children) + + def init() + @controls = {} + @commands = {} + @structs = {} + @types = {} + children.each {|c| + case c + when Control + @controls[c.code] = c + when Command + @commands[c.code] = c + when Struct + @structs[c.code] = c + when Type + @types[c.code] = c unless c.code.nil? + end + } + end + + attr_reader :controls, :commands, :structs, :types + + def [](key) + return @children[key] + end + + def encoding(klass) + if ENCODINGS.has_key?(klass) + return self[ENCODINGS[klass]] + end + for base in klass.__bases__ + result = encoding(base) + return result unless result.nil? + end + end + + def inspect; "spec"; end + end + + class Constant + + fields(:name, :value) + + attr :parent, true + + end + + class Type + + fields(:name, :code, :fixed, :variable) + + attr :parent, true + + def present?(value) + if @fixed == 0 + return value + else + return !value.nil? + end + end + + def encode(codec, value) + codec.send("write_#{name}", value) + end + + def decode(codec) + return codec.send("read_#{name}") + end + + def inspect; name; end + + end + + class Domain < Type + + fields(:name, :type, :enum) + + attr :parent, true + + def encode(codec, value) + @type.encode(codec, value) + end + + def decode(codec) + return @type.decode(codec) + end + + end + + class Enum + fields(:choices) + + def [](choice) + case choice + when String + choice = choice.to_sym + return choices.find { |c| c.name == choice } + when Symbol + return choices.find { |c| c.name == choice } + else + return choices.find { |c| c.value == choice } + end + end + + def method_missing(name, *args) + raise ArgumentError.new("wrong number of arguments") unless args.empty? + return self[name].value + end + + end + + class Choice + fields(:name, :value) + end + + class Composite + + fields(:name, :code, :size, :pack, :fields) + + attr :parent, true + + # Python calls this 'new', but that has special meaning in Ruby + def create(*args) + return Qpid::struct(self, *args) + end + + def decode(codec) + codec.read_size(@size) + codec.read_uint16() unless @code.nil? + return Qpid::struct(self, self.decode_fields(codec)) + end + + def decode_fields(codec) + flags = 0 + pack.times {|i| flags |= (codec.read_uint8() << 8*i)} + + result = {} + + fields.each_index {|i| + f = @fields[i] + if flags & (0x1 << i) != 0 + result[f.name] = f.type.decode(codec) + else + result[f.name] = nil + end + } + + return result + end + + def encode(codec, value) + sc = Qpid::StringCodec.new(@spec) + sc.write_uint16(@code) unless @code.nil? + encode_fields(sc, value) + codec.write_size(@size, sc.encoded.size) + codec.write(sc.encoded) + end + + def encode_fields(codec, values) + # FIXME: This could be written cleaner using select + # instead of flags + flags = 0 + fields.each_index do |i| + f = fields[i] + flags |= (0x1 << i) if f.type.present?(values[f.name]) + end + + pack.times { |i| codec.write_uint8((flags >> 8*i) & 0xFF) } + + fields.each_index do |i| + f = fields[i] + f.type.encode(codec, values[f.name]) if flags & (0x1 << i) != 0 + end + end + + def inspect; name; end + + end + + class Field + + fields(:name, :type, :exceptions) + + def default() + return nil + end + + end + + class Struct < Composite + + def present?(value) + return !value.nil? + end + + end + + class Action < Composite; end + + class Control < Action + + def segment_type + @parent[:segment_type].enum[:control].value + end + + def track + @parent[:track].enum[:control].value + end + + end + + class Command < Action + + attr_accessor :payload, :result + + def segment_type + @parent["segment_type"].enum["command"].value + end + + def track + @parent["track"].enum["command"].value + end + + end + + class Doc + fields(:type, :title, :text) + end + + class Loader010 < Loader + + def initialize() + super() + end + + def klass + cls = element + until cls.nil? + break if cls.name == "class" + cls = cls.parent + end + return cls + end + + def scope + if element.name == "struct" + return nil + else + return class_name + end + end + + def class_name + cls = klass + if cls.nil? + return nil + else + return parse_name(cls.attributes["name"].strip) + end + end + + def class_code + cls = klass + if cls.nil? + return 0 + else + return parse_int(cls.attributes["code"].strip) + end + end + + def parse_decl(value) + name = parse_name(value) + + s = scope + if s.nil? + return name + else + return :"#{s}_#{name}" + end + end + + def parse_code(value) + c = parse_int(value) + if c.nil? + return nil + else + return c | (class_code << 8) + end + end + + def parse_type(value) + name = parse_name(value.sub(".", "_")) + cls = class_name + return Reference.new {|spec| + candidates = [name] + candidates << :"#{cls}_#{name}" unless cls.nil? + for c in candidates + child = spec[c] + break unless child.nil? + end + if child.nil? + raise Exception.new("unresolved type: #{name}") + else + child + end +} + end + + def load_amqp() + children = nil + + for s in ["constant", "type", "domain", "struct", "control", + "command"] + ch = load(s) + if children.nil? + children = ch + else + children += ch + end + children += load("class/#{s}") + end + children += load("class/command/result/struct") + Spec.new(attr("major", :int), attr("minor", :int), attr("port", :int), + children) + end + + def load_constant() + Constant.new(attr("name", :decl), attr("value", :int)) + end + + def load_type() + Type.new(attr("name", :decl), attr("code", :code), + attr("fixed-width", :int), attr("variable-width", :int)) + end + + def load_domain() + Domain.new(attr("name", :decl), attr("type", :type), load("enum").first) + end + + def load_enum() + Enum.new(load("choice")) + end + + def load_choice() + Choice.new(attr("name", :name), attr("value", :int)) + end + + def load_field() + Field.new(attr("name", :name), attr("type", :type)) + end + + def load_struct() + Struct.new(attr("name", :decl), attr("code", :code), attr("size", :int), + attr("pack", :int), load("field")) + end + + def load_action(cls) + cls.new(attr("name", :decl), attr("code", :code), 0, 2, load("field")) + end + + def load_control() + load_action(Control) + end + + def load_command() + result = attr("type", :type, nil, "result") + result = attr("name", :type, nil, "result/struct") if result.nil? + segs = load("segments") + cmd = load_action(Command) + cmd.result = result + cmd.payload = !segs.empty? + return cmd + end + + def load_result() + true + end + + def load_segments() + true + end + + end + + def self.spec_cache(specfile) + File::join(File::dirname(__FILE__), "spec_cache", + File::basename(specfile, ".xml") + ".rb_marshal") + end + + # XXX: could be shared + def self.load(spec = nil) + return spec if spec.is_a?(Qpid::Spec010::Spec) + if spec.nil? + # FIXME: Need to add a packaging setup in here so we know where + # the installed spec is going to be. + specfile = nil + if ENV['AMQP_SPEC'] + specfile = ENV['AMQP_SPEC'] + else + require "qpid/config" + specfile = Qpid::Config.amqp_spec + end + else + specfile = spec + end + + specfile_cache = spec_cache(specfile) + # FIXME: Check that cache is newer than specfile + if File::exist?(specfile_cache) + begin + spec = File::open(specfile_cache, "r") do |f| + Marshal::load(f) + end + return spec + rescue + # Ignore, will load from XML + end + end + + doc = File::open(specfile, "r") { |f| Document.new(f) } + spec = Loader010.new().load(doc.root) + spec.traverse! do |o| + if o.is_a?(Reference) + o.resolve(spec) + else + o + end + end + + spec.children.each { |c| c.parent = spec } + + begin + FileUtils::mkdir_p(File::dirname(specfile_cache)) + File::open(specfile_cache, "w") { |f| Marshal::dump(spec, f) } + rescue + # Ignore, we are fine without the cached spec + end + return spec + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/spec08.rb b/RC9/qpid/ruby/lib/qpid/spec08.rb new file mode 100644 index 0000000000..902c05c297 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/spec08.rb @@ -0,0 +1,190 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid/spec" + +module Qpid08 + + module Spec + + include Qpid::Spec + + # XXX: workaround for ruby bug/missfeature + Reference = Reference + + class Root + fields(:major, :minor, :classes, :constants, :domains) + + def find_method(name) + classes.each do |c| + c.methods.each do |m| + if name == m.qname + return m + end + end + end + + return nil + end + end + + class Constant + fields(:name, :id, :type, :docs) + end + + class Domain + fields(:name, :type) + end + + class Class + fields(:name, :id, :handler, :fields, :methods, :docs) + end + + class Method + fields(:name, :id, :content?, :responses, :synchronous?, :fields, + :docs) + + def init() + @response = false + end + + attr :parent, true + + def response?; @response end + def response=(b); @response = b end + + def qname + :"#{parent.name}_#{name}" + end + end + + class Field + fields(:name, :id, :type, :docs) + + def default + case type + when :bit then false + when :octet, :short, :long, :longlong then 0 + when :shortstr, :longstr then "" + when :table then {} + end + end + end + + class Doc + fields(:type, :text) + end + + class Container08 < Container + def do_lookup(key) + case key + when Integer + return find {|x| x.id == key} + else + return super(key) + end + end + end + + class Loader08 < Loader + + def container() + return Container08.new() + end + + def load_amqp() + Root.new(attr("major", :int), attr("minor", :int), load("class"), + load("constant"), load("domain")) + end + + def load_class() + Class.new(attr("name", :name), attr("index", :int), attr("handler", :name), + load("field"), load("method"), load("doc")) + end + + def load_method() + Method.new(attr("name", :name), attr("index", :int), + attr("content", :bool), load("response"), + attr("synchronous", :bool), load("field"), load("docs")) + end + + def load_response() + name = attr("name", :name) + Reference.new {|spec, klass| + response = klass.methods[name] + if response.nil? + raise Exception.new("no such method: #{name}") + end + response + } + end + + def load_field() + type = attr("type", :name) + if type.nil? + domain = attr("domain", :name) + type = Reference.new {|spec, klass| + spec.domains[domain].type + } + end + Field.new(attr("name", :name), @index, type, load("docs")) + end + + def load_constant() + Constant.new(attr("name", :name), attr("value", :int), attr("class", :name), + load("doc")) + end + + def load_domain() + Domain.new(attr("name", :name), attr("type", :name)) + end + + def load_doc() + Doc.new(attr("type", :symbol), text) + end + + end + + def self.load(spec) + case spec + when String + spec = File.new(spec) + end + doc = Document.new(spec) + spec = Loader08.new().load(doc.root) + spec.classes.each do |klass| + klass.traverse! do |o| + case o + when Reference + o.resolve(spec, klass) + else + o + end + end + klass.methods.each do |m| + m.parent = klass + m.responses.each do |r| + r.response = true + end + end + end + return spec + end + end +end diff --git a/RC9/qpid/ruby/lib/qpid/test.rb b/RC9/qpid/ruby/lib/qpid/test.rb new file mode 100644 index 0000000000..2e643f4348 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/test.rb @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid/spec08" +require "qpid/client" + +module Qpid08 + + module Test + + def connect() + spec = Spec.load("../specs/amqp.0-8.xml") + c = Client.new("0.0.0.0", 5672, spec) + c.start({"LOGIN" => "guest", "PASSWORD" => "guest"}) + return c + end + + end + +end diff --git a/RC9/qpid/ruby/lib/qpid/traverse.rb b/RC9/qpid/ruby/lib/qpid/traverse.rb new file mode 100644 index 0000000000..67358a7eb1 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/traverse.rb @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Object + + public + + def traverse() + traverse! {|o| yield(o); o} + end + + def traverse_children!() + instance_variables.each {|v| + value = instance_variable_get(v) + replacement = yield(value) + instance_variable_set(v, replacement) unless replacement.equal? value + } + end + + def traverse!(replacements = {}) + return replacements[__id__] if replacements.has_key? __id__ + replacement = yield(self) + replacements[__id__] = replacement + traverse_children! {|o| o.traverse!(replacements) {|c| yield(c)}} + return replacement + end + +end + +class Array + def traverse_children!() + map! {|o| yield(o)} + end +end + +class Hash + def traverse_children!() + mods = {} + each_pair {|k, v| + key = yield(k) + value = yield(v) + mods[key] = value unless key.equal? k and value.equal? v + delete(k) unless key.equal? k + } + + merge!(mods) + end +end diff --git a/RC9/qpid/ruby/lib/qpid/util.rb b/RC9/qpid/ruby/lib/qpid/util.rb new file mode 100644 index 0000000000..2dbc37da09 --- /dev/null +++ b/RC9/qpid/ruby/lib/qpid/util.rb @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'thread' +require 'monitor' + +# Monkeypatch +class MonitorMixin::ConditionVariable + + # Wait until BLOCK returns TRUE or TIMEOUT seconds have passed + # Return TRUE if BLOCK returned TRUE within the TIMEOUT, FALSE + # otherswise + def wait_for(timeout=nil, &block) + start = Time.now + passed = 0 + until yield + if timeout.nil? + wait + elsif passed < timeout + wait(timeout) + else + return false + end + passed = Time.now - start + end + return true + end +end + +module Qpid::Util + + # Similar to Python's threading.Event + class Event + def initialize + @monitor = Monitor.new + @cond = @monitor.new_cond + @set = false + end + + def set + @monitor.synchronize do + @set = true + @cond.signal + end + end + + def clear + @monitor.synchronize { @set = false } + end + + def wait(timeout = nil) + @monitor.synchronize do + unless @set + @cond.wait_for(timeout) { @set } + end + end + end + end +end |