diff options
Diffstat (limited to 'ruby/lib/qpid/framer.rb')
-rw-r--r-- | ruby/lib/qpid/framer.rb | 212 |
1 files changed, 0 insertions, 212 deletions
diff --git a/ruby/lib/qpid/framer.rb b/ruby/lib/qpid/framer.rb deleted file mode 100644 index d057605383..0000000000 --- a/ruby/lib/qpid/framer.rb +++ /dev/null @@ -1,212 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -require 'monitor' -require 'logger' -require 'sasl' - -module Qpid - - FIRST_SEG = 0x08 - LAST_SEG = 0x04 - FIRST_FRM = 0x02 - LAST_FRM = 0x01 - - class << self - attr_accessor :raw_logger, :frm_logger - end - - def self.packed_size(format) - # FIXME: This is a total copout to simulate Python's - # struct.calcsize - ([0]*256).pack(format).size - end - - class Frame - attr_reader :payload, :track, :flags, :type, :channel - - # HEADER = "!2BHxBH4x" - # Python Meaning Ruby - # ! big endian (implied by format char) - # 2B 2 uchar C2 - # H unsigned short n - # x pad byte x - # B uchar C - # H unsigned short n - # 4x pad byte x4 - HEADER = "C2nxCnx4" - HEADER_SIZE = Qpid::packed_size(HEADER) - MAX_PAYLOAD = 65535 - HEADER_SIZE - - def initialize(flags, type, track, channel, payload) - if payload.size > MAX_PAYLOAD - raise ArgumentError, "max payload size exceeded: #{payload.size}" - end - - @flags = flags - @type = type - @track = track - @channel = channel - @payload = payload - end - - def first_segment? ; FIRST_SEG & @flags > 0 ; end - - def last_segment? ; LAST_SEG & @flags > 0 ; end - - def first_frame? ; FIRST_FRM & @flags > 0 ; end - - def last_frame? ; LAST_FRM & @flags > 0 ; end - - def to_s - fs = first_segment? ? 'S' : '.' - ls = last_segment? ? 's' : '.' - ff = first_frame? ? 'F' : '.' - lf = last_frame? ? 'f' : '.' - - return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, - @type, - @track, - @channel, - @payload.inspect] - end - end - - class FramingError < Exception ; end - - class Closed < Exception ; end - - class Framer - include Packer - - # Python: "!4s4B" - HEADER = "a4C4" - HEADER_SIZE = 8 - - def raw - Qpid::raw_logger - end - - def frm - Qpid::frm_logger - end - - def initialize(sock) - @sock = sock - @sock.extend(MonitorMixin) - @tx_buf = "" - @rx_buf = "" - @security_layer_tx = nil - @security_layer_rx = nil - @maxbufsize = 65535 - end - - attr_reader :sock - attr_accessor :security_layer_tx, :security_layer_rx - - def aborted? ; false ; end - - def write(buf) - @tx_buf += buf - end - - def flush - @sock.synchronize do - if @security_layer_tx - cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf) - _write(cipher_buf) - else - _write(@tx_buf) - end - @tx_buf = "" - frm.debug("FLUSHED") if frm - end - rescue - @sock.close unless @sock.closed? - end - - def _write(buf) - while buf && buf.size > 0 - # FIXME: Catch errors - n = @sock.write(buf) - raw.debug("SENT #{buf[0, n].inspect}") if raw - buf[0,n] = "" - @sock.flush - end - end - - def read(n) - while @rx_buf.size < n - begin - s = @sock.recv(@maxbufsize) - if @security_layer_rx - s = Sasl.decode(@security_layer_rx, s) - end - rescue IOError => e - raise e if @rx_buf != "" - @sock.close unless @sock.closed? - raise Closed - end - # FIXME: Catch errors - if s.nil? or s.size == 0 - @sock.close unless @sock.closed? - raise Closed - end - @rx_buf += s - raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw - end - data = @rx_buf[0, n] - @rx_buf = @rx_buf[n, @rx_buf.size - n] - return data - end - - def read_header - unpack(Framer::HEADER, Framer::HEADER_SIZE) - end - - def write_header(major, minor) - @sock.synchronize do - pack(Framer::HEADER, "AMQP", 1, 1, major, minor) - flush() - end - end - - def write_frame(frame) - @sock.synchronize do - size = frame.payload.size + Frame::HEADER_SIZE - track = frame.track & 0x0F - pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel) - write(frame.payload) - if frame.last_segment? and frame.last_frame? - flush() - frm.debug("SENT #{frame}") if frm - end - end - end - - def read_frame - flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE) - raise FramingError if (flags & 0xF0 > 0) - payload = read(size - Frame::HEADER_SIZE) - frame = Frame.new(flags, type, track, channel, payload) - frm.debug("RECV #{frame}") if frm - return frame - end - end -end |