diff options
Diffstat (limited to 'ruby/lib/qpid/session.rb')
-rw-r--r-- | ruby/lib/qpid/session.rb | 458 |
1 files changed, 0 insertions, 458 deletions
diff --git a/ruby/lib/qpid/session.rb b/ruby/lib/qpid/session.rb deleted file mode 100644 index d693b722c2..0000000000 --- a/ruby/lib/qpid/session.rb +++ /dev/null @@ -1,458 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -require 'monitor' - -module Qpid - - class Session < Invoker - - def log; Qpid::logger["qpid.io.cmd"]; end - def msg; Qpid::logger["qpid.io.msg"]; end - - - class Exception < RuntimeError; end - class Closed < Qpid::Session::Exception; end - class Detached < Qpid::Session::Exception; end - - - INCOMPLETE = Object.new - - def self.client(*args) - return Qpid::Client(*args) - end - - def self.server(*args) - return Server(*args) - end - - attr_reader :name, :spec, :auto_sync, :timeout, :channel - attr_reader :results, :exceptions - attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender - - # FIXME: Pass delegate through a block ? - def initialize(name, spec, kwargs = {}) - auto_sync = true - auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync) - timeout = kwargs[:timeout] || 10 - delegate = kwargs[:delegate] - - @name = name - @spec = spec - @auto_sync = auto_sync - @timeout = timeout - @invoke_lock = Monitor.new - @closing = false - @closed = false - - @cond_lock = Monitor.new - @condition = @cond_lock.new_cond - - @send_id = true - @receiver = Receiver.new(self) - @sender = Sender.new(self) - - @lock = Monitor.new - @incoming = {} - @results = {} - @exceptions = [] - - @assembly = nil - - @delegate = delegate.call(self) if delegate - - @ctl_seg = spec[:segment_type].enum[:control].value - @cmd_seg = spec[:segment_type].enum[:command].value - @hdr_seg = spec[:segment_type].enum[:header].value - @body_seg = spec[:segment_type].enum[:body].value - end - - def incoming(destination) - @lock.synchronize do - queue = @incoming[destination] - unless queue - queue = Incoming.new(self, destination) - @incoming[destination] = queue - end - return queue - end - end - - def error? - @exceptions.size > 0 - end - - def sync(timeout=nil) - if channel && Thread.current == channel.connection.thread - raise Qpid::Session::Exception, "deadlock detected" - end - unless @auto_sync - execution_sync(:sync => true) - end - last = @sender.next_id - 1 - @cond_lock.synchronize do - unless @condition.wait_for(timeout) { - @sender.completed.include?(last) || error? - } - raise Qpid::Timeout - end - end - if error? - raise Qpid::Session::Exception, @exceptions - end - end - - def close(timeout=nil) - @invoke_lock.synchronize do - @closing = true - channel.session_detach(name) - end - @cond_lock.synchronize do - unless @condition.wait_for(timeout) { @closed } - raise Qpid::Timeout - end - end - end - - def closed - @lock.synchronize do - return if @closed - - @results.each { |id, f| f.error(exceptions) } - @results.clear - - @incoming.values.each { |q| q.close(exceptions) } - @closed = true - @cond_lock.synchronize { @condition.signal } - end - end - - def resolve_method(name) - o = @spec.children[name] - case o - when Qpid::Spec010::Command - return invocation(:method, o) - when Qpid::Spec010::Struct - return invocation(:method, o) - when Qpid::Spec010::Domain - return invocation(:value, o.enum) unless o.enum.nil? - end - - matches = @spec.children.select { |x| - x.name.to_s.include?(name.to_s) - }.collect { |x| x.name.to_s }.sort - if matches.size == 0 - msg = nil - elsif matches.size == 1 - msg = "Did you mean #{matches[0]} ? " - else - msg = "Did you mean one of #{matches.join(",")} ? " - end - return invocation(:error, msg) - end - - def invoke(type, args) - # XXX - unless type.respond_to?(:track) - return type.create(*args) - end - @invoke_lock.synchronize do - return do_invoke(type, args) - end - end - - def do_invoke(type, args) - raise Qpid::Session::Closed if @closing - raise Qpid::Session::Detached unless channel - - # Clumsy simulation of Python's keyword args - kwargs = {} - if args.size > 0 && args[-1].is_a?(Hash) - if args.size > type.fields.size - kwargs = args.pop - elsif type.fields[args.size - 1].type != @spec[:map] - kwargs = args.pop - end - end - - if type.payload - if args.size == type.fields.size + 1 - message = args.pop - else - message = kwargs.delete(:message) # XXX Really ? - end - else - message = nil - end - - hdr = Qpid::struct(@spec[:header]) - hdr.sync = @auto_sync || kwargs.delete(:sync) - - cmd = type.create(*args.push(kwargs)) - sc = Qpid::StringCodec.new(@spec) - sc.write_command(hdr, cmd) - - seg = Segment.new(true, (message.nil? || - (message.headers.nil? && message.body.nil?)), - type.segment_type, type.track, @channel.id, sc.encoded) - - unless type.result.nil? - result = Future.new(exception=Exception) - @results[@sender.next_id] = result - end - emit(seg) - - log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log - - unless message.nil? - unless message.headers.nil? - sc = Qpid::StringCodec.new(@spec) - message.headers.each { |st| sc.write_struct32(st) } - - seg = Segment.new(false, message.body.nil?, @hdr_seg, - type.track, @channel.id, sc.encoded) - emit(seg) - end - unless message.body.nil? - seg = Segment.new(false, true, @body_seg, type.track, - @channel.id, message.body) - emit(seg) - end - msg.debug("SENT %s" % message) if msg - end - - if !type.result.nil? - return @auto_sync ? result.get(@timeout) : result - elsif @auto_sync - sync(@timeout) - end - end - - def received(seg) - @receiver.received(seg) - if seg.first_segment? - raise Qpid::Session::Exception unless @assembly.nil? - @assembly = [] - end - @assembly << seg - if seg.last_segment? - dispatch(@assembly) - @assembly = nil - end - end - - def dispatch(assembly) - hdr = nil - cmd = nil - header = nil - body = nil - assembly.each do |seg| - d = seg.decode(@spec) - case seg.type - when @cmd_seg - hdr, cmd = d - when @hdr_seg - header = d - when @body_seg - body = d - else - raise Qpid::Session::Exception - end - end - log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log - - if cmd.st_type.payload - result = @delegate.send(cmd.st_type.name, cmd, header, body) - else - result = @delegate.send(cmd.st_type.name, cmd) - end - - unless cmd.st_type.result.nil? - execution_result(cmd.id, result) - end - - if result != INCOMPLETE - assembly.each do |seg| - @receiver.has_completed(seg) - # XXX: don't forget to obey sync for manual completion as well - if hdr.sync - @channel.session_completed(@receiver.completed) - end - end - end - end - - # Python calls this 'send', but that has a special meaning - # in Ruby, so we call it 'emit' - def emit(seg) - @sender.emit(seg) - end - - def signal - @cond_lock.synchronize { @condition.signal } - end - - def wait_for(timeout = nil, &block) - @cond_lock.synchronize { @condition.wait_for(timeout, &block) } - end - - def to_s - "<Session: #{name}, #{channel}>" - end - - class Receiver - - attr_reader :completed - attr_accessor :next_id, :next_offset - - def initialize(session) - @session = session - @next_id = nil - @next_offset = nil - @completed = Qpid::RangedSet.new() - end - - def received(seg) - if @next_id.nil? || @next_offset.nil? - raise Exception, "todo" - end - seg.id = @next_id - seg.offset = @next_offset - if seg.last_segment? - @next_id += 1 - @next_offset = 0 - else - @next_offset += seg.payload.size - end - end - - def has_completed(seg) - if seg.id.nil? - raise ArgumentError, "cannot complete unidentified segment" - end - if seg.last_segment? - @completed.add(seg.id) - end - end - - def known_completed(commands) - completed = Qpid::RangedSet.new() - @completed.ranges.each do |c| - unless commands.ranges.find { |kc| - kc.contains(c.lower) && kc.contains(c.upper) - } - completed.add_range(c) - end - end - @completed = completed - end - end - - class Sender - - def initialize(session) - @session = session - @next_id = 0.to_serial - @next_offset = 0 - @segments = [] - @completed = RangedSet.new() - end - - attr_reader :next_id, :completed - - def emit(seg) - seg.id = @next_id - seg.offset = @next_offset - if seg.last_segment? - @next_id += 1 - @next_offset = 0 - else - @next_offset += seg.payload.size - end - @segments << seg - if @session.send_id - @session.send_id = false - @session.channel.session_command_point(seg.id, seg.offset) - end - @session.channel.connection.write_segment(seg) - end - - def has_completed(commands) - @segments = @segments.reject { |seg| commands.include?(seg.id) } - commands.ranges.each do |range| - @completed.add(range.lower, range.upper) - end - end - end - - class Incoming < Qpid::Queue - - def initialize(session, destination) - super() - @session = session - @destination = destination - end - - def start - @session.message_credit_unit.choices.each do |unit| - @session.message_flow(@destination, unit.value, 0xFFFFFFFF) - end - end - - def stop - @session.message_cancel(@destination) - listen # Kill the listener - end - end - - class Delegate - - def initialize(session) - @session = session - end - - #XXX: do something with incoming accepts - def message_accept(ma) nil; end - - def execution_result(er) - future = @session.results.delete(er.command_id) - future.set(er.value) - end - - def execution_exception(ex) - @session.exceptions << ex - end - end - - class Client < Delegate - - def log ; Qpid::logger["qpid.io.msg"]; end - - def message_transfer(cmd, headers, body) - m = Qpid::Message.new(body) - m.headers = headers - m.id = cmd.id - messages = @session.incoming(cmd.destination) - messages.put(m) - log.debug("RECV %s" % m) if log - return INCOMPLETE - end - end - end -end |