# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # require 'rbconfig' module Qpid class Delegate def initialize(connection, args={}) @connection = connection @spec = connection.spec @delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) @control = @spec[:track].enum[:control].value end def log ; Qpid::logger["qpid.io.ctl"]; end def received(seg) ssn = @connection.attached[seg.channel] unless ssn ch = Qpid::Connection::Channel.new(@connection, seg.channel) else ch = ssn.channel end if seg.track == @control ctl = seg.decode(@spec) log.debug("RECV %s", ctl) if log attr = ctl.type.name method(attr).call(ch, ctl) elsif ssn.nil? ch.session_detached else ssn.received(seg) end end def connection_close(ch, close) @connection.close_code = [close.reply_code, close.reply_text] ch.connection_close_ok @connection.sock.close_write() unless @connection.opened @connection.failed = true @connection.signal end end def connection_close_ok(ch, close_ok) @connection.opened = false @connection.signal end def session_attach(ch, a) begin @connection.attach(a.name, ch, @delegate, a.force) ch.session_attached(a.name) rescue Qpid::ChannelBusy ch.session_detached(a.name) rescue Qpid::SessionBusy ch.session_detached(a.name) end end def session_attached(ch, a) ch.session.signal end def session_detach(ch, d) #send back the confirmation of detachment before removing the #channel from the attached set; this avoids needing to hold the #connection lock during the sending of this control and ensures #that if the channel is immediately reused for a new session the #attach request will follow the detached notification. ch.session_detached(d.name) ssn = @connection.detach(d.name, ch) end def session_detached(ch, d) @connection.detach(d.name, ch) end def session_request_timeout(ch, rt) ch.session_timeout(rt.timeout) end def session_command_point(ch, cp) ssn = ch.session ssn.receiver.next_id = cp.command_id ssn.receiver.next_offset = cp.command_offset end def session_completed(ch, cmp) ch.session.sender.has_completed(cmp.commands) if cmp.timely_reply ch.session_known_completed(cmp.commands) end ch.session.signal end def session_known_completed(ch, kn_cmp) ch.session.receiver.known_completed(kn_cmp.commands) end def session_flush(ch, f) rcv = ch.session.receiver if f.expected if rcv.next_id exp = Qpid::RangedSet.new(rcv.next_id) else exp = nil end ch.session_expected(exp) end if f.confirmed ch.session_confirmed(rcv.completed) end if f.completed ch.session_completed(rcv.completed) end end class Server < Delegate def start @connection.read_header() @connection.write_header(@spec.major, @spec.minor) ch = Qpid::Connection::Channel.new(@connection, 0) ch.connection_start(:mechanisms => ["ANONYMOUS"]) ch end def connection_start_ok(ch, start_ok) ch.connection_tune(:channel_max => 65535) end def connection_tune_ok(ch, tune_ok) nil end def connection_open(ch, open) @connection.opened = true ch.connection_open_ok() @connection.signal end end class Client < Delegate # FIXME: Python uses os.name for platform - we don't have an exact # analog in Ruby PROPERTIES = {"product" => "qpid python client", "version" => "development", "platform" => Config::CONFIG["build_os"]} def initialize(connection, args) super(connection) @username = args[:username] || "guest" @password = args[:password] || "guest" @mechanism= args[:mechanism] || "PLAIN" end def start @connection.write_header(@spec.major, @spec.minor) @connection.read_header end def connection_start(ch, start) r = "\0%s\0%s" % [@username, @password] ch.connection_start_ok(:client_properties => PROPERTIES, :mechanism => @mechanism, :response => r) end def connection_tune(ch, tune) ch.connection_tune_ok() ch.connection_open() end def connection_open_ok(ch, open_ok) @connection.opened = true @connection.signal end end end end