diff options
Diffstat (limited to 'ruby/qpid/peer.rb')
| -rw-r--r-- | ruby/qpid/peer.rb | 74 |
1 files changed, 56 insertions, 18 deletions
diff --git a/ruby/qpid/peer.rb b/ruby/qpid/peer.rb index 9e77165d01..320808fdc6 100644 --- a/ruby/qpid/peer.rb +++ b/ruby/qpid/peer.rb @@ -39,19 +39,33 @@ module Qpid @mutex.synchronize do ch = @channels[id] if ch.nil? - ch = Channel.new(id, @outgoing, @conn.spec) + ch = Channel.new(id, self, @outgoing, @conn.spec) @channels[id] = ch end return ch end end + def channel_delete(id) + @channels.delete(id) + end + def start() spawn(:writer) spawn(:reader) spawn(:worker) end + def close() + @mutex.synchronize do + @channels.each_value do |ch| + ch.close() + @outgoing.close() + @work.close() + end + end + end + private def spawn(method, *args) @@ -59,6 +73,8 @@ module Qpid begin send(method, *args) # is this the standard way to catch any exception? + rescue Closed => e + puts "#{method} #{e}" rescue Object => e print e e.backtrace.each do |line| @@ -94,7 +110,7 @@ module Qpid ch = channel(frame.channel) payload = frame.payload if payload.method.content? - content = read_content(queue) + content = Qpid::read_content(queue) else content = nil end @@ -106,25 +122,27 @@ module Qpid end class Channel - def initialize(id, outgoing, spec) + def initialize(id, peer, outgoing, spec) @id = id + @peer = peer @outgoing = outgoing @spec = spec @incoming = Queue.new() @responses = Queue.new() @queue = nil @closed = false - @reason = nil end + attr_reader :id + def closed?; @closed end - def close(reason) + def close() return if closed? + @peer.channel_delete(@id) @closed = true - @reason = reason @incoming.close() - @responses .close() + @responses.close() end def dispatch(frame, work) @@ -142,7 +160,7 @@ module Qpid end def method_missing(name, *args) - method = @spec.ruby_method(name) + method = @spec.find_method(name) if method.nil? raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") end @@ -150,7 +168,7 @@ module Qpid if args.size == 1 and args[0].instance_of? Hash kwargs = args[0] invoke_args = method.fields.map do |f| - kwargs[f.ruby_name] + kwargs[f.name] end content = kwargs[:content] else @@ -173,13 +191,13 @@ module Qpid end def invoke(method, args, content = nil) - raise Closed(@reason) if closed? + raise Closed() if closed? frame = Frame.new(@id, Method.new(method, args)) @outgoing << frame if method.content? content = Content.new() if content.nil? - write_content(method.klass, content, @outgoing) + write_content(method.parent, content, @outgoing) end nowait = false @@ -204,7 +222,7 @@ module Qpid def write_content(klass, content, queue) size = content.size - header = Frame.new(@id, Header.new(klass, content.weight, size)) + header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers)) queue << header content.children.each {|child| write_content(klass, child, queue)} queue << Frame.new(@id, Body.new(content.body)) if size > 0 @@ -212,7 +230,7 @@ module Qpid end - def read_content(queue) + def Qpid.read_content(queue) frame = queue.pop() header = frame.payload children = [] @@ -220,14 +238,30 @@ module Qpid size = header.size read = 0 buf = "" - while read << size - body = queue.get() + while read < size + body = queue.pop() content = body.payload.content buf << content read += content.size end buf.freeze() - return Content.new(buf, children, header.properties.clone()) + return Content.new(header.properties.clone(), buf, children) + end + + class Content + def initialize(headers = {}, body = "", children = []) + @headers = headers + @body = body + @children = children + end + + attr_reader :headers, :body, :children + + def size; body.size end + def weight; children.size end + + def [](key); @headers[key] end + def []=(key, value); @headers[key] = value end end class Message @@ -235,14 +269,18 @@ module Qpid alias fields args + def method_missing(name) + return args[@method.fields[name].id] + end + def inspect() - "#{method.ruby_name}(#{args.join(", ")})" + "#{method.qname}(#{args.join(", ")})" end end module Delegate def dispatch(ch, msg) - send(msg.method.ruby_name, ch, msg) + send(msg.method.qname, ch, msg) end end |
