summaryrefslogtreecommitdiff
path: root/ruby/qpid/peer.rb
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-03-07 17:13:02 +0000
committerRafael H. Schloming <rhs@apache.org>2007-03-07 17:13:02 +0000
commit6dbcf1d2ef68c8bf2b0032287bf71af8b002e35e (patch)
tree5c2411ae8c9af50f952eab48a870d4ddd9357448 /ruby/qpid/peer.rb
parentc4d2e2a327175ad1703639e42363e8b4479ce0c0 (diff)
downloadqpid-python-6dbcf1d2ef68c8bf2b0032287bf71af8b002e35e.tar.gz
added test harness, tests, and a few missing pieces of implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@515652 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ruby/qpid/peer.rb')
-rw-r--r--ruby/qpid/peer.rb74
1 files changed, 56 insertions, 18 deletions
diff --git a/ruby/qpid/peer.rb b/ruby/qpid/peer.rb
index 9e77165d01..320808fdc6 100644
--- a/ruby/qpid/peer.rb
+++ b/ruby/qpid/peer.rb
@@ -39,19 +39,33 @@ module Qpid
@mutex.synchronize do
ch = @channels[id]
if ch.nil?
- ch = Channel.new(id, @outgoing, @conn.spec)
+ ch = Channel.new(id, self, @outgoing, @conn.spec)
@channels[id] = ch
end
return ch
end
end
+ def channel_delete(id)
+ @channels.delete(id)
+ end
+
def start()
spawn(:writer)
spawn(:reader)
spawn(:worker)
end
+ def close()
+ @mutex.synchronize do
+ @channels.each_value do |ch|
+ ch.close()
+ @outgoing.close()
+ @work.close()
+ end
+ end
+ end
+
private
def spawn(method, *args)
@@ -59,6 +73,8 @@ module Qpid
begin
send(method, *args)
# is this the standard way to catch any exception?
+ rescue Closed => e
+ puts "#{method} #{e}"
rescue Object => e
print e
e.backtrace.each do |line|
@@ -94,7 +110,7 @@ module Qpid
ch = channel(frame.channel)
payload = frame.payload
if payload.method.content?
- content = read_content(queue)
+ content = Qpid::read_content(queue)
else
content = nil
end
@@ -106,25 +122,27 @@ module Qpid
end
class Channel
- def initialize(id, outgoing, spec)
+ def initialize(id, peer, outgoing, spec)
@id = id
+ @peer = peer
@outgoing = outgoing
@spec = spec
@incoming = Queue.new()
@responses = Queue.new()
@queue = nil
@closed = false
- @reason = nil
end
+ attr_reader :id
+
def closed?; @closed end
- def close(reason)
+ def close()
return if closed?
+ @peer.channel_delete(@id)
@closed = true
- @reason = reason
@incoming.close()
- @responses .close()
+ @responses.close()
end
def dispatch(frame, work)
@@ -142,7 +160,7 @@ module Qpid
end
def method_missing(name, *args)
- method = @spec.ruby_method(name)
+ method = @spec.find_method(name)
if method.nil?
raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}")
end
@@ -150,7 +168,7 @@ module Qpid
if args.size == 1 and args[0].instance_of? Hash
kwargs = args[0]
invoke_args = method.fields.map do |f|
- kwargs[f.ruby_name]
+ kwargs[f.name]
end
content = kwargs[:content]
else
@@ -173,13 +191,13 @@ module Qpid
end
def invoke(method, args, content = nil)
- raise Closed(@reason) if closed?
+ raise Closed() if closed?
frame = Frame.new(@id, Method.new(method, args))
@outgoing << frame
if method.content?
content = Content.new() if content.nil?
- write_content(method.klass, content, @outgoing)
+ write_content(method.parent, content, @outgoing)
end
nowait = false
@@ -204,7 +222,7 @@ module Qpid
def write_content(klass, content, queue)
size = content.size
- header = Frame.new(@id, Header.new(klass, content.weight, size))
+ header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers))
queue << header
content.children.each {|child| write_content(klass, child, queue)}
queue << Frame.new(@id, Body.new(content.body)) if size > 0
@@ -212,7 +230,7 @@ module Qpid
end
- def read_content(queue)
+ def Qpid.read_content(queue)
frame = queue.pop()
header = frame.payload
children = []
@@ -220,14 +238,30 @@ module Qpid
size = header.size
read = 0
buf = ""
- while read << size
- body = queue.get()
+ while read < size
+ body = queue.pop()
content = body.payload.content
buf << content
read += content.size
end
buf.freeze()
- return Content.new(buf, children, header.properties.clone())
+ return Content.new(header.properties.clone(), buf, children)
+ end
+
+ class Content
+ def initialize(headers = {}, body = "", children = [])
+ @headers = headers
+ @body = body
+ @children = children
+ end
+
+ attr_reader :headers, :body, :children
+
+ def size; body.size end
+ def weight; children.size end
+
+ def [](key); @headers[key] end
+ def []=(key, value); @headers[key] = value end
end
class Message
@@ -235,14 +269,18 @@ module Qpid
alias fields args
+ def method_missing(name)
+ return args[@method.fields[name].id]
+ end
+
def inspect()
- "#{method.ruby_name}(#{args.join(", ")})"
+ "#{method.qname}(#{args.join(", ")})"
end
end
module Delegate
def dispatch(ch, msg)
- send(msg.method.ruby_name, ch, msg)
+ send(msg.method.qname, ch, msg)
end
end