summaryrefslogtreecommitdiff
path: root/ruby/connection.rb
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /ruby/connection.rb
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ruby/connection.rb')
-rw-r--r--ruby/connection.rb142
1 files changed, 142 insertions, 0 deletions
diff --git a/ruby/connection.rb b/ruby/connection.rb
new file mode 100644
index 0000000000..4c5e54cb32
--- /dev/null
+++ b/ruby/connection.rb
@@ -0,0 +1,142 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require "socket"
+require "codec"
+
+include Codec
+
+module Qpid
+
+ class Connection
+
+ def initialize(host, port, spec)
+ @host = host
+ @port = port
+ @spec = spec
+ end
+
+ attr_reader(:host, :port, :spec)
+
+ def connect()
+ @sock = TCPSocket.open(@host, @port)
+ @out = Encoder.new(@sock)
+ @in = Decoder.new(@sock)
+ end
+
+ def init()
+ @out.write("AMQP")
+ [1, 1, @spec.major, @spec.minor].each {|o|
+ @out.octet(o)
+ }
+ end
+
+ def write(frame)
+ @out.octet(@spec.constants[frame.payload.type].id)
+ @out.short(frame.channel)
+ frame.payload.encode(@out)
+ @out.octet(frame_end)
+ end
+
+ def read()
+ type = @spec.constants[@in.octet()].name
+ channel = @in.short()
+ payload = Payload.decode(type, @spec, @in)
+ oct = @in.octet()
+ if oct != frame_end
+ raise Exception.new("framing error: expected #{frame_end}, got #{oct}")
+ end
+ Frame.new(channel, payload)
+ end
+
+ private
+
+ def frame_end
+ @spec.constants[:"frame end"].id
+ end
+
+ end
+
+ class Frame
+
+ def initialize(channel, payload)
+ @channel = channel
+ @payload = payload
+ end
+
+ attr_reader(:channel, :payload)
+
+ end
+
+ class Payload
+
+ TYPES = {}
+
+ def Payload.singleton_method_added(name)
+ if name == :type
+ TYPES[type] = self
+ end
+ end
+
+ def Payload.decode(type, spec, dec)
+ klass = TYPES[type]
+ klass.decode(spec, dec)
+ end
+
+ end
+
+ class Method < Payload
+
+ def initialize(method, args)
+ if args.size != method.fields.size
+ raise ArgumentError.new("argument mismatch #{method} #{args}")
+ end
+ @method = method
+ @args = args
+ end
+
+ attr_reader(:method, :args)
+
+ def Method.type
+ :"frame method"
+ end
+
+ def type; Method.type end
+
+ def encode(encoder)
+ buf = StringWriter.new()
+ enc = Encoder.new(buf)
+ enc.short(@method.parent.id)
+ enc.short(@method.id)
+ @method.fields.zip(self.args).each {|f, a|
+ enc.encode(f.type, a)
+ }
+ enc.flush()
+ encoder.longstr(buf.to_s)
+ end
+
+ def Method.decode(spec, decoder)
+ buf = decoder.longstr()
+ dec = Decoder.new(StringReader.new(buf))
+ klass = spec.classes[dec.short()]
+ meth = klass.methods[dec.short()]
+ args = meth.fields.map {|f| dec.decode(f.type)}
+ return Method.new(meth, args)
+ end
+
+ end
+
+end