summaryrefslogtreecommitdiff
path: root/cpp/bindings/qmf/python/qmf.py
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/bindings/qmf/python/qmf.py
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/bindings/qmf/python/qmf.py')
-rw-r--r--cpp/bindings/qmf/python/qmf.py1680
1 files changed, 0 insertions, 1680 deletions
diff --git a/cpp/bindings/qmf/python/qmf.py b/cpp/bindings/qmf/python/qmf.py
deleted file mode 100644
index 06d3070841..0000000000
--- a/cpp/bindings/qmf/python/qmf.py
+++ /dev/null
@@ -1,1680 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import sys
-import socket
-import os
-import logging
-from threading import Thread
-from threading import RLock
-from threading import Condition
-import qmfengine
-from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE)
-from qmfengine import (CLASS_EVENT, CLASS_OBJECT)
-from qmfengine import (DIR_IN, DIR_IN_OUT, DIR_OUT)
-from qmfengine import (TYPE_ABSTIME, TYPE_ARRAY, TYPE_BOOL, TYPE_DELTATIME,
- TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT16, TYPE_INT32, TYPE_INT64,
- TYPE_INT8, TYPE_LIST, TYPE_LSTR, TYPE_MAP, TYPE_OBJECT,
- TYPE_REF, TYPE_SSTR, TYPE_UINT16, TYPE_UINT32, TYPE_UINT64,
- TYPE_UINT8, TYPE_UUID)
-from qmfengine import (O_EQ, O_NE, O_LT, O_LE, O_GT, O_GE, O_RE_MATCH, O_RE_NOMATCH,
- E_NOT, E_AND, E_OR, E_XOR)
-from qmfengine import (SEV_EMERG, SEV_ALERT, SEV_CRIT, SEV_ERROR, SEV_WARN, SEV_NOTICE,
- SEV_INFORM, SEV_DEBUG)
-
-
-def qmf_to_native(val):
- typecode = val.getType()
- if typecode == TYPE_UINT8: return val.asUint()
- elif typecode == TYPE_UINT16: return val.asUint()
- elif typecode == TYPE_UINT32: return val.asUint()
- elif typecode == TYPE_UINT64: return val.asUint64()
- elif typecode == TYPE_SSTR: return val.asString()
- elif typecode == TYPE_LSTR: return val.asString()
- elif typecode == TYPE_ABSTIME: return val.asInt64()
- elif typecode == TYPE_DELTATIME: return val.asUint64()
- elif typecode == TYPE_REF: return ObjectId(val.asObjectId())
- elif typecode == TYPE_BOOL: return val.asBool()
- elif typecode == TYPE_FLOAT: return val.asFloat()
- elif typecode == TYPE_DOUBLE: return val.asDouble()
- elif typecode == TYPE_UUID: return val.asUuid()
- elif typecode == TYPE_INT8: return val.asInt()
- elif typecode == TYPE_INT16: return val.asInt()
- elif typecode == TYPE_INT32: return val.asInt()
- elif typecode == TYPE_INT64: return val.asInt64()
- elif typecode == TYPE_MAP: return value_to_dict(val)
- elif typecode == TYPE_LIST: return value_to_list(val)
- else:
- # when TYPE_OBJECT
- logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
- return None
-
-
-def native_to_qmf(target, value):
- val = None
- typecode = None
- if target.__class__ == qmfengine.Value:
- val = target
- typecode = val.getType()
- else:
- typecode = target
- val = qmfengine.Value(typecode)
-
- if typecode == TYPE_UINT8: val.setUint(value)
- elif typecode == TYPE_UINT16: val.setUint(value)
- elif typecode == TYPE_UINT32: val.setUint(value)
- elif typecode == TYPE_UINT64: val.setUint64(value)
- elif typecode == TYPE_SSTR:
- if value: val.setString(value)
- else: val.setString('')
- elif typecode == TYPE_LSTR:
- if value: val.setString(value)
- else: val.setString('')
- elif typecode == TYPE_ABSTIME: val.setInt64(value)
- elif typecode == TYPE_DELTATIME: val.setUint64(value)
- elif typecode == TYPE_REF: val.setObjectId(value.impl)
- elif typecode == TYPE_BOOL: val.setBool(value)
- elif typecode == TYPE_FLOAT: val.setFloat(value)
- elif typecode == TYPE_DOUBLE: val.setDouble(value)
- elif typecode == TYPE_UUID: val.setUuid(value)
- elif typecode == TYPE_INT8: val.setInt(value)
- elif typecode == TYPE_INT16: val.setInt(value)
- elif typecode == TYPE_INT32: val.setInt(value)
- elif typecode == TYPE_INT64: val.setInt64(value)
- elif typecode == TYPE_MAP: dict_to_value(val, value)
- elif typecode == TYPE_LIST: list_to_value(val, value)
- else:
- # when TYPE_OBJECT
- logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
- return None
- return val
-
-
-def pick_qmf_type(value):
- if value.__class__ == int:
- if value >= 0:
- if value < 0x100000000: return TYPE_UINT32
- return TYPE_UINT64
- else:
- if value > -0xffffffff: return TYPE_INT32
- return TYPE_INT64
-
- if value.__class__ == long:
- if value >= 0: return TYPE_UINT64
- return TYPE_INT64
-
- if value.__class__ == str:
- if len(value) < 256: return TYPE_SSTR
- return TYPE_LSTR
-
- if value.__class__ == float: return TYPE_DOUBLE
- if value.__class__ == bool: return TYPE_BOOL
- if value == None: return TYPE_BOOL
- if value.__class__ == dict: return TYPE_MAP
- if value.__class__ == list: return TYPE_LIST
-
- raise "QMF type not known for native type %s" % value.__class__
-
-
-def value_to_dict(val):
- if not val.isMap(): raise "value_to_dict must be given a map value"
- mymap = {}
- for i in range(val.keyCount()):
- key = val.key(i)
- mymap[key] = qmf_to_native(val.byKey(key))
- return mymap
-
-
-def dict_to_value(val, mymap):
- for key, value in mymap.items():
- if key.__class__ != str: raise "QMF map key must be a string"
- typecode = pick_qmf_type(value)
- val.insert(key, native_to_qmf(typecode, value))
-
-
-def value_to_list(val):
- mylist = []
- if val.isList():
- for i in range(val.listItemCount()):
- mylist.append(qmf_to_native(val.listItem(i)))
- return mylist
- #if val.isArray():
- # for i in range(val.arrayItemCount()):
- # mylist.append(qmf_to_native(val.arrayItem(i)))
- # return mylist
-
- raise "value_to_list must be given a list value"
-
-
-def list_to_value(val, mylist):
- for item in mylist:
- typecode = pick_qmf_type(item)
- val.appendToList(native_to_qmf(typecode, item))
-
-
- ##==============================================================================
- ## CONNECTION
- ##==============================================================================
-
-class ConnectionSettings(object):
- #attr_reader :impl
- def __init__(self, url=None):
- if url:
- self.impl = qmfengine.ConnectionSettings(url)
- else:
- self.impl = qmfengine.ConnectionSettings()
-
-
- def set_attr(self, key, val):
- if type(val) == str:
- _v = qmfengine.Value(TYPE_LSTR)
- _v.setString(val)
- elif type(val) == int:
- _v = qmfengine.Value(TYPE_UINT32)
- _v.setUint(val)
- elif type(val) == bool:
- _v = qmfengine.Value(TYPE_BOOL)
- _v.setBool(val)
- else:
- raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val)))
-
- good = self.impl.setAttr(key, _v)
- if not good:
- raise Exception("Argument error: unsupported attribute '%s'" % key )
-
-
- def get_attr(self, key):
- _v = self.impl.getAttr(key)
- if _v.isString():
- return _v.asString()
- elif _v.isUint():
- return _v.asUint()
- elif _v.isBool():
- return _v.asBool()
- else:
- raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType())))
-
-
- def __getattr__(self, name):
- return self.get_attr(name)
-
-
- def __setattr__(self, name, value):
- if name == "impl":
- return super.__setattr__(self, name, value)
- return self.set_attr(name, value)
-
-
-
-class ConnectionHandler:
- def conn_event_connected(self): None
- def conn_event_disconnected(self, error): None
- def conn_event_visit(self): None
- def sess_event_session_closed(self, context, error): None
- def sess_event_recv(self, context, message): None
-
-
-
-class Connection(Thread):
- def __init__(self, settings):
- Thread.__init__(self)
- self._lock = RLock()
- self.impl = qmfengine.ResilientConnection(settings.impl)
- self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- self.impl.setNotifyFd(self._sockEngine.fileno())
- self._new_conn_handlers = []
- self._conn_handlers_to_delete = []
- self._conn_handlers = []
- self._connected = False
- self._operational = True
- self.start()
-
-
- def destroy(self, timeout=None):
- logging.debug("Destroying Connection...")
- self._operational = False
- self.kick()
- self.join(timeout)
- logging.debug("... Conn Destroyed!" )
- if self.isAlive():
- logging.error("Error: Connection thread '%s' is hung..." % self.getName())
-
-
- def connected(self):
- return self._connected
-
-
- def kick(self):
- self.impl.notify()
-
-
- def add_conn_handler(self, handler):
- self._lock.acquire()
- try:
- self._new_conn_handlers.append(handler)
- finally:
- self._lock.release()
- self.kick()
-
-
- def del_conn_handler(self, handler):
- self._lock.acquire()
- try:
- self._conn_handlers_to_delete.append(handler)
- finally:
- self._lock.release()
- self.kick()
-
-
- def run(self):
- eventImpl = qmfengine.ResilientConnectionEvent()
- new_handlers = []
- del_handlers = []
- bt_count = 0
-
- while self._operational:
- logging.debug("Connection thread waiting for socket data...")
- self._sock.recv(1)
-
- self._lock.acquire()
- try:
- new_handlers = self._new_conn_handlers
- del_handlers = self._conn_handlers_to_delete
- self._new_conn_handlers = []
- self._conn_handlers_to_delete = []
- finally:
- self._lock.release()
-
- for nh in new_handlers:
- self._conn_handlers.append(nh)
- if self._connected:
- nh.conn_event_connected()
- new_handlers = []
-
- for dh in del_handlers:
- if dh in self._conn_handlers:
- self._conn_handlers.remove(dh)
- del_handlers = []
-
- valid = self.impl.getEvent(eventImpl)
- while valid:
- try:
- if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED:
- logging.debug("Connection thread: CONNECTED event received.")
- self._connected = True
- for h in self._conn_handlers:
- h.conn_event_connected()
-
- elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED:
- logging.debug("Connection thread: DISCONNECTED event received.")
- self._connected = False
- for h in self._conn_handlers:
- h.conn_event_disconnected(eventImpl.errorText)
-
- elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED:
- logging.debug("Connection thread: SESSION_CLOSED event received.")
- eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
-
- elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV:
- logging.debug("Connection thread: RECV event received.")
- eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
- else:
- logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind))
-
- except:
- import traceback
- logging.error( "Exception occurred during Connection event processing:" )
- logging.error( str(sys.exc_info()) )
- if bt_count < 2:
- traceback.print_exc()
- traceback.print_stack()
- bt_count += 1
-
- self.impl.popEvent()
- valid = self.impl.getEvent(eventImpl)
-
- for h in self._conn_handlers:
- h.conn_event_visit()
-
- logging.debug("Shutting down Connection thread")
-
-
-
-class Session:
- def __init__(self, conn, label, handler):
- self._conn = conn
- self._label = label
- self.handler = handler
- self.handle = qmfengine.SessionHandle()
- result = self._conn.impl.createSession(label, self, self.handle)
-
-
- def destroy(self):
- self._conn.impl.destroySession(self.handle)
-
-
-
- ##==============================================================================
- ## OBJECTS and EVENTS
- ##==============================================================================
-
-class QmfEvent(object):
- # attr_reader :impl, :event_class
- def __init__(self, cls, kwargs={}):
- self._allow_sets = True
- if kwargs.has_key("broker"):
- self._broker = kwargs["broker"]
- else:
- self._broker = None
- if cls:
- self.event_class = cls
- self.impl = qmfengine.Event(self.event_class.impl)
- elif kwargs.has_key("impl"):
- self.impl = qmfengine.Event(kwargs["impl"])
- self.event_class = SchemaEventClass(None, None, 0,
- {"impl":self.impl.getClass()})
- else:
- raise Exception("Argument error: required parameter ('impl') not supplied")
-
-
- def arguments(self):
- list = []
- for arg in self.event_class.arguments:
- list.append([arg, self.get_attr(arg.name())])
- return list
-
-
- def get_attr(self, name):
- val = self._value(name)
- return qmf_to_native(val)
-
-
- def set_attr(self, name, v):
- val = self._value(name)
- native_to_qmf(val, v)
-
-
- def __getitem__(self, name):
- return self.get_attr(name)
-
-
- def __setitem__(self, name, value):
- self.set_attr(name, value)
-
-
- def __setattr__(self, name, value):
- #
- # Ignore the internal attributes, set them normally...
- #
- if (name[0] == '_' or
- name == 'impl' or
- name == 'event_class'):
- return super.__setattr__(self, name, value)
-
- if not self._allow_sets:
- raise Exception("'Set' operations not permitted on this object")
-
- #
- # If the name matches an argument name, set the value of the argument.
- #
- # print "set name=%s" % str(name)
- for arg in self.event_class.arguments:
- if arg.name() == name:
- return self.set_attr(name, value)
-
- # unrecognized name? should I raise an exception?
- super.__setattr__(self, name, value)
-
-
- def __getattr__(self, name, *args):
- #
- # If the name matches an argument name, return the value of the argument.
- #
- for arg in self.event_class.arguments:
- if arg.name() == name:
- return self.get_attr(name)
-
- #
- # This name means nothing to us, pass it up the line to the parent
- # class's handler.
- #
- # print "__getattr__=%s" % str(name)
- super.__getattr__(self, name)
-
-
- def _value(self, name):
- val = self.impl.getValue(name)
- if not val:
- raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" %
- (name,
- self.event_class.impl.getClassKey().getPackageName(),
- self.event_class.impl.getClassKey().getClassName()))
- return val
-
-
-class QmfObject(object):
- # attr_reader :impl, :object_class
- def __init__(self, cls, kwargs={}):
- self._cv = Condition()
- self._sync_count = 0
- self._sync_result = None
- self._allow_sets = False
- if kwargs.has_key("broker"):
- self._broker = kwargs["broker"]
- else:
- self._broker = None
- if cls:
- self.object_class = cls
- self.impl = qmfengine.Object(self.object_class.impl)
- elif kwargs.has_key("impl"):
- self.impl = qmfengine.Object(kwargs["impl"])
- self.object_class = SchemaObjectClass(None,
- None,
- {"impl":self.impl.getClass()})
- else:
- raise Exception("Argument error: required parameter ('impl') not supplied")
-
-
- def destroy(self):
- self.impl.destroy()
-
-
- def object_id(self):
- return ObjectId(self.impl.getObjectId())
-
-
- def set_object_id(self, oid):
- self.impl.setObjectId(oid.impl)
-
-
- def properties(self):
- list = []
- for prop in self.object_class.properties:
- list.append([prop, self.get_attr(prop.name())])
- return list
-
-
- def statistics(self):
- list = []
- for stat in self.object_class.statistics:
- list.append([stat, self.get_attr(stat.name())])
- return list
-
-
- def get_attr(self, name):
- val = self._value(name)
- return qmf_to_native(val)
-
-
- def set_attr(self, name, v):
- val = self._value(name)
- native_to_qmf(val, v)
-
-
- def __getitem__(self, name):
- return self.get_attr(name)
-
-
- def __setitem__(self, name, value):
- self.set_attr(name, value)
-
-
- def inc_attr(self, name, by=1):
- self.set_attr(name, self.get_attr(name) + by)
-
-
- def dec_attr(self, name, by=1):
- self.set_attr(name, self.get_attr(name) - by)
-
-
- def __setattr__(self, name, value):
- #
- # Ignore the internal attributes, set them normally...
- #
- if (name[0] == '_' or
- name == 'impl' or
- name == 'object_class'):
- return super.__setattr__(self, name, value)
-
- if not self._allow_sets:
- raise Exception("'Set' operations not permitted on this object")
- #
- # If the name matches a property name, set the value of the property.
- #
- # print "set name=%s" % str(name)
- for prop in self.object_class.properties:
- if prop.name() == name:
- return self.set_attr(name, value)
- #
- # otherwise, check for a statistic set...
- #
- for stat in self.object_class.statistics:
- if stat.name() == name:
- return self.set_attr(name, value)
-
- # unrecognized name? should I raise an exception?
- super.__setattr__(self, name, value)
-
-
- def __getattr__(self, name, *args):
- #
- # If the name matches a property name, return the value of the property.
- #
- for prop in self.object_class.properties:
- if prop.name() == name:
- return self.get_attr(name)
- #
- # Do the same for statistics
- #
- for stat in self.object_class.statistics:
- if stat.name() == name:
- return self.get_attr(name)
- #
- # If we still haven't found a match for the name, check to see if
- # it matches a method name. If so, marshall up the arguments into
- # a map, and invoke the method.
- #
- for method in self.object_class.methods:
- if method.name() == name:
- argMap = self._marshall(method, args)
- return lambda name, argMap : self._invokeMethod(name, argMap)
-
- #
- # This name means nothing to us, pass it up the line to the parent
- # class's handler.
- #
- # print "__getattr__=%s" % str(name)
- super.__getattr__(self, name)
-
-
- def _invokeMethod(self, name, argMap):
- """
- Private: Helper function that invokes an object's method, and waits for the result.
- """
- self._cv.acquire()
- try:
- timeout = 30
- self._sync_count = 1
- self.impl.invokeMethod(name, argMap, self)
- if self._broker:
- self._broker.conn.kick()
- self._cv.wait(timeout)
- if self._sync_count == 1:
- raise Exception("Timed out: waiting for response to method call.")
- finally:
- self._cv.release()
-
- return self._sync_result
-
-
- def _method_result(self, result):
- """
- Called to return the result of a method call on an object
- """
- self._cv.acquire();
- try:
- self._sync_result = result
- self._sync_count -= 1
- self._cv.notify()
- finally:
- self._cv.release()
-
-
- def _marshall(schema, args):
- '''
- Private: Convert a list of arguments (positional) into a Value object of type "map".
- Used to create the argument parameter for an object's method invokation.
- '''
- # Build a map of the method's arguments
- map = qmfengine.Value(TYPE_MAP)
- for arg in schema.arguments:
- if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
- map.insert(arg.name, qmfengine.Value(arg.typecode))
-
- # install each argument's value into the map
- marshalled = Arguments(map)
- idx = 0
- for arg in schema.arguments:
- if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
- if args[idx]:
- marshalled[arg.name] = args[idx]
- idx += 1
-
- return marshalled.map
-
-
- def _value(self, name):
- val = self.impl.getValue(name)
- if not val:
- raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" %
- (name,
- self.object_class.impl.getClassKey().getPackageName(),
- self.object_class.impl.getClassKey().getClassName()))
- return val
-
-
-
-class AgentObject(QmfObject):
- def __init__(self, cls, kwargs={}):
- QmfObject.__init__(self, cls, kwargs)
- self._allow_sets = True
-
-
- def destroy(self):
- self.impl.destroy()
-
-
- def set_object_id(self, oid):
- self.impl.setObjectId(oid.impl)
-
-
-
-class ConsoleObject(QmfObject):
- # attr_reader :current_time, :create_time, :delete_time
- def __init__(self, cls, kwargs={}):
- QmfObject.__init__(self, cls, kwargs)
-
-
- def update(self):
- if not self._broker:
- raise Exception("No linkage to broker")
- newer = self._broker.console.objects(Query({"object_id":object_id}))
- if newer.size != 1:
- raise Exception("Expected exactly one update for this object, %d present" % int(newer.size))
- self.merge_update(newer[0])
-
-
- def merge_update(self, newObject):
- self.impl.merge(new_object.impl)
-
-
- def is_deleted(self):
- return self.impl.isDeleted()
-
-
- def key(self): pass
-
-
-
-class ObjectId:
- def __init__(self, impl=None):
- if impl:
- self.impl = impl
- else:
- self.impl = qmfengine.ObjectId()
- self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank())
-
-
- def object_num_high(self):
- return self.impl.getObjectNumHi()
-
-
- def object_num_low(self):
- return self.impl.getObjectNumLo()
-
-
- def agent_key(self):
- self.agent_key
-
- def __eq__(self, other):
- if not isinstance(other, self.__class__): return False
- return self.impl == other.impl
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __repr__(self):
- return self.impl.str()
-
-
-
-class Arguments(object):
- def __init__(self, map):
- self.map = map
- self._by_hash = {}
- key_count = self.map.keyCount()
- a = 0
- while a < key_count:
- key = self.map.key(a)
- self._by_hash[key] = qmf_to_native(self.map.byKey(key))
- a += 1
-
-
- def __getitem__(self, key):
- return self._by_hash[key]
-
-
- def __setitem__(self, key, value):
- self._by_hash[key] = value
- self.set(key, value)
-
-
- def __iter__(self):
- return self._by_hash.__iter__
-
-
- def __getattr__(self, name):
- if name in self._by_hash:
- return self._by_hash[name]
- return super.__getattr__(self, name)
-
-
- def __setattr__(self, name, value):
- #
- # ignore local data members
- #
- if (name[0] == '_' or
- name == 'map'):
- return super.__setattr__(self, name, value)
-
- if name in self._by_hash:
- self._by_hash[name] = value
- return self.set(name, value)
-
- return super.__setattr__(self, name, value)
-
-
- def set(self, key, value):
- val = self.map.byKey(key)
- native_to_qmf(val, value)
-
-
-
-class MethodResponse(object):
- def __init__(self, impl):
- self.impl = qmfengine.MethodResponse(impl)
-
-
- def status(self):
- return self.impl.getStatus()
-
-
- def exception(self):
- return self.impl.getException()
-
-
- def text(self):
- return exception().asString()
-
-
- def args(self):
- return Arguments(self.impl.getArgs())
-
-
- def __getattr__(self, name):
- myArgs = self.args()
- return myArgs.__getattr__(name)
-
-
- def __setattr__(self, name, value):
- if name == 'impl':
- return super.__setattr__(self, name, value)
-
- myArgs = self.args()
- return myArgs.__setattr__(name, value)
-
-
-
- ##==============================================================================
- ## QUERY
- ##==============================================================================
-
-
-class Query:
- def __init__(self, kwargs={}):
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- else:
- package = ''
- if "key" in kwargs:
- # construct using SchemaClassKey:
- self.impl = qmfengine.Query(kwargs["key"])
- elif "object_id" in kwargs:
- self.impl = qmfengine.Query(kwargs["object_id"].impl)
- else:
- if "package" in kwargs:
- package = kwargs["package"]
- if "class" in kwargs:
- self.impl = qmfengine.Query(kwargs["class"], package)
- else:
- raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']")
-
-
- def package_name(self): return self.impl.getPackage()
- def class_name(self): return self.impl.getClass()
- def object_id(self):
- _objid = self.impl.getObjectId()
- if _objid:
- return ObjectId(_objid)
- else:
- return None
-
-
- ##==============================================================================
- ## SCHEMA
- ##==============================================================================
-
-
-
-class SchemaArgument:
- #attr_reader :impl
- def __init__(self, name, typecode, kwargs={}):
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- else:
- self.impl = qmfengine.SchemaArgument(name, typecode)
- if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"])
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
-
-
- def name(self):
- return self.impl.getName()
-
-
- def direction(self):
- return self.impl.getDirection()
-
-
- def typecode(self):
- return self.impl.getType()
-
-
- def __repr__(self):
- return self.name()
-
-
-
-class SchemaMethod:
- # attr_reader :impl, arguments
- def __init__(self, name, kwargs={}):
- self.arguments = []
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- for i in range(self.impl.getArgumentCount()):
- self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)}))
- else:
- self.impl = qmfengine.SchemaMethod(name)
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
-
-
- def add_argument(self, arg):
- self.arguments.append(arg)
- self.impl.addArgument(arg.impl)
-
- def name(self):
- return self.impl.getName()
-
- def __repr__(self):
- return self.name()
-
-
-
-class SchemaProperty:
- #attr_reader :impl
- def __init__(self, name, typecode, kwargs={}):
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- else:
- self.impl = qmfengine.SchemaProperty(name, typecode)
- if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"])
- if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"])
- if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"])
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
-
-
- def name(self):
- return self.impl.getName()
-
- def __repr__(self):
- return self.name()
-
-
-
-class SchemaStatistic:
- # attr_reader :impl
- def __init__(self, name, typecode, kwargs={}):
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- else:
- self.impl = qmfengine.SchemaStatistic(name, typecode)
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
-
-
- def name(self):
- return self.impl.getName()
-
- def __repr__(self):
- return self.name()
-
-
-
-class SchemaClassKey:
- #attr_reader :impl
- def __init__(self, i):
- self.impl = i
-
-
- def package_name(self):
- return self.impl.getPackageName()
-
-
- def class_name(self):
- return self.impl.getClassName()
-
- def __repr__(self):
- return self.impl.asString()
-
-
-
-class SchemaObjectClass:
- # attr_reader :impl, :properties, :statistics, :methods
- def __init__(self, package, name, kwargs={}):
- self.properties = []
- self.statistics = []
- self.methods = []
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
-
- for i in range(self.impl.getPropertyCount()):
- self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)}))
-
- for i in range(self.impl.getStatisticCount()):
- self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)}))
-
- for i in range(self.impl.getMethodCount()):
- self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)}))
- else:
- self.impl = qmfengine.SchemaObjectClass(package, name)
-
-
- def add_property(self, prop):
- self.properties.append(prop)
- self.impl.addProperty(prop.impl)
-
-
- def add_statistic(self, stat):
- self.statistics.append(stat)
- self.impl.addStatistic(stat.impl)
-
-
- def add_method(self, meth):
- self.methods.append(meth)
- self.impl.addMethod(meth.impl)
-
-
- def class_key(self):
- return SchemaClassKey(self.impl.getClassKey())
-
-
- def package_name(self):
- return self.impl.getClassKey().getPackageName()
-
-
- def class_name(self):
- return self.impl.getClassKey().getClassName()
-
-
-
-class SchemaEventClass:
- # attr_reader :impl :arguments
- def __init__(self, package, name, sev, kwargs={}):
- self.arguments = []
- if "impl" in kwargs:
- self.impl = kwargs["impl"]
- for i in range(self.impl.getArgumentCount()):
- self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)}))
- else:
- self.impl = qmfengine.SchemaEventClass(package, name, sev)
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
-
-
- def add_argument(self, arg):
- self.arguments.append(arg)
- self.impl.addArgument(arg.impl)
-
-
- def name(self):
- return self.impl.getClassKey().getClassName()
-
- def class_key(self):
- return SchemaClassKey(self.impl.getClassKey())
-
-
- def package_name(self):
- return self.impl.getClassKey().getPackageName()
-
-
- def class_name(self):
- return self.impl.getClassKey().getClassName()
-
-
- ##==============================================================================
- ## CONSOLE
- ##==============================================================================
-
-
-
-class ConsoleHandler:
- def agent_added(self, agent): pass
- def agent_deleted(self, agent): pass
- def new_package(self, package): pass
- def new_class(self, class_key): pass
- def object_update(self, obj, hasProps, hasStats): pass
- def event_received(self, event): pass
- def agent_heartbeat(self, agent, timestamp): pass
- def method_response(self, resp): pass
- def broker_info(self, broker): pass
-
-
-
-class Console(Thread):
- # attr_reader :impl
- def __init__(self, handler=None, kwargs={}):
- Thread.__init__(self)
- self._handler = handler
- self.impl = qmfengine.Console()
- self._event = qmfengine.ConsoleEvent()
- self._broker_list = []
- self._cv = Condition()
- self._sync_count = 0
- self._sync_result = None
- self._select = {}
- self._cb_cond = Condition()
- self._operational = True
- self.start()
-
-
- def destroy(self, timeout=None):
- logging.debug("Destroying Console...")
- self._operational = False
- self.start_console_events() # wake thread up
- self.join(timeout)
- logging.debug("... Console Destroyed!")
- if self.isAlive():
- logging.error( "Console thread '%s' is hung..." % self.getName() )
-
-
- def add_connection(self, conn):
- broker = Broker(self, conn)
- self._cv.acquire()
- try:
- self._broker_list.append(broker)
- finally:
- self._cv.release()
- return broker
-
-
- def del_connection(self, broker):
- logging.debug("shutting down broker...")
- broker.shutdown()
- logging.debug("...broker down.")
- self._cv.acquire()
- try:
- self._broker_list.remove(broker)
- finally:
- self._cv.release()
- logging.debug("del_connection() finished")
-
-
- def packages(self):
- plist = []
- for i in range(self.impl.packageCount()):
- plist.append(self.impl.getPackageName(i))
- return plist
-
-
- def classes(self, package, kind=CLASS_OBJECT):
- clist = []
- for i in range(self.impl.classCount(package)):
- key = self.impl.getClass(package, i)
- class_kind = self.impl.getClassKind(key)
- if class_kind == kind:
- if kind == CLASS_OBJECT:
- clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
- elif kind == CLASS_EVENT:
- clist.append(SchemaEventClass(None, None, 0, {"impl":self.impl.getEventClass(key)}))
- return clist
-
-
- def bind_package(self, package):
- return self.impl.bindPackage(package)
-
-
- def bind_class(self, kwargs = {}):
- if "key" in kwargs:
- self.impl.bindClass(kwargs["key"])
- elif "package" in kwargs:
- package = kwargs["package"]
- if "class" in kwargs:
- self.impl.bindClass(package, kwargs["class"])
- else:
- self.impl.bindClass(package, "*")
- else:
- raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
-
-
- def bind_event(self, kwargs = {}):
- if "key" in kwargs:
- self.impl.bindEvent(kwargs["key"])
- elif "package" in kwargs:
- package = kwargs["package"]
- if "event" in kwargs:
- self.impl.bindEvent(package, kwargs["event"])
- else:
- self.impl.bindEvent(package, "*")
- else:
- raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'event']")
-
-
- def agents(self, broker=None):
- blist = []
- if broker:
- blist.append(broker)
- else:
- self._cv.acquire()
- try:
- # copy while holding lock
- blist = self._broker_list[:]
- finally:
- self._cv.release()
-
- agents = []
- for b in blist:
- for idx in range(b.impl.agentCount()):
- agents.append(AgentProxy(b.impl.getAgent(idx), b))
-
- return agents
-
-
- def objects(self, query, kwargs = {}):
- timeout = 30
- agent = None
- temp_args = kwargs.copy()
- if type(query) == type({}):
- temp_args.update(query)
-
- if "_timeout" in temp_args:
- timeout = temp_args["_timeout"]
- temp_args.pop("_timeout")
-
- if "_agent" in temp_args:
- agent = temp_args["_agent"]
- temp_args.pop("_agent")
-
- if type(query) == type({}):
- query = Query(temp_args)
-
- self._select = {}
- for k in temp_args.iterkeys():
- if type(k) == str:
- self._select[k] = temp_args[k]
-
- self._cv.acquire()
- try:
- self._sync_count = 1
- self._sync_result = []
- broker = self._broker_list[0]
- broker.send_query(query.impl, None, agent)
- self._cv.wait(timeout)
- if self._sync_count == 1:
- raise Exception("Timed out: waiting for query response")
- finally:
- self._cv.release()
-
- return self._sync_result
-
-
- def object(self, query, kwargs = {}):
- '''
- Return one and only one object or None.
- '''
- objs = objects(query, kwargs)
- if len(objs) == 1:
- return objs[0]
- else:
- return None
-
-
- def first_object(self, query, kwargs = {}):
- '''
- Return the first of potentially many objects.
- '''
- objs = objects(query, kwargs)
- if objs:
- return objs[0]
- else:
- return None
-
-
- # Check the object against select to check for a match
- def _select_match(self, object):
- schema_props = object.properties()
- for key in self._select.iterkeys():
- for prop in schema_props:
- if key == p[0].name() and self._select[key] != p[1]:
- return False
- return True
-
-
- def _get_result(self, list, context):
- '''
- Called by Broker proxy to return the result of a query.
- '''
- self._cv.acquire()
- try:
- for item in list:
- if self._select_match(item):
- self._sync_result.append(item)
- self._sync_count -= 1
- self._cv.notify()
- finally:
- self._cv.release()
-
-
- def start_sync(self, query): pass
-
-
- def touch_sync(self, sync): pass
-
-
- def end_sync(self, sync): pass
-
-
- def run(self):
- while self._operational:
- self._cb_cond.acquire()
- try:
- self._cb_cond.wait(1)
- while self._do_console_events():
- pass
- finally:
- self._cb_cond.release()
- logging.debug("Shutting down Console thread")
-
-
- def start_console_events(self):
- self._cb_cond.acquire()
- try:
- self._cb_cond.notify()
- finally:
- self._cb_cond.release()
-
-
- def _do_console_events(self):
- '''
- Called by the Console thread to poll for events. Passes the events
- onto the ConsoleHandler associated with this Console. Is called
- periodically, but can also be kicked by Console.start_console_events().
- '''
- count = 0
- valid = self.impl.getEvent(self._event)
- while valid:
- count += 1
- try:
- if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
- logging.debug("Console Event AGENT_ADDED received")
- if self._handler:
- self._handler.agent_added(AgentProxy(self._event.agent, None))
- elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
- logging.debug("Console Event AGENT_DELETED received")
- if self._handler:
- self._handler.agent_deleted(AgentProxy(self._event.agent, None))
- elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
- logging.debug("Console Event NEW_PACKAGE received")
- if self._handler:
- self._handler.new_package(self._event.name)
- elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
- logging.debug("Console Event NEW_CLASS received")
- if self._handler:
- self._handler.new_class(SchemaClassKey(self._event.classKey))
- elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
- logging.debug("Console Event OBJECT_UPDATE received")
- if self._handler:
- self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
- self._event.hasProps, self._event.hasStats)
- elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
- logging.debug("Console Event EVENT_RECEIVED received")
- elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
- logging.debug("Console Event AGENT_HEARTBEAT received")
- if self._handler:
- self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
- elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
- logging.debug("Console Event METHOD_RESPONSE received")
- else:
- logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
- except e:
- print "Exception caught in callback thread:", e
- self.impl.popEvent()
- valid = self.impl.getEvent(self._event)
- return count
-
-
-
-class AgentProxy:
- # attr_reader :broker
- def __init__(self, impl, broker):
- self.impl = impl
- self.broker = broker
- self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank())
-
-
- def label(self):
- return self.impl.getLabel()
-
-
- def key(self):
- return self.key
-
-
-class Broker(ConnectionHandler):
- # attr_reader :impl :conn, :console, :broker_bank
- def __init__(self, console, conn):
- self.broker_bank = 1
- self.console = console
- self.conn = conn
- self._session = None
- self._cv = Condition()
- self._stable = None
- self._event = qmfengine.BrokerEvent()
- self._xmtMessage = qmfengine.Message()
- self.impl = qmfengine.BrokerProxy(self.console.impl)
- self.console.impl.addConnection(self.impl, self)
- self.conn.add_conn_handler(self)
- self._operational = True
-
-
- def shutdown(self):
- logging.debug("broker.shutdown() called.")
- self.console.impl.delConnection(self.impl)
- self.conn.del_conn_handler(self)
- if self._session:
- self.impl.sessionClosed()
- logging.debug("broker.shutdown() sessionClosed done.")
- self._session.destroy()
- logging.debug("broker.shutdown() session destroy done.")
- self._session = None
- self._operational = False
- logging.debug("broker.shutdown() done.")
-
-
- def wait_for_stable(self, timeout = None):
- self._cv.acquire()
- try:
- if self._stable:
- return
- if timeout:
- self._cv.wait(timeout)
- if not self._stable:
- raise Exception("Timed out: waiting for broker connection to become stable")
- else:
- while not self._stable:
- self._cv.wait()
- finally:
- self._cv.release()
-
-
- def send_query(self, query, ctx, agent):
- agent_impl = None
- if agent:
- agent_impl = agent.impl
- self.impl.sendQuery(query, ctx, agent_impl)
- self.conn.kick()
-
-
- def _do_broker_events(self):
- count = 0
- valid = self.impl.getEvent(self._event)
- while valid:
- count += 1
- if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
- logging.debug("Broker Event BROKER_INFO received");
- elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
- logging.debug("Broker Event DECLARE_QUEUE received");
- self.conn.impl.declareQueue(self._session.handle, self._event.name)
- elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
- logging.debug("Broker Event DELETE_QUEUE received");
- self.conn.impl.deleteQueue(self._session.handle, self._event.name)
- elif self._event.kind == qmfengine.BrokerEvent.BIND:
- logging.debug("Broker Event BIND received");
- self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
- elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
- logging.debug("Broker Event UNBIND received");
- self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
- elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
- logging.debug("Broker Event SETUP_COMPLETE received");
- self.impl.startProtocol()
- elif self._event.kind == qmfengine.BrokerEvent.STABLE:
- logging.debug("Broker Event STABLE received");
- self._cv.acquire()
- try:
- self._stable = True
- self._cv.notify()
- finally:
- self._cv.release()
- elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
- result = []
- for idx in range(self._event.queryResponse.getObjectCount()):
- result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
- self.console._get_result(result, self._event.context)
- elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
- obj = self._event.context
- obj._method_result(MethodResponse(self._event.methodResponse()))
-
- self.impl.popEvent()
- valid = self.impl.getEvent(self._event)
-
- return count
-
-
- def _do_broker_messages(self):
- count = 0
- valid = self.impl.getXmtMessage(self._xmtMessage)
- while valid:
- count += 1
- logging.debug("Broker: sending msg on connection")
- self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
- self.impl.popXmt()
- valid = self.impl.getXmtMessage(self._xmtMessage)
-
- return count
-
-
- def _do_events(self):
- while True:
- self.console.start_console_events()
- bcnt = self._do_broker_events()
- mcnt = self._do_broker_messages()
- if bcnt == 0 and mcnt == 0:
- break;
-
-
- def conn_event_connected(self):
- logging.debug("Broker: Connection event CONNECTED")
- self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
- self.impl.sessionOpened(self._session.handle)
- self._do_events()
-
-
- def conn_event_disconnected(self, error):
- logging.debug("Broker: Connection event DISCONNECTED")
- pass
-
-
- def conn_event_visit(self):
- self._do_events()
-
-
- def sess_event_session_closed(self, context, error):
- logging.debug("Broker: Session event CLOSED")
- self.impl.sessionClosed()
-
-
- def sess_event_recv(self, context, message):
- logging.debug("Broker: Session event MSG_RECV")
- if not self._operational:
- logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
- self.impl.handleRcvMessage(message)
- self._do_events()
-
-
-
- ##==============================================================================
- ## AGENT
- ##==============================================================================
-
-
-
-class AgentHandler:
- def get_query(self, context, query, userId): None
- def method_call(self, context, name, object_id, args, userId): None
-
-
-
-class Agent(ConnectionHandler):
- def __init__(self, handler, label=""):
- if label == "":
- self._agentLabel = "rb-%s.%d" % (socket.gethostname(), os.getpid())
- else:
- self._agentLabel = label
- self._conn = None
- self._handler = handler
- self.impl = qmfengine.Agent(self._agentLabel)
- self._event = qmfengine.AgentEvent()
- self._xmtMessage = qmfengine.Message()
-
-
- def set_connection(self, conn):
- self._conn = conn
- self._conn.add_conn_handler(self)
-
-
- def register_class(self, cls):
- self.impl.registerClass(cls.impl)
-
-
- def alloc_object_id(self, low = 0, high = 0):
- return ObjectId(self.impl.allocObjectId(low, high))
-
-
- def raise_event(self, event):
- self.impl.raiseEvent(event.impl)
-
- def query_response(self, context, obj):
- self.impl.queryResponse(context, obj.impl)
-
-
- def query_complete(self, context):
- self.impl.queryComplete(context)
-
-
- def method_response(self, context, status, text, arguments):
- self.impl.methodResponse(context, status, text, arguments.map)
-
-
- def do_agent_events(self):
- count = 0
- valid = self.impl.getEvent(self._event)
- while valid:
- count += 1
- if self._event.kind == qmfengine.AgentEvent.GET_QUERY:
- self._handler.get_query(self._event.sequence,
- Query({"impl":self._event.query}),
- self._event.authUserId)
-
- elif self._event.kind == qmfengine.AgentEvent.START_SYNC:
- pass
- elif self._event.kind == qmfengine.AgentEvent.END_SYNC:
- pass
- elif self._event.kind == qmfengine.AgentEvent.METHOD_CALL:
- args = Arguments(self._event.arguments)
- self._handler.method_call(self._event.sequence, self._event.name,
- ObjectId(self._event.objectId),
- args, self._event.authUserId)
-
- elif self._event.kind == qmfengine.AgentEvent.DECLARE_QUEUE:
- self._conn.impl.declareQueue(self._session.handle, self._event.name)
-
- elif self._event.kind == qmfengine.AgentEvent.DELETE_QUEUE:
- self._conn.impl.deleteQueue(self._session.handle, self._event.name)
-
- elif self._event.kind == qmfengine.AgentEvent.BIND:
- self._conn.impl.bind(self._session.handle, self._event.exchange,
- self._event.name, self._event.bindingKey)
-
- elif self._event.kind == qmfengine.AgentEvent.UNBIND:
- self._conn.impl.unbind(self._session.handle, self._event.exchange,
- self._event.name, self._event.bindingKey)
-
- elif self._event.kind == qmfengine.AgentEvent.SETUP_COMPLETE:
- self.impl.startProtocol()
-
- self.impl.popEvent()
- valid = self.impl.getEvent(self._event)
- return count
-
-
- def do_agent_messages(self):
- count = 0
- valid = self.impl.getXmtMessage(self._xmtMessage)
- while valid:
- count += 1
- self._conn.impl.sendMessage(self._session.handle, self._xmtMessage)
- self.impl.popXmt()
- valid = self.impl.getXmtMessage(self._xmtMessage)
- return count
-
-
- def do_events(self):
- while True:
- ecnt = self.do_agent_events()
- mcnt = self.do_agent_messages()
- if ecnt == 0 and mcnt == 0:
- break
-
-
- def conn_event_connected(self):
- logging.debug("Agent Connection Established...")
- self._session = Session(self._conn,
- "qmfa-%s.%d" % (socket.gethostname(), os.getpid()),
- self)
- self.impl.newSession()
- self.do_events()
-
-
- def conn_event_disconnected(self, error):
- logging.debug("Agent Connection Lost")
- pass
-
-
- def conn_event_visit(self):
- self.do_events()
-
-
- def sess_event_session_closed(self, context, error):
- logging.debug("Agent Session Lost")
- pass
-
-
- def sess_event_recv(self, context, message):
- self.impl.handleRcvMessage(message)
- self.do_events()
-
-