summaryrefslogtreecommitdiff
path: root/cpp/bindings/qmf/ruby
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-08-28 22:03:26 +0000
committerTed Ross <tross@apache.org>2009-08-28 22:03:26 +0000
commitdc6068ce7a6bd0d9467886b44cd6252ba491e615 (patch)
tree7bf0b42a58d7f020fc61e81ae53ba7415e8fe91a /cpp/bindings/qmf/ruby
parenta435a75437cd389c6fa08ae171f7d25b1d3a7e77 (diff)
downloadqpid-python-dc6068ce7a6bd0d9467886b44cd6252ba491e615.tar.gz
Major work in the QMF engine.
- The console framework now establishes connectivity with the broker. - The Ruby binding for console is tracking the engine development. - Overall improvements (thread safety in Ruby, etc.) have been added. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@809042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/bindings/qmf/ruby')
-rw-r--r--cpp/bindings/qmf/ruby/qmf.rb298
1 files changed, 258 insertions, 40 deletions
diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb
index badd53942d..38c42b5aa7 100644
--- a/cpp/bindings/qmf/ruby/qmf.rb
+++ b/cpp/bindings/qmf/ruby/qmf.rb
@@ -20,6 +20,7 @@
require 'qmfengine'
require 'thread'
require 'socket'
+require 'monitor'
module Qmf
@@ -70,39 +71,18 @@ module Qmf
def sess_event_recv(context, message); end
end
- class Query
- attr_reader :impl
- def initialize(i)
- @impl = i
- end
-
- def package_name
- @impl.getPackage
- end
-
- def class_name
- @impl.getClass
- end
-
- def object_id
- objid = @impl.getObjectId
- if objid.class == NilClass
- return nil
- end
- return ObjectId.new(objid)
- end
- end
-
class Connection
+ include MonitorMixin
+
attr_reader :impl
def initialize(settings)
+ super()
@impl = Qmfengine::ResilientConnection.new(settings.impl)
@sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0)
@impl.setNotifyFd(@sockEngine.fileno)
@new_conn_handlers = Array.new
@conn_handlers = Array.new
- @sess_handlers = Array.new
@thread = Thread.new do
run
@@ -110,47 +90,56 @@ module Qmf
end
def add_conn_handler(handler)
- @new_conn_handlers.push(handler)
+ synchronize do
+ @new_conn_handlers.push(handler)
+ end
@sockEngine.write("x")
end
- def add_sess_handler(handler)
- @sess_handlers.push(handler)
- end
-
def run()
- event = Qmfengine::ResilientConnectionEvent.new
+ eventImpl = Qmfengine::ResilientConnectionEvent.new
connected = nil
+ new_handlers = nil
+ bt_count = 0
+
while :true
@sock.read(1)
- @new_conn_handlers.each do |nh|
+ synchronize do
+ new_handlers = @new_conn_handlers
+ @new_conn_handlers = Array.new
+ end
+
+ new_handlers.each do |nh|
@conn_handlers.push(nh)
nh.conn_event_connected() if connected
end
- @new_conn_handlers = Array.new
+ new_handlers = nil
- valid = @impl.getEvent(event)
+ valid = @impl.getEvent(eventImpl)
while valid
begin
- case event.kind
+ case eventImpl.kind
when Qmfengine::ResilientConnectionEvent::CONNECTED
connected = :true
@conn_handlers.each { |h| h.conn_event_connected() }
when Qmfengine::ResilientConnectionEvent::DISCONNECTED
connected = nil
- @conn_handlers.each { |h| h.conn_event_disconnected(event.errorText) }
+ @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) }
when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED
- event.sessionContext.handler.sess_event_session_closed(event.sessionContext, event.errorText)
+ eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
when Qmfengine::ResilientConnectionEvent::RECV
- event.sessionContext.handler.sess_event_recv(event.sessionContext, event.message)
+ eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
end
rescue Exception => ex
puts "Event Exception: #{ex}"
- puts ex.backtrace
+ if bt_count < 2
+ puts ex.backtrace
+ bt_count += 1
+ end
end
@impl.popEvent
- valid = @impl.getEvent(event)
+ valid = @impl.getEvent(eventImpl)
end
end
end
@@ -164,9 +153,12 @@ module Qmf
@label = label
@handler = handler
@handle = Qmfengine::SessionHandle.new
- @conn.add_sess_handler(@handler)
result = @conn.impl.createSession(label, self, @handle)
end
+
+ def destroy()
+ @conn.impl.destroySession(@handle)
+ end
end
##==============================================================================
@@ -262,6 +254,30 @@ module Qmf
end
end
+ class ConsoleObject < QmfObject
+ attr_reader :current_time, :create_time, :delete_time
+
+ def initialize(cls)
+ super(cls)
+ end
+
+ def update()
+ end
+
+ def mergeUpdate(newObject)
+ end
+
+ def deleted?()
+ @delete_time > 0
+ end
+
+ def index()
+ end
+
+ def method_missing(name, *args)
+ end
+ end
+
class ObjectId
attr_reader :impl
def initialize(impl=nil)
@@ -357,6 +373,29 @@ module Qmf
end
end
+ class Query
+ attr_reader :impl
+ def initialize(i)
+ @impl = i
+ end
+
+ def package_name
+ @impl.getPackage
+ end
+
+ def class_name
+ @impl.getClass
+ end
+
+ def object_id
+ objid = @impl.getObjectId
+ if objid.class == NilClass
+ return nil
+ end
+ return ObjectId.new(objid)
+ end
+ end
+
##==============================================================================
## SCHEMA
##==============================================================================
@@ -410,6 +449,21 @@ module Qmf
end
end
+ class SchemaClassKey
+ attr_reader :impl
+ def initialize(i)
+ @impl = i
+ end
+
+ def get_package()
+ @impl.getPackageName()
+ end
+
+ def get_class()
+ @impl.getClassName()
+ end
+ end
+
class SchemaObjectClass
attr_reader :impl
def initialize(package, name, kwargs={})
@@ -467,6 +521,170 @@ module Qmf
## CONSOLE
##==============================================================================
+ class ConsoleHandler
+ def agent_added(agent); end
+ def agent_deleted(agent); end
+ def new_package(package); end
+ def new_class(class_key); end
+ def object_update(object, hasProps, hasStats); end
+ def event_received(event); end
+ def agent_heartbeat(agent, timestamp); end
+ def method_response(resp); end
+ def broker_info(broker); end
+ end
+
+ class Console
+ attr_reader :impl
+
+ def initialize(handler, kwargs={})
+ @handler = handler
+ @impl = Qmfengine::ConsoleEngine.new
+ @event = Qmfengine::ConsoleEvent.new
+ @broker_list = Array.new
+ end
+
+ def add_connection(conn)
+ broker = Broker.new(self, conn)
+ @broker_list.push(broker)
+ return broker
+ end
+
+ def del_connection(broker)
+ end
+
+ def get_packages()
+ end
+
+ def get_classes(package)
+ end
+
+ def get_schema(class_key)
+ end
+
+ def bind_package(package)
+ end
+
+ def bind_class(kwargs = {})
+ end
+
+ def get_agents(broker = nil)
+ end
+
+ def get_objects(query, kwargs = {})
+ end
+
+ def start_sync(query)
+ end
+
+ def touch_sync(sync)
+ end
+
+ def end_sync(sync)
+ end
+
+ def do_console_events()
+ count = 0
+ valid = @impl.getEvent(@event)
+ while valid
+ count += 1
+ case @event.kind
+ when Qmfengine::ConsoleEvent::AGENT_ADDED
+ when Qmfengine::ConsoleEvent::AGENT_DELETED
+ when Qmfengine::ConsoleEvent::NEW_PACKAGE
+ when Qmfengine::ConsoleEvent::NEW_CLASS
+ when Qmfengine::ConsoleEvent::OBJECT_UPDATE
+ when Qmfengine::ConsoleEvent::EVENT_RECEIVED
+ when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
+ when Qmfengine::ConsoleEvent::METHOD_RESPONSE
+ end
+ @impl.popEvent
+ valid = @impl.getEvent(@event)
+ end
+ return count
+ end
+ end
+
+ class Broker < ConnectionHandler
+ attr_reader :impl
+
+ def initialize(console, conn)
+ @console = console
+ @conn = conn
+ @session = nil
+ @event = Qmfengine::BrokerEvent.new
+ @xmtMessage = Qmfengine::Message.new
+ @impl = Qmfengine::BrokerProxy.new(@console.impl)
+ @console.impl.addConnection(@impl, self)
+ @conn.add_conn_handler(self)
+ end
+
+ def do_broker_events()
+ count = 0
+ valid = @impl.getEvent(@event)
+ while valid
+ count += 1
+ puts "Broker Event: #{@event.kind}"
+ case @event.kind
+ when Qmfengine::BrokerEvent::BROKER_INFO
+ when Qmfengine::BrokerEvent::DECLARE_QUEUE
+ @conn.impl.declareQueue(@session.handle, @event.name)
+ when Qmfengine::BrokerEvent::DELETE_QUEUE
+ @conn.impl.deleteQueue(@session.handle, @event.name)
+ when Qmfengine::BrokerEvent::BIND
+ @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
+ when Qmfengine::BrokerEvent::UNBIND
+ @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
+ when Qmfengine::BrokerEvent::SETUP_COMPLETE
+ @impl.startProtocol
+ end
+ @impl.popEvent
+ valid = @impl.getEvent(@event)
+ end
+ return count
+ end
+
+ def do_broker_messages()
+ count = 0
+ valid = @impl.getXmtMessage(@xmtMessage)
+ while valid
+ count += 1
+ @conn.impl.sendMessage(@session.handle, @xmtMessage)
+ @impl.popXmt
+ valid = @impl.getXmtMessage(@xmtMessage)
+ end
+ return count
+ end
+
+ def do_events()
+ begin
+ ccnt = @console.do_console_events
+ bcnt = do_broker_events
+ mcnt = do_broker_messages
+ end until ccnt == 0 and bcnt == 0 and mcnt == 0
+ end
+
+ def conn_event_connected()
+ puts "Console Connection Established..."
+ @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self)
+ @impl.sessionOpened(@session.handle)
+ do_events
+ end
+
+ def conn_event_disconnected(error)
+ puts "Console Connection Lost"
+ end
+
+ def sess_event_session_closed(context, error)
+ puts "Console Session Lost"
+ @impl.sessionClosed()
+ end
+
+ def sess_event_recv(context, message)
+ @impl.handleRcvMessage(message)
+ do_events
+ end
+ end
+
##==============================================================================
## AGENT
##==============================================================================