diff options
| author | Ted Ross <tross@apache.org> | 2009-08-28 22:03:26 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-08-28 22:03:26 +0000 |
| commit | dc6068ce7a6bd0d9467886b44cd6252ba491e615 (patch) | |
| tree | 7bf0b42a58d7f020fc61e81ae53ba7415e8fe91a /cpp/bindings/qmf/ruby | |
| parent | a435a75437cd389c6fa08ae171f7d25b1d3a7e77 (diff) | |
| download | qpid-python-dc6068ce7a6bd0d9467886b44cd6252ba491e615.tar.gz | |
Major work in the QMF engine.
- The console framework now establishes connectivity with the broker.
- The Ruby binding for console is tracking the engine development.
- Overall improvements (thread safety in Ruby, etc.) have been added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@809042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/bindings/qmf/ruby')
| -rw-r--r-- | cpp/bindings/qmf/ruby/qmf.rb | 298 |
1 files changed, 258 insertions, 40 deletions
diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb index badd53942d..38c42b5aa7 100644 --- a/cpp/bindings/qmf/ruby/qmf.rb +++ b/cpp/bindings/qmf/ruby/qmf.rb @@ -20,6 +20,7 @@ require 'qmfengine' require 'thread' require 'socket' +require 'monitor' module Qmf @@ -70,39 +71,18 @@ module Qmf def sess_event_recv(context, message); end end - class Query - attr_reader :impl - def initialize(i) - @impl = i - end - - def package_name - @impl.getPackage - end - - def class_name - @impl.getClass - end - - def object_id - objid = @impl.getObjectId - if objid.class == NilClass - return nil - end - return ObjectId.new(objid) - end - end - class Connection + include MonitorMixin + attr_reader :impl def initialize(settings) + super() @impl = Qmfengine::ResilientConnection.new(settings.impl) @sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0) @impl.setNotifyFd(@sockEngine.fileno) @new_conn_handlers = Array.new @conn_handlers = Array.new - @sess_handlers = Array.new @thread = Thread.new do run @@ -110,47 +90,56 @@ module Qmf end def add_conn_handler(handler) - @new_conn_handlers.push(handler) + synchronize do + @new_conn_handlers.push(handler) + end @sockEngine.write("x") end - def add_sess_handler(handler) - @sess_handlers.push(handler) - end - def run() - event = Qmfengine::ResilientConnectionEvent.new + eventImpl = Qmfengine::ResilientConnectionEvent.new connected = nil + new_handlers = nil + bt_count = 0 + while :true @sock.read(1) - @new_conn_handlers.each do |nh| + synchronize do + new_handlers = @new_conn_handlers + @new_conn_handlers = Array.new + end + + new_handlers.each do |nh| @conn_handlers.push(nh) nh.conn_event_connected() if connected end - @new_conn_handlers = Array.new + new_handlers = nil - valid = @impl.getEvent(event) + valid = @impl.getEvent(eventImpl) while valid begin - case event.kind + case eventImpl.kind when Qmfengine::ResilientConnectionEvent::CONNECTED connected = :true @conn_handlers.each { |h| h.conn_event_connected() } when Qmfengine::ResilientConnectionEvent::DISCONNECTED connected = nil - @conn_handlers.each { |h| h.conn_event_disconnected(event.errorText) } + @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) } when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED - event.sessionContext.handler.sess_event_session_closed(event.sessionContext, event.errorText) + eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) when Qmfengine::ResilientConnectionEvent::RECV - event.sessionContext.handler.sess_event_recv(event.sessionContext, event.message) + eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) end rescue Exception => ex puts "Event Exception: #{ex}" - puts ex.backtrace + if bt_count < 2 + puts ex.backtrace + bt_count += 1 + end end @impl.popEvent - valid = @impl.getEvent(event) + valid = @impl.getEvent(eventImpl) end end end @@ -164,9 +153,12 @@ module Qmf @label = label @handler = handler @handle = Qmfengine::SessionHandle.new - @conn.add_sess_handler(@handler) result = @conn.impl.createSession(label, self, @handle) end + + def destroy() + @conn.impl.destroySession(@handle) + end end ##============================================================================== @@ -262,6 +254,30 @@ module Qmf end end + class ConsoleObject < QmfObject + attr_reader :current_time, :create_time, :delete_time + + def initialize(cls) + super(cls) + end + + def update() + end + + def mergeUpdate(newObject) + end + + def deleted?() + @delete_time > 0 + end + + def index() + end + + def method_missing(name, *args) + end + end + class ObjectId attr_reader :impl def initialize(impl=nil) @@ -357,6 +373,29 @@ module Qmf end end + class Query + attr_reader :impl + def initialize(i) + @impl = i + end + + def package_name + @impl.getPackage + end + + def class_name + @impl.getClass + end + + def object_id + objid = @impl.getObjectId + if objid.class == NilClass + return nil + end + return ObjectId.new(objid) + end + end + ##============================================================================== ## SCHEMA ##============================================================================== @@ -410,6 +449,21 @@ module Qmf end end + class SchemaClassKey + attr_reader :impl + def initialize(i) + @impl = i + end + + def get_package() + @impl.getPackageName() + end + + def get_class() + @impl.getClassName() + end + end + class SchemaObjectClass attr_reader :impl def initialize(package, name, kwargs={}) @@ -467,6 +521,170 @@ module Qmf ## CONSOLE ##============================================================================== + class ConsoleHandler + def agent_added(agent); end + def agent_deleted(agent); end + def new_package(package); end + def new_class(class_key); end + def object_update(object, hasProps, hasStats); end + def event_received(event); end + def agent_heartbeat(agent, timestamp); end + def method_response(resp); end + def broker_info(broker); end + end + + class Console + attr_reader :impl + + def initialize(handler, kwargs={}) + @handler = handler + @impl = Qmfengine::ConsoleEngine.new + @event = Qmfengine::ConsoleEvent.new + @broker_list = Array.new + end + + def add_connection(conn) + broker = Broker.new(self, conn) + @broker_list.push(broker) + return broker + end + + def del_connection(broker) + end + + def get_packages() + end + + def get_classes(package) + end + + def get_schema(class_key) + end + + def bind_package(package) + end + + def bind_class(kwargs = {}) + end + + def get_agents(broker = nil) + end + + def get_objects(query, kwargs = {}) + end + + def start_sync(query) + end + + def touch_sync(sync) + end + + def end_sync(sync) + end + + def do_console_events() + count = 0 + valid = @impl.getEvent(@event) + while valid + count += 1 + case @event.kind + when Qmfengine::ConsoleEvent::AGENT_ADDED + when Qmfengine::ConsoleEvent::AGENT_DELETED + when Qmfengine::ConsoleEvent::NEW_PACKAGE + when Qmfengine::ConsoleEvent::NEW_CLASS + when Qmfengine::ConsoleEvent::OBJECT_UPDATE + when Qmfengine::ConsoleEvent::EVENT_RECEIVED + when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT + when Qmfengine::ConsoleEvent::METHOD_RESPONSE + end + @impl.popEvent + valid = @impl.getEvent(@event) + end + return count + end + end + + class Broker < ConnectionHandler + attr_reader :impl + + def initialize(console, conn) + @console = console + @conn = conn + @session = nil + @event = Qmfengine::BrokerEvent.new + @xmtMessage = Qmfengine::Message.new + @impl = Qmfengine::BrokerProxy.new(@console.impl) + @console.impl.addConnection(@impl, self) + @conn.add_conn_handler(self) + end + + def do_broker_events() + count = 0 + valid = @impl.getEvent(@event) + while valid + count += 1 + puts "Broker Event: #{@event.kind}" + case @event.kind + when Qmfengine::BrokerEvent::BROKER_INFO + when Qmfengine::BrokerEvent::DECLARE_QUEUE + @conn.impl.declareQueue(@session.handle, @event.name) + when Qmfengine::BrokerEvent::DELETE_QUEUE + @conn.impl.deleteQueue(@session.handle, @event.name) + when Qmfengine::BrokerEvent::BIND + @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey) + when Qmfengine::BrokerEvent::UNBIND + @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey) + when Qmfengine::BrokerEvent::SETUP_COMPLETE + @impl.startProtocol + end + @impl.popEvent + valid = @impl.getEvent(@event) + end + return count + end + + def do_broker_messages() + count = 0 + valid = @impl.getXmtMessage(@xmtMessage) + while valid + count += 1 + @conn.impl.sendMessage(@session.handle, @xmtMessage) + @impl.popXmt + valid = @impl.getXmtMessage(@xmtMessage) + end + return count + end + + def do_events() + begin + ccnt = @console.do_console_events + bcnt = do_broker_events + mcnt = do_broker_messages + end until ccnt == 0 and bcnt == 0 and mcnt == 0 + end + + def conn_event_connected() + puts "Console Connection Established..." + @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self) + @impl.sessionOpened(@session.handle) + do_events + end + + def conn_event_disconnected(error) + puts "Console Connection Lost" + end + + def sess_event_session_closed(context, error) + puts "Console Session Lost" + @impl.sessionClosed() + end + + def sess_event_recv(context, message) + @impl.handleRcvMessage(message) + do_events + end + end + ##============================================================================== ## AGENT ##============================================================================== |
