From ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Fri, 21 Oct 2011 01:19:00 +0000 Subject: Undo bad merge from trunk - merged at wrong level. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68 --- ruby/LICENSE.txt | 203 + ruby/Makefile | 47 + ruby/NOTICE.txt | 19 + ruby/README.txt | 26 + ruby/RELEASE_NOTES | 10 + ruby/Rakefile | 116 + ruby/examples/hello-world.rb | 61 + ruby/examples/qmf-libvirt.rb | 80 + ruby/ext/sasl/extconf.rb | 28 + ruby/ext/sasl/sasl.c | 472 ++ ruby/lib/qpid.rb | 41 + ruby/lib/qpid/assembler.rb | 148 + ruby/lib/qpid/client.rb | 136 + ruby/lib/qpid/codec.rb | 457 ++ ruby/lib/qpid/codec08.rb | 265 + ruby/lib/qpid/config.rb | 32 + ruby/lib/qpid/connection.rb | 222 + ruby/lib/qpid/connection08.rb | 252 + ruby/lib/qpid/datatypes.rb | 353 ++ ruby/lib/qpid/delegates.rb | 237 + ruby/lib/qpid/fields.rb | 49 + ruby/lib/qpid/framer.rb | 212 + ruby/lib/qpid/invoker.rb | 65 + ruby/lib/qpid/packer.rb | 33 + ruby/lib/qpid/peer.rb | 289 ++ ruby/lib/qpid/qmf.rb | 1957 ++++++++ ruby/lib/qpid/queue.rb | 101 + ruby/lib/qpid/session.rb | 458 ++ ruby/lib/qpid/spec.rb | 183 + ruby/lib/qpid/spec010.rb | 485 ++ ruby/lib/qpid/spec08.rb | 190 + ruby/lib/qpid/specs/amqp.0-10-qpid-errata.xml | 6654 +++++++++++++++++++++++++ ruby/lib/qpid/specs/amqp.0-10.dtd | 246 + ruby/lib/qpid/test.rb | 35 + ruby/lib/qpid/traverse.rb | 64 + ruby/lib/qpid/util.rb | 75 + ruby/tests/assembler.rb | 78 + ruby/tests/codec010.rb | 122 + ruby/tests/connection.rb | 246 + ruby/tests/datatypes.rb | 224 + ruby/tests/framer.rb | 99 + ruby/tests/qmf.rb | 248 + ruby/tests/queue.rb | 80 + ruby/tests/spec010.rb | 80 + ruby/tests/util.rb | 72 + ruby/tests_0-8/basic.rb | 69 + ruby/tests_0-8/channel.rb | 48 + 47 files changed, 15667 insertions(+) create mode 100755 ruby/LICENSE.txt create mode 100644 ruby/Makefile create mode 100644 ruby/NOTICE.txt create mode 100644 ruby/README.txt create mode 100644 ruby/RELEASE_NOTES create mode 100644 ruby/Rakefile create mode 100755 ruby/examples/hello-world.rb create mode 100644 ruby/examples/qmf-libvirt.rb create mode 100644 ruby/ext/sasl/extconf.rb create mode 100644 ruby/ext/sasl/sasl.c create mode 100644 ruby/lib/qpid.rb create mode 100644 ruby/lib/qpid/assembler.rb create mode 100644 ruby/lib/qpid/client.rb create mode 100644 ruby/lib/qpid/codec.rb create mode 100644 ruby/lib/qpid/codec08.rb create mode 100644 ruby/lib/qpid/config.rb create mode 100644 ruby/lib/qpid/connection.rb create mode 100644 ruby/lib/qpid/connection08.rb create mode 100644 ruby/lib/qpid/datatypes.rb create mode 100644 ruby/lib/qpid/delegates.rb create mode 100644 ruby/lib/qpid/fields.rb create mode 100644 ruby/lib/qpid/framer.rb create mode 100644 ruby/lib/qpid/invoker.rb create mode 100644 ruby/lib/qpid/packer.rb create mode 100644 ruby/lib/qpid/peer.rb create mode 100644 ruby/lib/qpid/qmf.rb create mode 100644 ruby/lib/qpid/queue.rb create mode 100644 ruby/lib/qpid/session.rb create mode 100644 ruby/lib/qpid/spec.rb create mode 100644 ruby/lib/qpid/spec010.rb create mode 100644 ruby/lib/qpid/spec08.rb create mode 100644 ruby/lib/qpid/specs/amqp.0-10-qpid-errata.xml create mode 100644 ruby/lib/qpid/specs/amqp.0-10.dtd create mode 100644 ruby/lib/qpid/test.rb create mode 100644 ruby/lib/qpid/traverse.rb create mode 100644 ruby/lib/qpid/util.rb create mode 100644 ruby/tests/assembler.rb create mode 100644 ruby/tests/codec010.rb create mode 100644 ruby/tests/connection.rb create mode 100644 ruby/tests/datatypes.rb create mode 100644 ruby/tests/framer.rb create mode 100644 ruby/tests/qmf.rb create mode 100644 ruby/tests/queue.rb create mode 100644 ruby/tests/spec010.rb create mode 100644 ruby/tests/util.rb create mode 100644 ruby/tests_0-8/basic.rb create mode 100644 ruby/tests_0-8/channel.rb (limited to 'ruby') diff --git a/ruby/LICENSE.txt b/ruby/LICENSE.txt new file mode 100755 index 0000000000..6b0b1270ff --- /dev/null +++ b/ruby/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/ruby/Makefile b/ruby/Makefile new file mode 100644 index 0000000000..9cac3207c0 --- /dev/null +++ b/ruby/Makefile @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +SASL_DIR = ext/sasl +SASL_MODULE = $(SASL_DIR)/sasl.so +RUBY_LIB = lib +SPEC_CACHE_SCRIPT = sc.rb + +.PHONY: spec_cache all clean distclean + +all : build + +$(SASL_MODULE) : $(SASL_DIR)/sasl.c + cd $(SASL_DIR); ruby extconf.rb + $(MAKE) -C $(SASL_DIR) + +spec_cache : + echo "require 'qpid'" > $(SPEC_CACHE_SCRIPT) + echo "Qpid::Spec010::load()" >> $(SPEC_CACHE_SCRIPT) + ruby -I $(RUBY_LIB) -I $(SASL_DIR) $(SPEC_CACHE_SCRIPT) + rm $(SPEC_CACHE_SCRIPT) + +build: $(SASL_MODULE) spec_cache + +clean: + cd $(SASL_DIR); make clean + +distclean: + cd $(SASL_DIR); make distclean + rm -rf $(RUBY_LIB)/qpid/spec_cache + diff --git a/ruby/NOTICE.txt b/ruby/NOTICE.txt new file mode 100644 index 0000000000..fff2bca45c --- /dev/null +++ b/ruby/NOTICE.txt @@ -0,0 +1,19 @@ +========================================================================= +== NOTICE file corresponding to the section 4 d of == +== the Apache License, Version 2.0, == +== in this case for the Apache Qpid distribution. == +========================================================================= + +This product includes software developed by the Apache Software Foundation +(http://www.apache.org/). + +Please read the LICENSE.txt file present in the root directory of this +distribution. + + +Aside from contributions to the Apache Qpid project, this software also +includes (binary only): + + - None at this time. + + diff --git a/ruby/README.txt b/ruby/README.txt new file mode 100644 index 0000000000..330f9c6f61 --- /dev/null +++ b/ruby/README.txt @@ -0,0 +1,26 @@ += Running hello-world.rb = + +The ruby client includes a simple hello-world example that publishes +and consumes a message. You can find this in the examples +directory. This example requires a running broker. + +You can set RUBYLIB to the directories containing the Qpid ruby +library and the SASL extension, then run the example from the command +line. These are found in the ./lib and ./ext/sasl subdirectories. + +$ export RUBYLIB=/home/me/qpid/ruby/lib:/home/me/qpid/ruby/ext/sasl +$ ./hello-world.rb +#, #], @body="Hello World!", @id=#> + +Alternatively, you can specify the library paths using $ ruby -I: + +$ ruby -I /home/me/qpid/ruby/lib:/home/me/qpid/ruby/ext/sasl hello-world.rb +#, #], @body="Hello World!", @id=#> + += Running the Tests = + +The "tests" directory contains a collection of unit tests for the ruby +client. These can be run from the 'ruby' directory with the Rakefile +provided: + +$ rake test diff --git a/ruby/RELEASE_NOTES b/ruby/RELEASE_NOTES new file mode 100644 index 0000000000..90e7297e47 --- /dev/null +++ b/ruby/RELEASE_NOTES @@ -0,0 +1,10 @@ +Apache Qpid Ruby 0.8 Release Notes +--------------------------------- + +The Qpid 0.8 release of the ruby client contains support the for AMQP +0-10 & 0-8 specifications. See: + +http://www.amqp.org/confluence/display/AMQP/AMQP+Specification + +The README file provided contains some details on installing and using +the ruby client that is included with this distribution. diff --git a/ruby/Rakefile b/ruby/Rakefile new file mode 100644 index 0000000000..9b0878813d --- /dev/null +++ b/ruby/Rakefile @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Rakefile for ruby-rpm -*- ruby -*- +require 'rake/clean' +require 'rake/testtask' +require 'rake/gempackagetask' +require 'pathname' + +PKG_NAME='ruby-qpid' +PKG_VERSION='0.10.2' +GEM_NAME='qpid' + +EXT_CONF="ext/sasl/extconf.rb" +MAKEFILE="ext/sasl/Makefile" +SASL_MODULE="ext/sasl/sasl.so" +SASL_SRC=SASL_MODULE.gsub(/.so$/, ".c") + +# +# Additional files for clean/clobber +# + +CLEAN.include [ "**/*~", "lib/*/spec_cache", SASL_MODULE, "ext/**/*.o" ] + +CLOBBER.include [ "config.save", "ext/**/mkmf.log", + MAKEFILE ] + +file MAKEFILE => EXT_CONF do |t| + Dir::chdir(File::dirname(EXT_CONF)) do + unless sh "ruby #{File::basename(EXT_CONF)}" + $stderr.puts "Failed to run extconf" + break + end + end +end + +file SASL_MODULE => [ MAKEFILE, SASL_SRC ] do |t| + Dir::chdir(File::dirname(EXT_CONF)) do + unless sh "make" + $stderr.puts "make failed" + break + end + end +end +desc "Build the native library and AMQP spec cache" +task :build => :spec_cache + +Rake::TestTask.new(:test) do |t| + t.test_files = FileList['tests/*.rb'].exclude("tests/util.rb") + t.libs = [ 'lib', 'ext/sasl' ] +end + +Rake::TestTask.new(:"test_0-8") do |t| + t.test_files = FileList["tests_0-8/*.rb"] + t.libs = [ 'lib', 'ext/sasl' ] +end + +desc "Create cached versions of the AMQP specs" +task :spec_cache => SASL_MODULE do |t| + pid = fork do + $:.insert(0, "lib", "ext/sasl") + require 'qpid' + Qpid::Spec010::load() + end + Process.wait(pid) +end + +# +# Packaging +# + +PKG_FILES = FileList[ + "LICENSE.txt", "NOTICE.txt", + "Rakefile", "RELEASE_NOTES", + "lib/**/*.rb", "lib/**/*.xml", "lib/**/*.dtd", "lib/*/spec_cache/*.rb*", + "tests/**/*", "examples/**", "ext/**/*.[ch]", "ext/**/MANIFEST", + "ext/**/extconf.rb" +] + +DIST_FILES = FileList[ + "pkg/*.tgz", "pkg/*.gem" +] + +SPEC = Gem::Specification.new do |s| + s.name = GEM_NAME + s.version = PKG_VERSION + s.email = "dev@qpid.apache.org" + s.homepage = "http://cwiki.apache.org/qpid/" + s.summary = "Ruby client for Qpid" + s.files = PKG_FILES + s.required_ruby_version = '>= 1.8.1' + s.description = "Ruby client for Qpid" + s.extensions << 'ext/sasl/extconf.rb' +end + +Rake::GemPackageTask.new(SPEC) do |pkg| + task pkg.package_dir => [ :spec_cache ] + pkg.need_tar = true + pkg.need_zip = true +end diff --git a/ruby/examples/hello-world.rb b/ruby/examples/hello-world.rb new file mode 100755 index 0000000000..e8ef673316 --- /dev/null +++ b/ruby/examples/hello-world.rb @@ -0,0 +1,61 @@ +#!/usr/bin/ruby +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid" +require "socket" + +broker = if ARGV.length > 0 then ARGV[0] else "localhost" end +port = if ARGV.length > 1 then ARGV[1].to_i else 5672 end +if ARGV.length > 2 then + puts "usage: hello-world.rb [ [ ] ]" + exit 1 +end + +conn = Qpid::Connection.new(TCPSocket.new(broker, port)) +conn.start(10) + +ssn = conn.session("test") + +# create a queue +ssn.queue_declare("test-queue") + +ssn.exchange_declare("test-exchange", :type => "direct") + +# Publish a message +dp = ssn.delivery_properties(:routing_key => "test-queue") +mp = ssn.message_properties(:content_type => "text/plain") +msg = Qpid::Message.new(dp, mp, "Hello World!") +ssn.message_transfer(:message => msg) + +# subscribe to a queue +ssn.message_subscribe(:destination => "messages", :queue => "test-queue", + :accept_mode => ssn.message_accept_mode.none) +incoming = ssn.incoming("messages") + +# start incoming message flow +incoming.start() + +# grab a message from the queue +p incoming.get(10) + +# cancel the subscription and close the session and connection +ssn.message_cancel(:destination => "messages") +ssn.close() +conn.close() diff --git a/ruby/examples/qmf-libvirt.rb b/ruby/examples/qmf-libvirt.rb new file mode 100644 index 0000000000..492f4fe8d6 --- /dev/null +++ b/ruby/examples/qmf-libvirt.rb @@ -0,0 +1,80 @@ +#!/usr/bin/ruby +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid" + +s = Qpid::Qmf::Session.new() +b = s.add_broker("amqp://localhost:5672") + +while true: + nodes = s.objects(:class => "node") + nodes.each do |node| + puts "node: #{node.hostname}" + for (key, val) in node.properties + puts " property: #{key}, #{val}" + end + + # Find any domains that on the current node. + domains = s.objects(:class => "domain", 'node' => node.object_id) + domains.each do |domain| + r = domain.getXMLDesc() + puts "status: #{r.status}" + if r.status == 0 + puts "xml description: #{r.description}" + puts "length: #{r.description.length}" + end + + puts " domain: #{domain.name}, state: #{domain.state}, id: #{domain.id}" + for (key, val) in domain.properties + puts " property: #{key}, #{val}" + end + end + + pools = s.objects(:class => "pool", 'node' => node.object_id) + pools.each do |pool| + puts " pool: #{pool.name}" + for (key, val) in pool.properties + puts " property: #{key}, #{val}" + end + + r = pool.getXMLDesc() + puts "status: #{r.status}" + puts "text: #{r.text}" + if r.status == 0 + puts "xml description: #{r.description}" + puts "length: #{r.description.length}" + end + + # Find volumes that are part of the pool. + volumes = s.objects(:class => "volume", 'pool' => pool.object_id) + volumes.each do |volume| + puts " volume: #{volume.name}" + for (key, val) in volume.properties + puts " property: #{key}, #{val}" + end + end + end + + end + + puts '----------------------------' + sleep(5) + +end diff --git a/ruby/ext/sasl/extconf.rb b/ruby/ext/sasl/extconf.rb new file mode 100644 index 0000000000..56841f34e3 --- /dev/null +++ b/ruby/ext/sasl/extconf.rb @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +require 'mkmf' + +extension_name = 'sasl' +have_library("c", "main") + +unless have_library("sasl2") + raise "Package cyrus-sasl-devel not found" +end + +create_makefile(extension_name) diff --git a/ruby/ext/sasl/sasl.c b/ruby/ext/sasl/sasl.c new file mode 100644 index 0000000000..2d4e40d30e --- /dev/null +++ b/ruby/ext/sasl/sasl.c @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +#include +#include +#include +#include +#include +#include + +static VALUE mSasl; + +#define INPUT_SIZE 512 +#define MECH_SIZE 32 + +typedef void* sasl_context_t; + +#define QSASL_OK 0 +#define QSASL_CONTINUE 1 +#define QSASL_FAILED 2 + +typedef struct { + char magic[8]; + sasl_conn_t* conn; + sasl_callback_t callbacks[8]; + char* userName; + char* password; + char* operUserName; + unsigned int minSsf; + unsigned int maxSsf; + char mechanism[MECH_SIZE]; + char input[INPUT_SIZE]; +} context_t; + +// +// Resolve forward references +// +static VALUE qsasl_free(int, VALUE*, VALUE); + +// +// Validate an input string to ensure that it is either NULL or of reasonable size. +// +static int qsasl_valid(char* str) +{ + int idx; + + if (str == 0) + return 1; + + for (idx = 0; idx < INPUT_SIZE; idx++) { + if (str[idx] == '\0') + return 1; + } + + return 0; +} + +// +// SASL callback for identity and authentication identity. +// +static int qsasl_cb_user(void* _context, int id, const char **result, unsigned *len) +{ + context_t* context = (context_t*) _context; + + if (context->userName) + *result = context->userName; + + return SASL_OK; +} + +// +// SASL callback for passwords. +// +static int qsasl_cb_password(sasl_conn_t* conn, void* _context, int id, sasl_secret_t **psecret) +{ + context_t* context = (context_t*) _context; + sasl_secret_t* secret; + size_t length; + + if (context->password) + length = strlen(context->password); + else + length = 0; + + secret = (sasl_secret_t*) malloc(sizeof(sasl_secret_t) + length); + secret->len = length; + if (length) + memcpy(secret->data, context->password, length); + *psecret = secret; + + return SASL_OK; +} + +// +// Interactively prompt the user for authentication data. +// +static void qsasl_prompt(sasl_context_t _context, sasl_interact_t* interact) +{ + context_t* context = (context_t*) _context; + char *pass; + char *input; + char passwdPrompt[100]; + + if (interact->id == SASL_CB_PASS) { + strncpy(passwdPrompt, interact->prompt, 95); + strcat(passwdPrompt, ": "); + pass = getpass(passwdPrompt); + strncpy(context->input, pass, INPUT_SIZE - 1); + context->input[INPUT_SIZE - 1] = '\0'; + } else { + printf(interact->prompt); + if (interact->defresult) { + printf(" (%s)", interact->defresult); + } + printf(": "); + input = fgets(context->input, INPUT_SIZE, stdin); + if (input != context->input) { + rb_raise(rb_eRuntimeError, "Unexpected EOF on interactive prompt"); + } + } + + interact->result = context->input; + interact->len = strlen(context->input); +} + +// +// Initialize the SASL client library. +// +static VALUE qsasl_client_init() +{ + int result; + + result = sasl_client_init(0); + if (result != SASL_OK) + rb_raise(rb_eRuntimeError, + "sasl_client_init failed: %d - %s", + result, sasl_errstring(result, -0, 0)); + return Qnil; +} + +// +// Allocate a new SASL client context. +// +static VALUE qsasl_client_new(int argc, VALUE *argv, VALUE obj) +{ + char* mechanism = 0; + char* serviceName = 0; + char* hostName = 0; + char* userName = 0; + char* password = 0; + unsigned int minSsf = 0; + unsigned int maxSsf = 65535; + + int result; + int i = 0; + context_t *context; + sasl_security_properties_t secprops; + + if (argc != 7) + rb_raise(rb_eRuntimeError, "Wrong number of arguments"); + + if (!NIL_P(argv[0])) + mechanism = StringValuePtr(argv[0]); + if (!NIL_P(argv[1])) + serviceName = StringValuePtr(argv[1]); + if (!NIL_P(argv[2])) + hostName = StringValuePtr(argv[2]); + if (!NIL_P(argv[3])) + userName = StringValuePtr(argv[3]); + if (!NIL_P(argv[4])) + password = StringValuePtr(argv[4]); + minSsf = FIX2INT(argv[5]); + maxSsf = FIX2INT(argv[6]); + + if (!qsasl_valid(mechanism) || !qsasl_valid(serviceName) || + !qsasl_valid(hostName) || !qsasl_valid(userName) || + !qsasl_valid(password)) { + rb_raise(rb_eRuntimeError, "Invalid string argument"); + } + + context = (context_t*) malloc(sizeof(context_t)); + memset(context, 0, sizeof(context_t)); + strcpy(context->magic, "QSASL01"); + + context->minSsf = minSsf; + context->maxSsf = maxSsf; + if (mechanism != 0) { + strncpy(context->mechanism, mechanism, MECH_SIZE - 1); + context->mechanism[MECH_SIZE - 1] = '\0'; + } + + context->callbacks[i].id = SASL_CB_GETREALM; + context->callbacks[i].proc = 0; + context->callbacks[i++].context = 0; + + if (userName != 0 && userName[0] != '\0') { + context->userName = (char*) malloc(strlen(userName) + 1); + strcpy(context->userName, userName); + + context->callbacks[i].id = SASL_CB_USER; + context->callbacks[i].proc = qsasl_cb_user; + context->callbacks[i++].context = context; + + context->callbacks[i].id = SASL_CB_AUTHNAME; + context->callbacks[i].proc = qsasl_cb_user; + context->callbacks[i++].context = context; + } + + context->callbacks[i].id = SASL_CB_PASS; + if (password != 0 && password[0] != '\0') { + context->password = (char*) malloc(strlen(password) + 1); + strcpy(context->password, password); + + context->callbacks[i].proc = qsasl_cb_password; + } else + context->callbacks[i].proc = 0; + context->callbacks[i++].context = context; + + context->callbacks[i].id = SASL_CB_LIST_END; + context->callbacks[i].proc = 0; + context->callbacks[i++].context = 0; + + result = sasl_client_new(serviceName, hostName, 0, 0, + context->callbacks, 0, &context->conn); + + if (result != SASL_OK) { + context->conn = 0; + qsasl_free(1, (VALUE*) &context, Qnil); + rb_raise(rb_eRuntimeError, "sasl_client_new failed: %d - %s", + result, sasl_errstring(result, 0, 0)); + } + + secprops.min_ssf = minSsf; + secprops.max_ssf = maxSsf; + secprops.maxbufsize = 65535; + secprops.property_names = 0; + secprops.property_values = 0; + secprops.security_flags = 0;//TODO: provide means for application to configure these + + result = sasl_setprop(context->conn, SASL_SEC_PROPS, &secprops); + if (result != SASL_OK) { + qsasl_free(1, (VALUE*) &context, Qnil); + rb_raise(rb_eRuntimeError, "sasl_setprop failed: %d - %s", + result, sasl_errdetail(context->conn)); + } + + return (VALUE) context; +} + +// +// Free a SASL client context. +// +static VALUE qsasl_free(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + + if (argc == 1) + context = (context_t*) argv[0]; + else + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + + if (context->conn) + sasl_dispose(&context->conn); + if (context->userName) + free(context->userName); + if (context->password) + free(context->password); + if (context->operUserName) + free(context->operUserName); + free(context); + + return Qnil; +} + +// +// Start the SASL exchange from the client's point of view. +// +static VALUE qsasl_client_start(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + char* mechList; + char* mechToUse; + int result; + int propResult; + const char* response; + unsigned int len; + sasl_interact_t* interact = 0; + const char* chosen; + const char* operName; + + if (argc == 2) { + context = (context_t*) argv[0]; + mechList = StringValuePtr(argv[1]); + } else + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + + if (strlen(context->mechanism) == 0) + mechToUse = mechList; + else + mechToUse = context->mechanism; + + do { + result = sasl_client_start(context->conn, mechToUse, &interact, + &response, &len, &chosen); + if (result == SASL_INTERACT) { + qsasl_prompt(context, interact); + } + } while (result == SASL_INTERACT); + + if (result != SASL_OK && result != SASL_CONTINUE) + rb_raise(rb_eRuntimeError, "sasl_client_start failed: %d - %s", + result, sasl_errdetail(context->conn)); + + if (result == SASL_OK) { + propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName); + if (propResult == SASL_OK) { + context->operUserName = (char*) malloc(strlen(operName) + 1); + strcpy(context->operUserName, operName); + } + } + + return rb_ary_new3(3, INT2NUM(result), rb_str_new(response, len), rb_str_new2(chosen)); +} + +// +// Take a step in the SASL exchange (only needed for multi-challenge mechanisms). +// +static VALUE qsasl_client_step(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + VALUE challenge; + int result; + int propResult; + const char* response; + const char* operName; + unsigned int len; + sasl_interact_t* interact = 0; + + if (argc == 2) { + context = (context_t*) argv[0]; + challenge = argv[1]; + } + else + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + + do { + result = sasl_client_step(context->conn, + RSTRING(challenge)->ptr, RSTRING(challenge)->len, + &interact, &response, &len); + if (result == SASL_INTERACT) { + qsasl_prompt(context, interact); + } + } while (result == SASL_INTERACT); + + if (result != SASL_OK && result != SASL_CONTINUE) + return QSASL_FAILED; + + if (result == SASL_OK) { + propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName); + if (propResult == SASL_OK) { + context->operUserName = (char*) malloc(strlen(operName) + 1); + strcpy(context->operUserName, operName); + } + } + + return rb_ary_new3(2, INT2NUM(result), rb_str_new(response, len)); +} + +static VALUE qsasl_user_id(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + + if (argc == 1) { + context = (context_t*) argv[0]; + } else { + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + } + + if (context->operUserName) + return rb_str_new2(context->operUserName); + + return Qnil; +} + +// +// Encode transport data for the security layer. +// +static VALUE qsasl_encode(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + VALUE clearText; + const char* outBuffer; + unsigned int outSize; + int result; + + if (argc == 2) { + context = (context_t*) argv[0]; + clearText = argv[1]; + } + else + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + + result = sasl_encode(context->conn, + RSTRING(clearText)->ptr, RSTRING(clearText)->len, + &outBuffer, &outSize); + if (result != SASL_OK) + rb_raise(rb_eRuntimeError, "sasl_encode failed: %d - %s", + result, sasl_errdetail(context->conn)); + + return rb_str_new(outBuffer, outSize); +} + +// +// Decode transport data for the security layer. +// +static VALUE qsasl_decode(int argc, VALUE *argv, VALUE obj) +{ + context_t* context; + VALUE cipherText; + const char* outBuffer; + unsigned int outSize; + int result; + + if (argc == 2) { + context = (context_t*) argv[0]; + cipherText = argv[1]; + } + else + rb_raise(rb_eRuntimeError, "Wrong Number of Arguments"); + + result = sasl_decode(context->conn, + RSTRING(cipherText)->ptr, RSTRING(cipherText)->len, + &outBuffer, &outSize); + if (result != SASL_OK) + rb_raise(rb_eRuntimeError, "sasl_decode failed: %d - %s", + result, sasl_errdetail(context->conn)); + + return rb_str_new(outBuffer, outSize); +} + +// +// Initialize the Sasl module. +// +void Init_sasl() +{ + mSasl = rb_define_module("Sasl"); + + rb_define_module_function(mSasl, "client_init", qsasl_client_init, -1); + rb_define_module_function(mSasl, "client_new", qsasl_client_new, -1); + rb_define_module_function(mSasl, "free", qsasl_free, -1); + rb_define_module_function(mSasl, "client_start", qsasl_client_start, -1); + rb_define_module_function(mSasl, "client_step", qsasl_client_step, -1); + rb_define_module_function(mSasl, "user_id", qsasl_user_id, -1); + rb_define_module_function(mSasl, "encode", qsasl_encode, -1); + rb_define_module_function(mSasl, "decode", qsasl_decode, -1); +} diff --git a/ruby/lib/qpid.rb b/ruby/lib/qpid.rb new file mode 100644 index 0000000000..1c719e9b1d --- /dev/null +++ b/ruby/lib/qpid.rb @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + def self.logger + @logger ||= {} + @logger + end +end + +require "qpid/util" +require "qpid/queue" +require "qpid/packer" +require "qpid/framer" +require "qpid/codec" +require 'qpid/datatypes' +require 'qpid/spec010' +require 'qpid/delegates' +require 'qpid/invoker' +require "qpid/assembler" +require 'qpid/session' +require "qpid/connection" +require "qpid/spec" +require 'qpid/queue' +require 'qpid/qmf' diff --git a/ruby/lib/qpid/assembler.rb b/ruby/lib/qpid/assembler.rb new file mode 100644 index 0000000000..b768c3f195 --- /dev/null +++ b/ruby/lib/qpid/assembler.rb @@ -0,0 +1,148 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + class << self + attr_accessor :asm_logger + end + + class Segment + + attr_reader :type, :payload, :track, :channel + attr_accessor :id, :offset + + def initialize(first, last, type, track, channel, payload) + @id = nil + @offset = nil + @first = first + @last = last + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; @first ; end + + def last_segment? ; @last ; end + + def decode(spec) + segs = spec[:segment_type] + choice = segs.enum.choices[type] + return method("decode_#{choice.name}").call(spec) + end + + def decode_control(spec) + sc = StringCodec.new(spec, payload) + return sc.read_control() + end + + def decode_command(spec) + sc = StringCodec.new(spec, payload) + hdr, cmd = sc.read_command() + cmd.id = id + return hdr, cmd + end + + def decode_header(spec) + sc = StringCodec.new(spec, payload) + values = [] + until sc.encoded.empty? + values << sc.read_struct32() + end + return values + end + + def decode_body(spec) + payload + end + + def append(frame) + @payload += frame.payload + end + + def to_s + f = first_segment? ? 'F' : '.' + l = last_segment? ? 'L' : '.' + return "%s%s %s %s %s %s" % [f, l, @type, + @track, @channel, @payload.inspect] + end + + end + + class Assembler < Framer + + def logger; Qpid::asm_logger; end + + def initialize(sock, max_payload = Frame::MAX_PAYLOAD) + super(sock) + @max_payload = max_payload + @fragments = {} + end + + def read_segment + loop do + frame = read_frame + key = [frame.channel, frame.track] + seg = @fragments[key] + unless seg + seg = Segment.new(frame.first_segment?, + frame.last_segment?, + frame.type, frame.track, + frame.channel, "") + @fragments[key] = seg + end + + seg.append(frame) + + if frame.last_frame? + @fragments.delete(key) + logger.debug("RECV #{seg}") if logger + return seg + end + end + end + + def write_segment(segment) + remaining = segment.payload + + first = true + while first or remaining + payload = remaining[0, @max_payload] + remaining = remaining[@max_payload, remaining.size] + + flags = 0 + + flags |= FIRST_FRM if first + flags |= LAST_FRM unless remaining + flags |= FIRST_SEG if segment.first_segment? + flags |= LAST_SEG if segment.last_segment? + + frame = Frame.new(flags, segment.type, segment.track, + segment.channel, payload) + write_frame(frame) + + first = false + end + + logger.debug("SENT #{segment}") if logger + end + end +end diff --git a/ruby/lib/qpid/client.rb b/ruby/lib/qpid/client.rb new file mode 100644 index 0000000000..ec3d100a9c --- /dev/null +++ b/ruby/lib/qpid/client.rb @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "thread" +require "qpid/peer" +require "qpid/queue" + +module Qpid08 + + class Client + def initialize(host, port, spec, vhost = "/") + @host = host + @port = port + @spec = spec + @vhost = vhost + + @mechanism = nil + @response = nil + @locale = nil + + @queues = {} + @mutex = Mutex.new() + + @closed = false + @code = nil + @started = ConditionVariable.new() + + @conn = Connection.new(@host, @port, @spec) + @peer = Peer.new(@conn, ClientDelegate.new(self)) + end + + attr_reader :mechanism, :response, :locale + + def closed?; @closed end + def closed=(value); @closed = value end + def code; @code end + + def wait() + @mutex.synchronize do + @started.wait(@mutex) + end + raise EOFError.new() if closed? + end + + def signal_start() + @started.broadcast() + end + + def queue(key) + @mutex.synchronize do + q = @queues[key] + if q.nil? + q = Queue.new() + @queues[key] = q + end + return q + end + end + + def start(response, mechanism="AMQPLAIN", locale="en_US") + @response = response + @mechanism = mechanism + @locale = locale + + @conn.connect() + @conn.init() + @peer.start() + wait() + channel(0).connection_open(@vhost) + end + + def channel(id) + return @peer.channel(id) + end + + def close(msg = nil) + @closed = true + @code = msg + @peer.close() + end + end + + class ClientDelegate + + include Delegate + + def initialize(client) + @client = client + end + + def connection_start(ch, msg) + ch.connection_start_ok(:mechanism => @client.mechanism, + :response => @client.response, + :locale => @client.locale) + end + + def connection_tune(ch, msg) + ch.connection_tune_ok(*msg.fields) + @client.signal_start() + end + + def connection_close(ch, msg) + puts "CONNECTION CLOSED: #{msg.args.join(", ")}" + @client.close(msg) + end + + def channel_close(ch, msg) + puts "CHANNEL[#{ch.id}] CLOSED: #{msg.args.join(", ")}" + ch.channel_close_ok() + ch.close() + end + + def basic_deliver(ch, msg) + queue = @client.queue(msg.consumer_tag) + queue << msg + end + + end + +end diff --git a/ruby/lib/qpid/codec.rb b/ruby/lib/qpid/codec.rb new file mode 100644 index 0000000000..a3b5d101c4 --- /dev/null +++ b/ruby/lib/qpid/codec.rb @@ -0,0 +1,457 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'qpid/packer.rb' +require 'iconv' + +module Qpid + + class Codec + + include Qpid::Packer + + attr_reader :spec + + def initialize(spec = "") + @spec = spec + end + + def write_void(v) + unless v.nil? + raise Exception.new("void not nil: #{v}") + end + end + + def read_void + return nil + end + + def write_bit(b) + unless b + raise Exception.new("bit is nil: #{b}") + end + end + + def read_bit + return true + end + + def read_uint8 + return unpack("C", 1) + end + + def write_uint8(n) + return pack("C", n) + end + + def read_int8 + return unpack("c", 1) + end + + def write_int8(n) + pack("c", n) + end + + def read_char + return unpack("c", 1) + end + + def write_char(c) + pack("c") + end + + def read_boolean + return read_uint8 != 0 + end + + def write_boolean(b) + n = 0 + n = 1 if b != 0 + write_uint8(n) + end + + def read_uint16 + return unpack("n", 2) + end + + def write_uint16(n) + pack("n", n) + end + + def read_int16 + # XXX: holy moly.. pack/unpack doesn't have signed network byte order. Crazy hackery. + val = unpack("n", 2) + val -= 2 ** 16 if val >= 2 ** 15 + return val + end + + def write_int16(n) + # XXX: Magically this one works even though it's not signed. + pack("n", n) + end + + def read_uint32 + return unpack("N", 4) + end + + def write_uint32(n) + pack("N", n) + end + + def read_int32 + # Again no pack/unpack for signed int + return unpack("N", 4) + end + + def write_int32(n) + # FIXME + pack("N", n) + end + + def read_float + return unpack("g", 4) + end + + def write_float(n) + pack("g", n) + end + + def read_sequence_no + return read_uint32.to_serial + end + + def write_sequence_no(n) + write_uint32(n.value) + end + + def encode_64bit(num, signed = false) + b = [] + + if num < 0 && signed + num += 2 ** 64 + end + + (0..7).each do |c| + d = 7 - c + b[c] = (num & (0xff << d * 8)) >> d * 8 + end + pack('C8', *b) + end + + + def decode_64bit(signed = false) + # Silly ruby pack/unpack does not implement 64 bit network byte order + # encode/decode. + a = unpack('C8', 8) + num = 0 + (0..7).each do |c| + d = 7 - c + num |= a[c] << 8 * d + end + + if signed && num >= 2 ** 63 + num -= 2 ** 64 + end + return num + end + + def read_uint64 + return decode_64bit + end + + def write_uint64(n) + encode_64bit(n) + end + + def read_int64 + return decode_64bit(signed = true) + end + + def write_int64(n) + encode_64bit(n, signed = true) + end + + def read_datetime + return read_uint64 + end + + def write_datetime(n) + write_uint64(n) + end + + def read_double + return unpack("G", 8) + end + + def write_double(n) + pack("G", n) + end + + def read_vbin8 + # XXX + return read(read_uint8) + end + + def write_vbin8(b) + # XXX + write_uint8(b.length) + write(b) + end + + def read_str8 + # FIXME: Check iconv.. I think this will throw if there are odd characters. + return Iconv.conv("ASCII", "UTF-8", read_vbin8) + end + + def write_str8(s) + write_vbin8(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_str16 + return Iconv.conv("ASCII", "UTF-8", read_vbin16) + end + + def write_str16(s) + write_vbin16(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_vbin16 + # XXX: Using read method? + return read(read_uint16) + end + + def write_vbin16(b) + write_uint16(b.length) + write(b) + end + + def read_sequence_set + # FIXME: Need datatypes + result = RangedSet.new + size = read_uint16 + nranges = size / 8 + nranges.times do |i| + lower = read_sequence_no + upper = read_sequence_no + result.add(lower, upper) + end + return result + end + + def write_sequence_set(ss) + size = 8 * ss.ranges.length + write_uint16(size) + ss.ranges.each do |range| + write_sequence_no(range.lower) + write_sequence_no(range.upper) + end + end + + def read_vbin32 + return read(read_uint32) + end + + def write_vbin32(b) + write_uint32(b.length) + write(b) + end + + def write_map(m) + sc = StringCodec.new(@spec) + unless m.nil? + sc.write_uint32(m.size) + m.each do |k, v| + unless type = @spec.encoding(v.class) + raise Exception.new("no encoding for: #{v.class}") + end + sc.write_str8(k) + sc.write_uint8(type.code) + type.encode(sc, v) + end + end + write_vbin32(sc.encoded) + end + + def read_map + sc = StringCodec.new(@spec, read_vbin32) + return nil unless sc.encoded + count = sc.read_uint32 + result = nil + if count + result = {} + until sc.encoded.empty? + k = sc.read_str8 + code = sc.read_uint8 + type = @spec.types[code] + v = type.decode(sc) + result[k] = v + end + end + return result + end + + def write_array(a) + sc = StringCodec.new(@spec) + unless a.nil? + if a.length > 0 + type = @spec.encoding(a[0].class) + else + type = @spec.encoding(nil.class) + end + sc.write_uint8(type.code) + sc.write_uint32(a.size) + a.each { |o| type.encode(sc, o) } + end + write_vbin32(sc.encoded) + end + + def read_array + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + type = @spec.types[sc.read_uint8] + count = sc.read_uint32 + result = nil + if count + result = [] + count.times { |i| result << (type.decode(sc)) } + end + return result + end + + def write_list(l) + sc = StringCodec.new(@spec) + unless l.nil? + sc.write_uint32(l.length) + l.each do |o| + type = @spec.encoding(o.class) + sc.write_uint8(type.code) + type.encode(sc, o) + end + end + write_vbin32(sc.encoded) + end + + def read_list + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + count = sc.read_uint32 + result = nil + if count + result = [] + count.times do |i| + type = @spec.types[sc.read_uint8] + result << type.decode(sc) + end + end + return result + end + + def read_struct32 + size = read_uint32 + code = read_uint16 + type = @spec.structs[code] + # XXX: BLEH! + fields = type.decode_fields(self) + return Qpid::struct(type, fields) + end + + def write_struct32(value) + type = value.st_type + sc = StringCodec.new(@spec) + sc.write_uint16(type.code) + type.encode_fields(sc, value) + write_vbin32(sc.encoded) + end + + def read_control + cntrl = @spec.controls[read_uint16] + return Qpid::struct(cntrl, cntrl.decode_fields(self)) + end + + def write_control(ctrl) + type = ctrl.st_type + write_uint16(type.code) + type.encode_fields(self, ctrl) + end + + def read_command + type = @spec.commands[read_uint16] + hdr = @spec[:header].decode(self) + cmd = Qpid::struct(type, type.decode_fields(self)) + return hdr, cmd + end + + def write_command(hdr, cmd) + type = cmd.st_type + write_uint16(type.code) + hdr.st_type.encode(self, hdr) + type.encode_fields(self, cmd) + end + + def read_size(width) + if width > 0 + return send(:"read_uint#{width * 8}") + end + end + + def write_size(width, n) + if width > 0 + send(:"write_uint#{width * 8}", n) + end + end + + def read_uuid + return unpack("a16", 16) + end + + def write_uuid(s) + pack("a16", s) + end + + def read_bin128 + return unpack("a16", 16) + end + + def write_bin128(b) + pack("a16", b) + end + + end + + class StringCodec < Codec + + def initialize(spec, encoded = "") + @spec = spec + @encoded = encoded + end + + attr_reader :encoded + + def write(s) + @encoded += s + end + + def read(n) + return "" if n.nil? + result = @encoded[0...n] + @encoded = @encoded[n...@encoded.size] || "" + return result + end + end +end diff --git a/ruby/lib/qpid/codec08.rb b/ruby/lib/qpid/codec08.rb new file mode 100644 index 0000000000..148dee07bb --- /dev/null +++ b/ruby/lib/qpid/codec08.rb @@ -0,0 +1,265 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid08 + # is there a better way to do this? + class StringWriter + + def initialize(str = "") + @str = str + end + + def write(value) + @str << value + end + + def to_s() + return @str + end + + end + + class EOF < Exception; end + + class Encoder + + def initialize(out) + @out = out + @bits = [] + end + + attr_reader(:out) + + def encode(type, value) + send(type, value) + end + + def bit(b) + @bits << b + end + + def octet(o) + pack("C", o) + end + + def short(s) + pack("n", s) + end + + def long(l) + pack("N", l) + end + + def longlong(l) + lower = l & 0xffffffff + upper = (l & ~0xffffffff) >> 32 + long(upper) + long(lower) + end + + def timestamp(l) + longlong(l) + end + + def shortstr(s) + # shortstr is actually octetstr + octet(s.length) + write(s) + end + + def longstr(s) + case s + when Hash + table(s) + else + long(s.length) + write(s) + end + end + + def table(t) + t = {} if t.nil? + enc = Encoder.new(StringWriter.new()) + t.each {|key, value| + enc.shortstr(key) + # I offer this chicken to the gods of polymorphism. May they + # choke on it. + case value + when String + type = :longstr + desc = "S" + when Numeric + type = :long + desc = "I" + else + raise Exception.new("unknown table value: #{value.class}") + end + enc.write(desc) + enc.encode(type, value) + } + longstr(enc.out.to_s()) + end + + def write(str) + flushbits() + @out.write(str) + # puts "OUT #{str.inspect()}" + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + + def flush() + flushbits() + end + + private + + def flushbits() + if @bits.empty? then return end + + bytes = [] + index = 0 + @bits.each {|b| + bytes << 0 if index == 0 + if b then bytes[-1] |= 1 << index end + index = (index + 1) % 8 + } + @bits.clear() + bytes.each {|b| + octet(b) + } + end + + end + + class StringReader + + def initialize(str) + @str = str + @index = 0 + end + + def read(n) + result = @str[@index, n] + @index += result.length + return result + end + + end + + class Decoder + + def initialize(_in) + @in = _in + @bits = [] + end + + def decode(type) + return send(type) + end + + def bit() + if @bits.empty? + byte = octet() + 7.downto(0) {|i| + @bits << (byte[i] == 1) + } + end + return @bits.pop() + end + + def octet() + return unpack("C", 1) + end + + def short() + return unpack("n", 2) + end + + def long() + return unpack("N", 4) + end + + def longlong() + upper = long() + lower = long() + return upper << 32 | lower + end + + def timestamp() + return longlong() + end + + def shortstr() + # shortstr is actually octetstr + return read(octet()) + end + + def longstr() + return read(long()) + end + + def table() + dec = Decoder.new(StringReader.new(longstr())) + result = {} + while true + begin + key = dec.shortstr() + rescue EOF + break + end + desc = dec.read(1) + case desc + when "S" + value = dec.longstr() + when "I" + value = dec.long() + else + raise Exception.new("unrecognized descriminator: #{desc.inspect()}") + end + result[key] = value + end + return result + end + + def read(n) + return "" if n == 0 + result = @in.read(n) + if result.nil? or result.empty? + raise EOF.new() + else + # puts " IN #{result.inspect()}" + return result + end + end + + def unpack(fmt, size) + result = read(size).unpack(fmt) + if result.length == 1 + return result[0] + else + return result + end + end + + end + +end diff --git a/ruby/lib/qpid/config.rb b/ruby/lib/qpid/config.rb new file mode 100644 index 0000000000..b5b79cd309 --- /dev/null +++ b/ruby/lib/qpid/config.rb @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + module Config + + def self.amqp_spec + dirs = [File::expand_path(File::join(File::dirname(__FILE__), "specs"))] + dirs.each do |d| + spec = File::join(d, "amqp.0-10-qpid-errata.xml") + return spec if File::exists? spec + end + end + + end +end diff --git a/ruby/lib/qpid/connection.rb b/ruby/lib/qpid/connection.rb new file mode 100644 index 0000000000..d2efbfb263 --- /dev/null +++ b/ruby/lib/qpid/connection.rb @@ -0,0 +1,222 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class ChannelBusy< Exception ; end + + class ChannelsBusy < Exception ; end + + class SessionBusy < Exception ; end + + class ConnectionFailed < Exception ; end + + class Timeout < Exception ; end + + class Connection < Assembler + + include MonitorMixin + + attr_reader :spec, :attached, :sessions, :thread + attr_accessor :opened, :failed, :close_code, :user_id + + def initialize(sock, args={}) + super(sock) + + delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + spec = args[:spec] || nil + + @spec = Qpid::Spec010::load(spec) + @track = @spec["track"] + + @attached = {} + @sessions = {} + + @condition = new_cond + @opened = false + @failed = false + @close_code = [nil, "connection aborted"] + + @thread = nil + + @channel_max = 65535 + @user_id = nil + + @delegate = delegate.call(self, args) + end + + def attach(name, ch, delegate, force=false) + synchronize do + ssn = @attached[ch.id] + if ssn + raise ChannelBusy.new(ch, ssn) unless ssn.name == name + else + ssn = @sessions[name] + if ssn.nil? + ssn = Session.new(name, @spec, :delegate => delegate) + @sessions[name] = ssn + elsif ssn.channel + if force + @attached.delete(ssn.channel.id) + ssn.channel = nil + else + raise SessionBusy.new(ssn) + end + end + @attached[ch.id] = ssn + ssn.channel = ch + end + ch.session = ssn + return ssn + end + end + + def detach(name, ch) + synchronize do + @attached.delete(ch.id) + ssn = @sessions.delete(name) + if ssn + ssn.channel = nil + ssn.closed + return ssn + end + end + end + + def session(name, kwargs = {}) + timeout = kwargs[:timeout] + delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) + + # FIXME: Python has cryptic comment about 'ch 0 ?' + channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } + raise ChannelsBusy unless channel + + synchronize do + ch = Channel.new(self, channel) + ssn = attach(name, ch, delegate) + ssn.channel.session_attach(name) + if ssn.wait_for(timeout) { ssn.channel } + return ssn + else + detach(name, ch) + raise Timeout + end + end + end + + def detach_all + synchronize do + attached.values.each do |ssn| + ssn.exceptions << @close_code unless @close_code[0] == 200 + detach(ssn.name, ssn.channel) + end + end + end + + def start(timeout=nil) + @delegate.start + @thread = Thread.new { run } + @thread[:name] = 'conn' + synchronize do + unless @condition.wait_for(timeout) { @opened || @failed } + raise Timeout + end + end + if @failed + raise ConnectionFailed.new(@close_code) + end + end + + def run + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + loop do + begin + seg = read_segment + rescue Qpid::Closed => e + detach_all + break + end + @delegate.received(seg) + end + end + + def close(timeout=nil) + return unless @opened + Channel.new(self, 0).connection_close(200) + synchronize do + unless @condition.wait_for(timeout) { ! @opened } + raise Timeout + end + end + @thread.join(timeout) + @thread = nil + end + + def signal + synchronize { @condition.signal } + end + + def to_s + # FIXME: We'd like to report something like HOST:PORT + return @sock.to_s + end + + class Channel < Invoker + + attr_reader :id, :connection + attr_accessor :session + + def initialize(connection, id) + @connection = connection + @id = id + @session = nil + end + + def resolve_method(name) + inst = @connection.spec[name] + if inst.is_a?(Qpid::Spec010::Control) + return invocation(:method, inst) + else + return invocation(:error, nil) + end + end + + def invoke(type, args) + ctl = type.create(*args) + sc = StringCodec.new(@connection.spec) + sc.write_control(ctl) + @connection.write_segment(Segment.new(true, true, type.segment_type, + type.track, self.id, sc.encoded)) + + log = Qpid::logger["qpid.io.ctl"] + log.debug("SENT %s", ctl) if log + end + + def to_s + return "#{@connection}[#{@id}]" + end + + end + + end + +end diff --git a/ruby/lib/qpid/connection08.rb b/ruby/lib/qpid/connection08.rb new file mode 100644 index 0000000000..09a4888cc4 --- /dev/null +++ b/ruby/lib/qpid/connection08.rb @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "socket" +require "qpid/codec08" + +module Qpid08 + + class Connection + + def initialize(host, port, spec) + @host = host + @port = port + @spec = spec + end + + attr_reader(:host, :port, :spec) + + def connect() + @sock = TCPSocket.open(@host, @port) + @out = Encoder.new(@sock) + @in = Decoder.new(@sock) + end + + def init() + @out.write("AMQP") + [1, 1, @spec.major, @spec.minor].each {|o| + @out.octet(o) + } + end + + def write(frame) + # puts "OUT #{frame.inspect()}" + @out.octet(@spec.constants[frame.payload.type].id) + @out.short(frame.channel) + frame.payload.encode(@out) + @out.octet(frame_end) + end + + def read() + type = @spec.constants[@in.octet()].name + channel = @in.short() + payload = Payload.decode(type, @spec, @in) + oct = @in.octet() + if oct != frame_end + raise Exception.new("framing error: expected #{frame_end}, got #{oct}") + end + frame = Frame.new(channel, payload) + # puts " IN #{frame.inspect}" + return frame + end + + private + + def frame_end + @spec.constants[:"frame_end"].id + end + + end + + class Frame + + def initialize(channel, payload) + @channel = channel + @payload = payload + end + + attr_reader(:channel, :payload) + + end + + class Payload + + TYPES = {} + + def Payload.singleton_method_added(name) + if name == :type + TYPES[type] = self + end + end + + def Payload.decode(type, spec, dec) + klass = TYPES[type] + klass.decode(spec, dec) + end + + end + + class Method < Payload + + def initialize(method, args) + if args.size != method.fields.size + raise ArgumentError.new("argument mismatch #{method} #{args}") + end + @method = method + @args = args + end + + attr_reader(:method, :args) + + def Method.type; :frame_method end + + def type; Method.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@method.parent.id) + enc.short(@method.id) + @method.fields.zip(self.args).each {|f, a| + if a.nil?; a = f.default end + enc.encode(f.type, a) + } + enc.flush() + encoder.longstr(buf.to_s) + end + + def Method.decode(spec, decoder) + buf = decoder.longstr() + dec = Decoder.new(StringReader.new(buf)) + klass = spec.classes[dec.short()] + meth = klass.methods[dec.short()] + args = meth.fields.map {|f| dec.decode(f.type)} + return Method.new(meth, args) + end + + def inspect(); "#{method.qname}(#{args.join(", ")})" end + + end + + class Header < Payload + + def Header.type; :frame_header end + + def initialize(klass, weight, size, properties) + @klass = klass + @weight = weight + @size = size + @properties = properties + end + + attr_reader :weight, :size, :properties + + def type; Header.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@klass.id) + enc.short(@weight) + enc.longlong(@size) + + # property flags + nprops = @klass.fields.size + flags = 0 + 0.upto(nprops - 1) do |i| + f = @klass.fields[i] + flags <<= 1 + flags |= 1 unless @properties[f.name].nil? + # the last bit indicates more flags + if i > 0 and (i % 15) == 0 + flags <<= 1 + if nprops > (i + 1) + flags |= 1 + enc.short(flags) + flags = 0 + end + end + end + flags <<= ((16 - (nprops % 15)) % 16) + enc.short(flags) + + # properties + @klass.fields.each do |f| + v = @properties[f.name] + enc.encode(f.type, v) unless v.nil? + end + enc.flush() + encoder.longstr(buf.to_s) + end + + def Header.decode(spec, decoder) + dec = Decoder.new(StringReader.new(decoder.longstr())) + klass = spec.classes[dec.short()] + weight = dec.short() + size = dec.longlong() + + # property flags + bits = [] + while true + flags = dec.short() + 15.downto(1) do |i| + if flags >> i & 0x1 != 0 + bits << true + else + bits << false + end + end + break if flags & 0x1 == 0 + end + + # properties + properties = {} + bits.zip(klass.fields).each do |b, f| + properties[f.name] = dec.decode(f.type) if b + end + return Header.new(klass, weight, size, properties) + end + + def inspect(); "#{@klass.name}(#{@properties.inspect()})" end + + end + + class Body < Payload + + def Body.type; :frame_body end + + def type; Body.type end + + def initialize(content) + @content = content + end + + attr_reader :content + + def encode(enc) + enc.longstr(@content) + end + + def Body.decode(spec, dec) + return Body.new(dec.longstr()) + end + + end + +end diff --git a/ruby/lib/qpid/datatypes.rb b/ruby/lib/qpid/datatypes.rb new file mode 100644 index 0000000000..418388c73a --- /dev/null +++ b/ruby/lib/qpid/datatypes.rb @@ -0,0 +1,353 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + def self.struct(type, *args) + # FIXME: This is fragile; the last arg could be a hash, + # without being hte keywords + kwargs = {} + kwargs = args.pop if args.any? && args[-1].is_a?(Hash) + + if args.size > type.fields.size + raise TypeError, + "%s() takes at most %d arguments (%d given)" % + [type.name, type.fields.size, args.size] + end + + attrs = type.fields.inject({}) do |attrs, field| + if args.any? + attrs[field.name] = args.shift + if kwargs.key?(field.name) + raise TypeError, + "%s() got multiple values for keyword argument '%s'" % + [type.name, field.name] + end + elsif kwargs.key?(field.name) + attrs[field.name] = kwargs.delete(field.name) + else + attrs[field.name] = field.default + end + attrs + end + + unless kwargs.empty? + unexpected = kwargs.keys[0] + raise TypeError, + "%s() got an unexpected keyword argument '%s'" % + [type.name, unexpected] + end + + attrs[:st_type] = type + attrs[:id] = nil + + name = "Qpid_" + type.name.to_s.capitalize + unless ::Struct.const_defined?(name) + vars = type.fields.collect { |f| f.name } << :st_type << :id + ::Struct.new(name, *vars) + end + st = ::Struct.const_get(name) + + result = st.new + attrs.each { |k, v| result[k] = v } + return result + end + + class Message + + attr_accessor :headers, :body, :id + + def initialize(*args) + @body = nil + @headers = nil + + @body = args.pop unless args.empty? + @headers = args unless args.empty? + + @id = nil + end + + def has(name) + return ! get(name).nil? + end + + def get(name) + if @headers + name = name.to_sym + @headers.find { |h| h.st_type.name == name } + end + end + + def set(header) + @headers ||= [] + if h = @headers.find { |h| h.st_type == header.st_type } + ind = @headers.index(h) + @headers[ind] = header + else + @headers << header + end + end + + def clear(name) + if @headers + name = name.to_sym + @headers.delete_if { |h| h.st_type.name == name } + end + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # args = [] + # if self.headers: + # args.extend(map(repr, self.headers)) + # if self.body: + # args.append(repr(self.body)) + # if self.id is not None: + # args.append("id=%s" % self.id) + # return "Message(%s)" % ", ".join(args) + # end + end + + class ::Object + + def to_serial + Qpid::Serial.new(self) + end + end + + class Serial + + include Comparable + + attr_accessor :value + + def initialize(value) + @value = value & 0xFFFFFFFF + end + + def hash + @value.hash + end + + def to_serial + self + end + + def eql?(other) + other = other.to_serial + value.eql?(other.value) + end + + def <=>(other) + return 1 if other.nil? + + other = other.to_serial + + delta = (value - other.value) & 0xFFFFFFFF + neg = delta & 0x80000000 + mag = delta & 0x7FFFFFFF + + return (neg>0) ? -mag : mag + end + + def +(other) + result = other.to_serial + result.value += value + return result + end + + def -(other) + result = other.to_serial + result.value = value - result.value + return result + end + + def succ + Serial.new(value + 1) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "serial(%s)" % self.value + # end + + def to_s + value.to_s + end + + end + + # The Python class datatypes.Range is emulated by the standard + # Range class with a few additions + class ::Range + + alias :lower :begin + alias :upper :end + + def touches(r) + # XXX: are we doing more checks than we need? + return (r.include?(lower - 1) || + r.include?(upper + 1) || + include?(r.lower - 1) || + include?(r.upper + 1) || + r.include?(lower) || + r.include?(upper) || + include?(r.lower) || + include?(r.upper)) + end + + def span(r) + Range.new([lower, r.lower].min, [upper, r.upper].max) + end + + def intersect(r) + l = [lower, r.lower].max + u = [upper, r.upper].min + return l > u ? nil : Range.new(l, u) + end + + end + + class RangedSet + + include Enumerable + + attr_accessor :ranges + + def initialize(*args) + @ranges = [] + args.each { |n| add(n) } + end + + def each(&block) + ranges.each { |r| yield(r) } + end + + def include?(n) + if (n.is_a?(Range)) + super(n) + else + ranges.find { |r| r.include?(n) } + end + end + + def add_range(range) + ranges.delete_if do |r| + if range.touches(r) + range = range.span(r) + true + else + false + end + end + ranges << range + end + + def add(lower, upper = nil) + upper = lower if upper.nil? + add_range(Range.new(lower, upper)) + end + + def to_s + repr = ranges.sort { |a,b| b.lower <=> a.lower }. + map { |r| r.to_s }.join(",") + "(other) + if other.respond_to?(:bytes) + return bytes <=> other.bytes + else + raise NotImplementedError + end + end + + def to_s + UUID::format(bytes) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "UUID(%r)" % str(self) + # end + + def self.random_uuid + bytes = (1..16).collect { |i| rand(256) } + + # From RFC4122, the version bits are set to 0100 + bytes[7] &= 0x0F + bytes[7] |= 0x40 + + # From RFC4122, the top two bits of byte 8 get set to 01 + bytes[8] &= 0x3F + bytes[8] |= 0x80 + return bytes.pack("C16") + end + + def self.uuid4 + UUID.new(random_uuid) + end + + def self.format(s) + # Python format !LHHHHL + # big-endian, ulong, ushort x 4, ulong + "%08x-%04x-%04x-%04x-%04x%08x" % s.unpack("NnnnnN") + end + end +end diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb new file mode 100644 index 0000000000..f779047e05 --- /dev/null +++ b/ruby/lib/qpid/delegates.rb @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'rbconfig' +require 'sasl' + +module Qpid + + class Delegate + + def initialize(connection, args={}) + @connection = connection + @spec = connection.spec + @delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + @control = @spec[:track].enum[:control].value + end + + def log ; Qpid::logger["qpid.io.ctl"]; end + + def received(seg) + ssn = @connection.attached[seg.channel] + unless ssn + ch = Qpid::Connection::Channel.new(@connection, seg.channel) + else + ch = ssn.channel + end + + if seg.track == @control + ctl = seg.decode(@spec) + log.debug("RECV %s", ctl) if log + attr = ctl.st_type.name + method(attr).call(ch, ctl) + elsif ssn.nil? + ch.session_detached + else + ssn.received(seg) + end + end + + def connection_close(ch, close) + @connection.close_code = [close.reply_code, close.reply_text] + ch.connection_close_ok + @connection.sock.close_write() + unless @connection.opened + @connection.failed = true + @connection.signal + end + end + + def connection_close_ok(ch, close_ok) + @connection.opened = false + @connection.signal + end + + def session_attach(ch, a) + begin + @connection.attach(a.name, ch, @delegate, a.force) + ch.session_attached(a.name) + rescue Qpid::ChannelBusy + ch.session_detached(a.name) + rescue Qpid::SessionBusy + ch.session_detached(a.name) + end + end + + def session_attached(ch, a) + ch.session.signal + end + + def session_detach(ch, d) + #send back the confirmation of detachment before removing the + #channel from the attached set; this avoids needing to hold the + #connection lock during the sending of this control and ensures + #that if the channel is immediately reused for a new session the + #attach request will follow the detached notification. + ch.session_detached(d.name) + ssn = @connection.detach(d.name, ch) + end + + def session_detached(ch, d) + @connection.detach(d.name, ch) + end + + def session_request_timeout(ch, rt) + ch.session_timeout(rt.timeout) + end + + def session_command_point(ch, cp) + ssn = ch.session + ssn.receiver.next_id = cp.command_id + ssn.receiver.next_offset = cp.command_offset + end + + def session_completed(ch, cmp) + ch.session.sender.has_completed(cmp.commands) + if cmp.timely_reply + ch.session_known_completed(cmp.commands) + end + ch.session.signal + end + + def session_known_completed(ch, kn_cmp) + ch.session.receiver.known_completed(kn_cmp.commands) + end + + def session_flush(ch, f) + rcv = ch.session.receiver + if f.expected + if rcv.next_id + exp = Qpid::RangedSet.new(rcv.next_id) + else + exp = nil + end + ch.session_expected(exp) + end + if f.confirmed + ch.session_confirmed(rcv.completed) + end + if f.completed + ch.session_completed(rcv.completed) + end + end + + class Server < Delegate + + def start + @connection.read_header() + @connection.write_header(@spec.major, @spec.minor) + ch = Qpid::Connection::Channel.new(@connection, 0) + ch.connection_start(:mechanisms => ["ANONYMOUS"]) + ch + end + + def connection_start_ok(ch, start_ok) + ch.connection_tune(:channel_max => 65535) + end + + def connection_tune_ok(ch, tune_ok) + nil + end + + def connection_open(ch, open) + @connection.opened = true + ch.connection_open_ok() + @connection.signal + end + end + + class Client < Delegate + + # FIXME: Python uses os.name for platform - we don't have an exact + # analog in Ruby + PROPERTIES = {"product" => "qpid python client", + "version" => "development", + "platform" => Config::CONFIG["build_os"], + "qpid.client_process" => File.basename($0), + "qpid.client_pid" => Process.pid, + "qpid.client_ppid" => Process.ppid} + + + def initialize(connection, args) + super(connection) + + result = Sasl::client_init + + @mechanism= args[:mechanism] + @username = args[:username] + @password = args[:password] + @service = args[:service] || "qpidd" + @min_ssf = args[:min_ssf] || 0 + @max_ssf = args[:max_ssf] || 65535 + + @saslConn = Sasl.client_new(@mechanism, @service, args[:host], + @username, @password, @min_ssf, @max_ssf) + end + + def start + @connection.write_header(@spec.major, @spec.minor) + @connection.read_header + end + + def connection_start(ch, start) + mech_list = "" + start.mechanisms.each do |m| + mech_list += m + " " + end + begin + resp = Sasl.client_start(@saslConn, mech_list) + @connection.user_id = Sasl.user_id(@saslConn) + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => resp[2], + :response => resp[1]) + rescue exception + ch.connection_close(:message => $!.message) + @connection.failed = true + @connection.signal + end + end + + def connection_secure(ch, secure) + resp = Sasl.client_step(@saslConn, secure.challenge) + @connection.user_id = Sasl.user_id(@saslConn) + ch.connection_secure_ok(:response => resp[1]) + end + + def connection_tune(ch, tune) + ch.connection_tune_ok(:channel_max => tune.channel_max, + :max_frame_size => tune.max_frame_size, + :heartbeat => 0) + ch.connection_open() + @connection.security_layer_tx = @saslConn + end + + def connection_open_ok(ch, open_ok) + @connection.security_layer_rx = @saslConn + @connection.opened = true + @connection.signal + end + end + end +end diff --git a/ruby/lib/qpid/fields.rb b/ruby/lib/qpid/fields.rb new file mode 100644 index 0000000000..cc87d07529 --- /dev/null +++ b/ruby/lib/qpid/fields.rb @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Class + def fields(*fields) + module_eval { + def initialize(*args, &block) + args = init_fields(*args) + + if respond_to? :init + init(*args) {|*a| yield(*a)} + elsif args.any? + raise ArgumentError, "extra arguments: #{args.inspect}" + end + end + } + + vars = fields.map {|f| :"@#{f.to_s().chomp("?")}"} + + define_method(:init_fields) {|*args| + vars.each {|v| + instance_variable_set(v, args.shift()) + } + args + } + + vars.each_index {|i| + define_method(fields[i]) { + instance_variable_get(vars[i]) + } + } + end +end diff --git a/ruby/lib/qpid/framer.rb b/ruby/lib/qpid/framer.rb new file mode 100644 index 0000000000..d057605383 --- /dev/null +++ b/ruby/lib/qpid/framer.rb @@ -0,0 +1,212 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' +require 'logger' +require 'sasl' + +module Qpid + + FIRST_SEG = 0x08 + LAST_SEG = 0x04 + FIRST_FRM = 0x02 + LAST_FRM = 0x01 + + class << self + attr_accessor :raw_logger, :frm_logger + end + + def self.packed_size(format) + # FIXME: This is a total copout to simulate Python's + # struct.calcsize + ([0]*256).pack(format).size + end + + class Frame + attr_reader :payload, :track, :flags, :type, :channel + + # HEADER = "!2BHxBH4x" + # Python Meaning Ruby + # ! big endian (implied by format char) + # 2B 2 uchar C2 + # H unsigned short n + # x pad byte x + # B uchar C + # H unsigned short n + # 4x pad byte x4 + HEADER = "C2nxCnx4" + HEADER_SIZE = Qpid::packed_size(HEADER) + MAX_PAYLOAD = 65535 - HEADER_SIZE + + def initialize(flags, type, track, channel, payload) + if payload.size > MAX_PAYLOAD + raise ArgumentError, "max payload size exceeded: #{payload.size}" + end + + @flags = flags + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; FIRST_SEG & @flags > 0 ; end + + def last_segment? ; LAST_SEG & @flags > 0 ; end + + def first_frame? ; FIRST_FRM & @flags > 0 ; end + + def last_frame? ; LAST_FRM & @flags > 0 ; end + + def to_s + fs = first_segment? ? 'S' : '.' + ls = last_segment? ? 's' : '.' + ff = first_frame? ? 'F' : '.' + lf = last_frame? ? 'f' : '.' + + return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, + @type, + @track, + @channel, + @payload.inspect] + end + end + + class FramingError < Exception ; end + + class Closed < Exception ; end + + class Framer + include Packer + + # Python: "!4s4B" + HEADER = "a4C4" + HEADER_SIZE = 8 + + def raw + Qpid::raw_logger + end + + def frm + Qpid::frm_logger + end + + def initialize(sock) + @sock = sock + @sock.extend(MonitorMixin) + @tx_buf = "" + @rx_buf = "" + @security_layer_tx = nil + @security_layer_rx = nil + @maxbufsize = 65535 + end + + attr_reader :sock + attr_accessor :security_layer_tx, :security_layer_rx + + def aborted? ; false ; end + + def write(buf) + @tx_buf += buf + end + + def flush + @sock.synchronize do + if @security_layer_tx + cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf) + _write(cipher_buf) + else + _write(@tx_buf) + end + @tx_buf = "" + frm.debug("FLUSHED") if frm + end + rescue + @sock.close unless @sock.closed? + end + + def _write(buf) + while buf && buf.size > 0 + # FIXME: Catch errors + n = @sock.write(buf) + raw.debug("SENT #{buf[0, n].inspect}") if raw + buf[0,n] = "" + @sock.flush + end + end + + def read(n) + while @rx_buf.size < n + begin + s = @sock.recv(@maxbufsize) + if @security_layer_rx + s = Sasl.decode(@security_layer_rx, s) + end + rescue IOError => e + raise e if @rx_buf != "" + @sock.close unless @sock.closed? + raise Closed + end + # FIXME: Catch errors + if s.nil? or s.size == 0 + @sock.close unless @sock.closed? + raise Closed + end + @rx_buf += s + raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw + end + data = @rx_buf[0, n] + @rx_buf = @rx_buf[n, @rx_buf.size - n] + return data + end + + def read_header + unpack(Framer::HEADER, Framer::HEADER_SIZE) + end + + def write_header(major, minor) + @sock.synchronize do + pack(Framer::HEADER, "AMQP", 1, 1, major, minor) + flush() + end + end + + def write_frame(frame) + @sock.synchronize do + size = frame.payload.size + Frame::HEADER_SIZE + track = frame.track & 0x0F + pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel) + write(frame.payload) + if frame.last_segment? and frame.last_frame? + flush() + frm.debug("SENT #{frame}") if frm + end + end + end + + def read_frame + flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE) + raise FramingError if (flags & 0xF0 > 0) + payload = read(size - Frame::HEADER_SIZE) + frame = Frame.new(flags, type, track, channel, payload) + frm.debug("RECV #{frame}") if frm + return frame + end + end +end diff --git a/ruby/lib/qpid/invoker.rb b/ruby/lib/qpid/invoker.rb new file mode 100644 index 0000000000..39716ac6c2 --- /dev/null +++ b/ruby/lib/qpid/invoker.rb @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Qpid::Invoker + + # Requires that client defines a invoke method and overrides + # resolve_method + + # FIXME: Is it really worth defining methods in method_missing ? We + # could just dispatch there directly + + def invc_method(name, resolved) + define_singleton_method(name) { |*args| invoke(resolved, args) } + # FIXME: the Python code also attaches docs from resolved.pydoc + end + + def invc_value(name, resolved) + define_singleton_method(name) { | | resolved } + end + + def invc_error(name, resolved) + msg = "%s instance has no attribute '%s'" % [self.class.name, name] + if resolved + msg += "\n%s" % resolved + end + raise NameError, msg + end + + def resolve_method(name) + invocation(:error, nil) + end + + def method_missing(name, *args) + disp, resolved = resolve_method(name) + disp.call(name, resolved) + send(name, *args) + end + + def invocation(kind, name = nil) + [ method("invc_#{kind}"), name ] + end + + private + def define_singleton_method(name, &body) + singleton_class = class << self; self; end + singleton_class.send(:define_method, name, &body) + end + +end diff --git a/ruby/lib/qpid/packer.rb b/ruby/lib/qpid/packer.rb new file mode 100644 index 0000000000..ae1be37faf --- /dev/null +++ b/ruby/lib/qpid/packer.rb @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + module Packer + def unpack(fmt, len) + raw = read(len) + values = raw.unpack(fmt) + values = values[0] if values.size == 1 + return values + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + end +end diff --git a/ruby/lib/qpid/peer.rb b/ruby/lib/qpid/peer.rb new file mode 100644 index 0000000000..cdb962169b --- /dev/null +++ b/ruby/lib/qpid/peer.rb @@ -0,0 +1,289 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "thread" +require "qpid/queue" +require "qpid/connection08" +require "qpid/fields" + +module Qpid08 + + Queue = Qpid::Queue + + class Peer + + def initialize(conn, delegate) + @conn = conn + @delegate = delegate + @outgoing = Queue.new() + @work = Queue.new() + @channels = {} + @mutex = Mutex.new() + end + + def channel(id) + @mutex.synchronize do + ch = @channels[id] + if ch.nil? + ch = Channel.new(id, self, @outgoing, @conn.spec) + @channels[id] = ch + end + return ch + end + end + + def channel_delete(id) + @channels.delete(id) + end + + def start() + spawn(:writer) + spawn(:reader) + spawn(:worker) + end + + def close() + @mutex.synchronize do + @channels.each_value do |ch| + ch.close() + end + @outgoing.close() + @work.close() + end + end + + private + + def spawn(method, *args) + Thread.new do + begin + send(method, *args) + # is this the standard way to catch any exception? + rescue Closed => e + puts "#{method} #{e}" + rescue Object => e + print e + e.backtrace.each do |line| + print "\n ", line + end + print "\n" + end + end + end + + def reader() + while true + frame = @conn.read() + ch = channel(frame.channel) + ch.dispatch(frame, @work) + end + end + + def writer() + while true + @conn.write(@outgoing.get()) + end + end + + def worker() + while true + dispatch(@work.get()) + end + end + + def dispatch(queue) + frame = queue.get() + ch = channel(frame.channel) + payload = frame.payload + if payload.method.content? + content = Qpid08::read_content(queue) + else + content = nil + end + + message = Message.new(payload.method, payload.args, content) + @delegate.dispatch(ch, message) + end + + end + + class Channel + def initialize(id, peer, outgoing, spec) + @id = id + @peer = peer + @outgoing = outgoing + @spec = spec + @incoming = Queue.new() + @responses = Queue.new() + @queue = nil + @closed = false + end + + attr_reader :id + + def closed?; @closed end + + def close() + return if closed? + @peer.channel_delete(@id) + @closed = true + @incoming.close() + @responses.close() + end + + def dispatch(frame, work) + payload = frame.payload + case payload + when Method + if payload.method.response? + @queue = @responses + else + @queue = @incoming + work << @incoming + end + end + @queue << frame + end + + def method_missing(name, *args) + method = @spec.find_method(name) + if method.nil? + raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") + end + + if args.size == 1 and args[0].instance_of? Hash + kwargs = args[0] + invoke_args = method.fields.map do |f| + kwargs[f.name] + end + content = kwargs[:content] + else + invoke_args = [] + method.fields.each do |f| + if args.any? + invoke_args << args.shift() + else + invoke_args << f.default + end + end + if method.content? and args.any? + content = args.shift() + else + content = nil + end + if args.any? then raise ArgumentError.new("#{args.size} extr arguments") end + end + return invoke(method, invoke_args, content) + end + + def invoke(method, args, content = nil) + raise Closed() if closed? + frame = Frame.new(@id, Method.new(method, args)) + @outgoing << frame + + if method.content? + content = Content.new() if content.nil? + write_content(method.parent, content, @outgoing) + end + + nowait = false + f = method.fields[:"nowait"] + nowait = args[method.fields.index(f)] unless f.nil? + + unless nowait or method.responses.empty? + resp = @responses.get().payload + if resp.method.content? + content = read_content(@responses) + else + content = nil + end + if method.responses.include? resp.method + return Message.new(resp.method, resp.args, content) + else + # XXX: ValueError doesn't actually exist + raise ValueError.new(resp) + end + end + end + + def write_content(klass, content, queue) + size = content.size + header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers)) + queue << header + content.children.each {|child| write_content(klass, child, queue)} + queue << Frame.new(@id, Body.new(content.body)) if size > 0 + end + + end + + def Qpid08.read_content(queue) + frame = queue.get() + header = frame.payload + children = [] + 1.upto(header.weight) { children << read_content(queue) } + size = header.size + read = 0 + buf = "" + while read < size + body = queue.get() + content = body.payload.content + buf << content + read += content.size + end + buf.freeze() + return Content.new(header.properties.clone(), buf, children) + end + + class Content + def initialize(headers = {}, body = "", children = []) + @headers = headers + @body = body + @children = children + end + + attr_reader :headers, :body, :children + + def size; body.size end + def weight; children.size end + + def [](key); @headers[key] end + def []=(key, value); @headers[key] = value end + end + + class Message + fields(:method, :args, :content) + + alias fields args + + def method_missing(name) + return args[@method.fields[name].id] + end + + def inspect() + "#{method.qname}(#{args.join(", ")})" + end + end + + module Delegate + def dispatch(ch, msg) + send(msg.method.qname, ch, msg) + end + end + +end diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb new file mode 100644 index 0000000000..4711d355cd --- /dev/null +++ b/ruby/lib/qpid/qmf.rb @@ -0,0 +1,1957 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Console API for Qpid Management Framework + +require 'socket' +require 'monitor' +require 'thread' +require 'uri' +require 'time' + +module Qpid::Qmf + + # To access the asynchronous operations, a class must be derived from + # Console with overrides of any combination of the available methods. + class Console + + # Invoked when a connection is established to a broker + def broker_connected(broker); end + + # Invoked when the connection to a broker is lost + def broker_disconnected(broker); end + + # Invoked when a QMF package is discovered + def new_package(name); end + + # Invoked when a new class is discovered. Session.getSchema can be + # used to obtain details about the class + def new_class(kind, klass_key); end + + # Invoked when a QMF agent is discovered + def new_agent(agent); end + + # Invoked when a QMF agent disconects + def del_agent(agent); end + + # Invoked when an object is updated + def object_props(broker, record); end + + # Invoked when an object is updated + def object_stats(broker, record); end + + # Invoked when an event is raised + def event(broker, event); end + + # Invoked when an agent heartbeat is received. + def heartbeat(agent, timestamp); end + + # Invoked when the connection sequence reaches the point where broker information is available. + def broker_info(broker); end + + # Invoked when a method response from an asynchronous method call is received. + def method_response(broker, seq, response); end + end + + class BrokerURL + + attr_reader :host, :port, :auth_name, :auth_pass + + def initialize(text) + uri = URI.parse(text) + + @host = uri.host + @port = uri.port ? uri.port : 5672 + @auth_name = uri.user + @auth_pass = uri.password + + return uri + end + + def name + "#{@host}:#{@port}" + end + + def match(host, port) + # FIXME: Unlcear what the Python code is actually checking for + # here, especially since HOST can resolve to multiple IP's + @port == port && + (host == @host || ipaddr(host, port) == ipaddr(@host, @port)) + end + + private + def ipaddr(host, port) + s = Socket::getaddrinfo(host, port, + Socket::AF_INET, Socket::SOCK_STREAM) + s[0][2] + end + end + + # An instance of the Session class represents a console session running + # against one or more QMF brokers. A single instance of Session is + # needed to interact with the management framework as a console. + class Session + CONTEXT_SYNC = 1 + CONTEXT_STARTUP = 2 + CONTEXT_MULTIGET = 3 + + DEFAULT_GET_WAIT_TIME = 60 + + include MonitorMixin + + attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages + + # Initialize a session. If the console argument is provided, the + # more advanced asynchronous features are available. If console is + # defaulted, the session will operate in a simpler, synchronous + # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments + # are meaningful only if 'console' is provided. They control + # whether object updates, events, and agent-heartbeats are + # subscribed to. If the console is not interested in receiving one + # or more of the above, setting the argument to False will reduce + # tha bandwidth used by the API. If manageConnections is set to + # True, the Session object will manage connections to the brokers. + # This means that if a broker is unreachable, it will retry until a + # connection can be established. If a connection is lost, the + # Session will attempt to reconnect. + # + # If manageConnections is set to False, the user is responsible for + # handing failures. In this case, an unreachable broker will cause + # addBroker to raise an exception. If userBindings is set to False + # (the default) and rcvObjects is True, the console will receive + # data for all object classes. If userBindings is set to True, the + # user must select which classes the console shall receive by + # invoking the bindPackage or bindClass methods. This allows the + # console to be configured to receive only information that is + # relavant to a particular application. If rcvObjects id False, + # userBindings has no meaning. + # + # Accept a hash of parameters, where keys can be :console, + # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections, + # and :user_bindings + def initialize(kwargs = {}) + super() + @console = kwargs[:console] || nil + @brokers = [] + @packages = {} + @seq_mgr = SequenceManager.new + @cv = new_cond + @sync_sequence_list = [] + @result = [] + @select = [] + @error = nil + @rcv_objects = kwargs[:rcv_objects] == nil ? true : kwargs[:rcv_objects] + @rcv_events = kwargs[:rcv_events] == nil ? true : kwargs[:rcv_events] + @rcv_heartbeats = kwargs[:rcv_heartbeats] == nil ? true : kwargs[:rcv_heartbeats] + @user_bindings = kwargs[:user_bindings] == nil ? false : kwargs[:user_bindings] + unless @console + @rcv_objects = false + @rcv_events = false + @rcv_heartbeats = false + end + @binding_key_list = binding_keys + @manage_connections = kwargs[:manage_connections] || false + + if @user_bindings && ! @rcv_objects + raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided" + end + + end + + def to_s + "QMF Console Session Manager (brokers: #{@brokers.size})" + end + + def managedConnections? + return @manage_connections + end + + # Connect to a Qpid broker. Returns an object of type Broker + # + # To supply a username for authentication, use the URL syntax: + # + # amqp://username@hostname:port + # + # If the broker needs a password for the client, an interactive prompt will be + # provided to the user. + # + # To supply a username and a password, use + # + # amqp://username:password@hostname:port + # + # The following keyword arguments may be used to control authentication: + # + # :mechanism - SASL mechanism (i.e. "PLAIN", "GSSAPI", "ANONYMOUS", etc. + # - defaults to unspecified (the system chooses for you) + # :service - SASL service name (i.e. the kerberos principal of the broker) + # - defaults to "qpidd" + # :min_ssf - Minimum Security Strength Factor for SASL security layers + # - defaults to 0 + # :max_ssf - Maximum Security Strength Factor for SASL security layers + # - defaults to 65535 + # + def add_broker(target = "amqp://localhost", kwargs = {}) + url = BrokerURL.new(target) + broker = Broker.new(self, url.host, url.port, url.auth_name, url.auth_pass, kwargs) + unless broker.connected? || @manage_connections + raise broker.error + end + + @brokers << broker + objects(:broker => broker, :class => "agent") unless @manage_connections + return broker + end + + # Disconnect from a broker. The 'broker' argument is the object + # returned from the addBroker call + def del_broker(broker) + broker.shutdown + @brokers.delete(broker) + end + + # Get the list of known classes within a QMF package + def classes(package_name) + list = [] + @brokers.each { |broker| broker.wait_for_stable } + if @packages.include?(package_name) + # FIXME What's the actual structure of @packages[package_name] + @packages[package_name].each do |key, schema_class| + list << schema_class.klass_key + end + end + return list + end + + # Get the schema for a QMF class + def schema(klass_key) + @brokers.each { |broker| broker.wait_for_stable } + if @packages.include?(klass_key.package) + @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ] + end + end + + def bind_package(package_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key => "console.obj.*.*.#{package_name}.#" } + broker.amqp_session.exchange_bind(args) + end + end + + def bind_class(package_name, class_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" } + broker.amqp_session.exchange_bind(args) + end + end + + def bind_class_key(klass_key) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + pname, cname, hash = klass_key.to_a() + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topic_name, + :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" } + broker.amqp_session.exchange_bind(args) + end + end + + # Get a list of currently known agents + def agents(broker=nil) + broker_list = [] + if broker.nil? + broker_list = @brokers.dup + else + broker_list << broker + end + broker_list.each { |b| b.wait_for_stable } + agent_list = [] + broker_list.each { |b| agent_list += b.agents } + return agent_list + end + + # Get a list of objects from QMF agents. + # All arguments are passed by name(keyword). + # + # The class for queried objects may be specified in one of the + # following ways: + # :schema => - supply a schema object returned from getSchema. + # :key => - supply a klass_key from the list returned by getClasses. + # :class => - supply a class name as a string. If the class name exists + # in multiple packages, a _package argument may also be supplied. + # :object_id = - get the object referenced by the object-id + # + # If objects should be obtained from only one agent, use the following argument. + # Otherwise, the query will go to all agents. + # + # :agent = - supply an agent from the list returned by getAgents. + # + # If the get query is to be restricted to one broker (as opposed to + # all connected brokers), add the following argument: + # + # :broker = - supply a broker as returned by addBroker. + # + # The default timeout for this synchronous operation is 60 seconds. To change the timeout, + # use the following argument: + # + # :timeout =