diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2006-09-29 14:54:37 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-29 14:54:37 +0000 |
| commit | ce57f0694a13529ab6feb8d10a7f4c8935368bb0 (patch) | |
| tree | 87dcebe88c2bfc5902804bebc1405d019ffe29f9 /ruby/qpid/connection.rb | |
| parent | 3fedfb514c3d2ce6b9cb02a30465b6bfd3a37cf2 (diff) | |
| download | qpid-python-ce57f0694a13529ab6feb8d10a7f4c8935368bb0.tar.gz | |
moved ruby code into a qpid package
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@451317 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ruby/qpid/connection.rb')
| -rw-r--r-- | ruby/qpid/connection.rb | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/ruby/qpid/connection.rb b/ruby/qpid/connection.rb new file mode 100644 index 0000000000..da2b0e1de7 --- /dev/null +++ b/ruby/qpid/connection.rb @@ -0,0 +1,142 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "socket" +require "qpid/codec" + +include Codec + +module Qpid + + class Connection + + def initialize(host, port, spec) + @host = host + @port = port + @spec = spec + end + + attr_reader(:host, :port, :spec) + + def connect() + @sock = TCPSocket.open(@host, @port) + @out = Encoder.new(@sock) + @in = Decoder.new(@sock) + end + + def init() + @out.write("AMQP") + [1, 1, @spec.major, @spec.minor].each {|o| + @out.octet(o) + } + end + + def write(frame) + @out.octet(@spec.constants[frame.payload.type].id) + @out.short(frame.channel) + frame.payload.encode(@out) + @out.octet(frame_end) + end + + def read() + type = @spec.constants[@in.octet()].name + channel = @in.short() + payload = Payload.decode(type, @spec, @in) + oct = @in.octet() + if oct != frame_end + raise Exception.new("framing error: expected #{frame_end}, got #{oct}") + end + Frame.new(channel, payload) + end + + private + + def frame_end + @spec.constants[:"frame end"].id + end + + end + + class Frame + + def initialize(channel, payload) + @channel = channel + @payload = payload + end + + attr_reader(:channel, :payload) + + end + + class Payload + + TYPES = {} + + def Payload.singleton_method_added(name) + if name == :type + TYPES[type] = self + end + end + + def Payload.decode(type, spec, dec) + klass = TYPES[type] + klass.decode(spec, dec) + end + + end + + class Method < Payload + + def initialize(method, args) + if args.size != method.fields.size + raise ArgumentError.new("argument mismatch #{method} #{args}") + end + @method = method + @args = args + end + + attr_reader(:method, :args) + + def Method.type + :"frame method" + end + + def type; Method.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@method.parent.id) + enc.short(@method.id) + @method.fields.zip(self.args).each {|f, a| + enc.encode(f.type, a) + } + enc.flush() + encoder.longstr(buf.to_s) + end + + def Method.decode(spec, decoder) + buf = decoder.longstr() + dec = Decoder.new(StringReader.new(buf)) + klass = spec.classes[dec.short()] + meth = klass.methods[dec.short()] + args = meth.fields.map {|f| dec.decode(f.type)} + return Method.new(meth, args) + end + + end + +end |
