# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys import socket import os import logging from threading import Thread from threading import RLock from threading import Condition import qmfengine from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE) from qmfengine import (CLASS_EVENT, CLASS_OBJECT) from qmfengine import (DIR_IN, DIR_IN_OUT, DIR_OUT) from qmfengine import (TYPE_ABSTIME, TYPE_ARRAY, TYPE_BOOL, TYPE_DELTATIME, TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT16, TYPE_INT32, TYPE_INT64, TYPE_INT8, TYPE_LIST, TYPE_LSTR, TYPE_MAP, TYPE_OBJECT, TYPE_REF, TYPE_SSTR, TYPE_UINT16, TYPE_UINT32, TYPE_UINT64, TYPE_UINT8, TYPE_UUID) from qmfengine import (O_EQ, O_NE, O_LT, O_LE, O_GT, O_GE, O_RE_MATCH, O_RE_NOMATCH, E_NOT, E_AND, E_OR, E_XOR) from qmfengine import (SEV_EMERG, SEV_ALERT, SEV_CRIT, SEV_ERROR, SEV_WARN, SEV_NOTICE, SEV_INFORM, SEV_DEBUG) def qmf_to_native(val): typecode = val.getType() if typecode == TYPE_UINT8: return val.asUint() elif typecode == TYPE_UINT16: return val.asUint() elif typecode == TYPE_UINT32: return val.asUint() elif typecode == TYPE_UINT64: return val.asUint64() elif typecode == TYPE_SSTR: return val.asString() elif typecode == TYPE_LSTR: return val.asString() elif typecode == TYPE_ABSTIME: return val.asInt64() elif typecode == TYPE_DELTATIME: return val.asUint64() elif typecode == TYPE_REF: return ObjectId(val.asObjectId()) elif typecode == TYPE_BOOL: return val.asBool() elif typecode == TYPE_FLOAT: return val.asFloat() elif typecode == TYPE_DOUBLE: return val.asDouble() elif typecode == TYPE_UUID: return val.asUuid() elif typecode == TYPE_INT8: return val.asInt() elif typecode == TYPE_INT16: return val.asInt() elif typecode == TYPE_INT32: return val.asInt() elif typecode == TYPE_INT64: return val.asInt64() elif typecode == TYPE_MAP: return value_to_dict(val) elif typecode == TYPE_LIST: return value_to_list(val) else: # when TYPE_OBJECT logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) return None def native_to_qmf(target, value): val = None typecode = None if target.__class__ == qmfengine.Value: val = target typecode = val.getType() else: typecode = target val = qmfengine.Value(typecode) if typecode == TYPE_UINT8: val.setUint(value) elif typecode == TYPE_UINT16: val.setUint(value) elif typecode == TYPE_UINT32: val.setUint(value) elif typecode == TYPE_UINT64: val.setUint64(value) elif typecode == TYPE_SSTR: if value: val.setString(value) else: val.setString('') elif typecode == TYPE_LSTR: if value: val.setString(value) else: val.setString('') elif typecode == TYPE_ABSTIME: val.setInt64(value) elif typecode == TYPE_DELTATIME: val.setUint64(value) elif typecode == TYPE_REF: val.setObjectId(value.impl) elif typecode == TYPE_BOOL: val.setBool(value) elif typecode == TYPE_FLOAT: val.setFloat(value) elif typecode == TYPE_DOUBLE: val.setDouble(value) elif typecode == TYPE_UUID: val.setUuid(value) elif typecode == TYPE_INT8: val.setInt(value) elif typecode == TYPE_INT16: val.setInt(value) elif typecode == TYPE_INT32: val.setInt(value) elif typecode == TYPE_INT64: val.setInt64(value) elif typecode == TYPE_MAP: dict_to_value(val, value) elif typecode == TYPE_LIST: list_to_value(val, value) else: # when TYPE_OBJECT logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) return None return val def pick_qmf_type(value): if value.__class__ == int: if value >= 0: if value < 0x100000000: return TYPE_UINT32 return TYPE_UINT64 else: if value > -0xffffffff: return TYPE_INT32 return TYPE_INT64 if value.__class__ == long: if value >= 0: return TYPE_UINT64 return TYPE_INT64 if value.__class__ == str: if len(value) < 256: return TYPE_SSTR return TYPE_LSTR if value.__class__ == float: return TYPE_DOUBLE if value.__class__ == bool: return TYPE_BOOL if value == None: return TYPE_BOOL if value.__class__ == dict: return TYPE_MAP if value.__class__ == list: return TYPE_LIST raise "QMF type not known for native type %s" % value.__class__ def value_to_dict(val): if not val.isMap(): raise "value_to_dict must be given a map value" mymap = {} for i in range(val.keyCount()): key = val.key(i) mymap[key] = qmf_to_native(val.byKey(key)) return mymap def dict_to_value(val, mymap): for key, value in mymap.items(): if key.__class__ != str: raise "QMF map key must be a string" typecode = pick_qmf_type(value) val.insert(key, native_to_qmf(typecode, value)) def value_to_list(val): mylist = [] if val.isList(): for i in range(val.listItemCount()): mylist.append(qmf_to_native(val.listItem(i))) return mylist #if val.isArray(): # for i in range(val.arrayItemCount()): # mylist.append(qmf_to_native(val.arrayItem(i))) # return mylist raise "value_to_list must be given a list value" def list_to_value(val, mylist): for item in mylist: typecode = pick_qmf_type(item) val.appendToList(native_to_qmf(typecode, item)) ##============================================================================== ## CONNECTION ##============================================================================== class ConnectionSettings(object): #attr_reader :impl def __init__(self, url=None): if url: self.impl = qmfengine.ConnectionSettings(url) else: self.impl = qmfengine.ConnectionSettings() def set_attr(self, key, val): if type(val) == str: _v = qmfengine.Value(TYPE_LSTR) _v.setString(val) elif type(val) == int: _v = qmfengine.Value(TYPE_UINT32) _v.setUint(val) elif type(val) == bool: _v = qmfengine.Value(TYPE_BOOL) _v.setBool(val) else: raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val))) good = self.impl.setAttr(key, _v) if not good: raise Exception("Argument error: unsupported attribute '%s'" % key ) def get_attr(self, key): _v = self.impl.getAttr(key) if _v.isString(): return _v.asString() elif _v.isUint(): return _v.asUint() elif _v.isBool(): return _v.asBool() else: raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType()))) def __getattr__(self, name): return self.get_attr(name) def __setattr__(self, name, value): if name == "impl": return super.__setattr__(self, name, value) return self.set_attr(name, value) class ConnectionHandler: def conn_event_connected(self): None def conn_event_disconnected(self, error): None def conn_event_visit(self): None def sess_event_session_closed(self, context, error): None def sess_event_recv(self, context, message): None class Connection(Thread): def __init__(self, settings): Thread.__init__(self) self._lock = RLock() self.impl = qmfengine.ResilientConnection(settings.impl) self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) self.impl.setNotifyFd(self._sockEngine.fileno()) self._new_conn_handlers = [] self._conn_handlers_to_delete = [] self._conn_handlers = [] self._connected = False self._operational = True self.start() def destroy(self, timeout=None): logging.debug("Destroying Connection...") self._operational = False self.kick() self.join(timeout) logging.debug("... Conn Destroyed!" ) if self.isAlive(): logging.error("Error: Connection thread '%s' is hung..." % self.getName()) def connected(self): return self._connected def kick(self): self.impl.notify() def add_conn_handler(self, handler): self._lock.acquire() try: self._new_conn_handlers.append(handler) finally: self._lock.release() self.kick() def del_conn_handler(self, handler): self._lock.acquire() try: self._conn_handlers_to_delete.append(handler) finally: self._lock.release() self.kick() def run(self): eventImpl = qmfengine.ResilientConnectionEvent() new_handlers = [] del_handlers = [] bt_count = 0 while self._operational: logging.debug("Connection thread waiting for socket data...") self._sock.recv(1) self._lock.acquire() try: new_handlers = self._new_conn_handlers del_handlers = self._conn_handlers_to_delete self._new_conn_handlers = [] self._conn_handlers_to_delete = [] finally: self._lock.release() for nh in new_handlers: self._conn_handlers.append(nh) if self._connected: nh.conn_event_connected() new_handlers = [] for dh in del_handlers: if dh in self._conn_handlers: self._conn_handlers.remove(dh) del_handlers = [] valid = self.impl.getEvent(eventImpl) while valid: try: if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED: logging.debug("Connection thread: CONNECTED event received.") self._connected = True for h in self._conn_handlers: h.conn_event_connected() elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED: logging.debug("Connection thread: DISCONNECTED event received.") self._connected = False for h in self._conn_handlers: h.conn_event_disconnected(eventImpl.errorText) elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED: logging.debug("Connection thread: SESSION_CLOSED event received.") eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV: logging.debug("Connection thread: RECV event received.") eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) else: logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind)) except: import traceback logging.error( "Exception occurred during Connection event processing:" ) logging.error( str(sys.exc_info()) ) if bt_count < 2: traceback.print_exc() traceback.print_stack() bt_count += 1 self.impl.popEvent() valid = self.impl.getEvent(eventImpl) for h in self._conn_handlers: h.conn_event_visit() logging.debug("Shutting down Connection thread") class Session: def __init__(self, conn, label, handler): self._conn = conn self._label = label self.handler = handler self.handle = qmfengine.SessionHandle() result = self._conn.impl.createSession(label, self, self.handle) def destroy(self): self._conn.impl.destroySession(self.handle) ##============================================================================== ## OBJECTS and EVENTS ##============================================================================== class QmfEvent(object): # attr_reader :impl, :event_class def __init__(self, cls, kwargs={}): self._allow_sets = True if kwargs.has_key("broker"): self._broker = kwargs["broker"] else: self._broker = None if cls: self.event_class = cls self.impl = qmfengine.Event(self.event_class.impl) elif kwargs.has_key("impl"): self.impl = qmfengine.Event(kwargs["impl"]) self.event_class = SchemaEventClass(None, None, 0, {"impl":self.impl.getClass()}) else: raise Exception("Argument error: required parameter ('impl') not supplied") def arguments(self): list = [] for arg in self.event_class.arguments: list.append([arg, self.get_attr(arg.name())]) return list def get_attr(self, name): val = self._value(name) return qmf_to_native(val) def set_attr(self, name, v): val = self._value(name) native_to_qmf(val, v) def __getitem__(self, name): return self.get_attr(name) def __setitem__(self, name, value): self.set_attr(name, value) def __setattr__(self, name, value): # # Ignore the internal attributes, set them normally... # if (name[0] == '_' or name == 'impl' or name == 'event_class'): return super.__setattr__(self, name, value) if not self._allow_sets: raise Exception("'Set' operations not permitted on this object") # # If the name matches an argument name, set the value of the argument. # # print "set name=%s" % str(name) for arg in self.event_class.arguments: if arg.name() == name: return self.set_attr(name, value) # unrecognized name? should I raise an exception? super.__setattr__(self, name, value) def __getattr__(self, name, *args): # # If the name matches an argument name, return the value of the argument. # for arg in self.event_class.arguments: if arg.name() == name: return self.get_attr(name) # # This name means nothing to us, pass it up the line to the parent # class's handler. # # print "__getattr__=%s" % str(name) super.__getattr__(self, name) def _value(self, name): val = self.impl.getValue(name) if not val: raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % (name, self.event_class.impl.getClassKey().getPackageName(), self.event_class.impl.getClassKey().getClassName())) return val class QmfObject(object): # attr_reader :impl, :object_class def __init__(self, cls, kwargs={}): self._cv = Condition() self._sync_count = 0 self._sync_result = None self._allow_sets = False if kwargs.has_key("broker"): self._broker = kwargs["broker"] else: self._broker = None if cls: self.object_class = cls self.impl = qmfengine.Object(self.object_class.impl) elif kwargs.has_key("impl"): self.impl = qmfengine.Object(kwargs["impl"]) self.object_class = SchemaObjectClass(None, None, {"impl":self.impl.getClass()}) else: raise Exception("Argument error: required parameter ('impl') not supplied") def destroy(self): self.impl.destroy() def object_id(self): return ObjectId(self.impl.getObjectId()) def set_object_id(self, oid): self.impl.setObjectId(oid.impl) def properties(self): list = [] for prop in self.object_class.properties: list.append([prop, self.get_attr(prop.name())]) return list def statistics(self): list = [] for stat in self.object_class.statistics: list.append([stat, self.get_attr(stat.name())]) return list def get_attr(self, name): val = self._value(name) return qmf_to_native(val) def set_attr(self, name, v): val = self._value(name) native_to_qmf(val, v) def __getitem__(self, name): return self.get_attr(name) def __setitem__(self, name, value): self.set_attr(name, value) def inc_attr(self, name, by=1): self.set_attr(name, self.get_attr(name) + by) def dec_attr(self, name, by=1): self.set_attr(name, self.get_attr(name) - by) def __setattr__(self, name, value): # # Ignore the internal attributes, set them normally... # if (name[0] == '_' or name == 'impl' or name == 'object_class'): return super.__setattr__(self, name, value) if not self._allow_sets: raise Exception("'Set' operations not permitted on this object") # # If the name matches a property name, set the value of the property. # # print "set name=%s" % str(name) for prop in self.object_class.properties: if prop.name() == name: return self.set_attr(name, value) # # otherwise, check for a statistic set... # for stat in self.object_class.statistics: if stat.name() == name: return self.set_attr(name, value) # unrecognized name? should I raise an exception? super.__setattr__(self, name, value) def __getattr__(self, name, *args): # # If the name matches a property name, return the value of the property. # for prop in self.object_class.properties: if prop.name() == name: return self.get_attr(name) # # Do the same for statistics # for stat in self.object_class.statistics: if stat.name() == name: return self.get_attr(name) # # If we still haven't found a match for the name, check to see if # it matches a method name. If so, marshall up the arguments into # a map, and invoke the method. # for method in self.object_class.methods: if method.name() == name: argMap = self._marshall(method, args) return lambda name, argMap : self._invokeMethod(name, argMap) # # This name means nothing to us, pass it up the line to the parent # class's handler. # # print "__getattr__=%s" % str(name) super.__getattr__(self, name) def _invokeMethod(self, name, argMap): """ Private: Helper function that invokes an object's method, and waits for the result. """ self._cv.acquire() try: timeout = 30 self._sync_count = 1 self.impl.invokeMethod(name, argMap, self) if self._broker: self._broker.conn.kick() self._cv.wait(timeout) if self._sync_count == 1: raise Exception("Timed out: waiting for response to method call.") finally: self._cv.release() return self._sync_result def _method_result(self, result): """ Called to return the result of a method call on an object """ self._cv.acquire(); try: self._sync_result = result self._sync_count -= 1 self._cv.notify() finally: self._cv.release() def _marshall(schema, args): ''' Private: Convert a list of arguments (positional) into a Value object of type "map". Used to create the argument parameter for an object's method invokation. ''' # Build a map of the method's arguments map = qmfengine.Value(TYPE_MAP) for arg in schema.arguments: if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: map.insert(arg.name, qmfengine.Value(arg.typecode)) # install each argument's value into the map marshalled = Arguments(map) idx = 0 for arg in schema.arguments: if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: if args[idx]: marshalled[arg.name] = args[idx] idx += 1 return marshalled.map def _value(self, name): val = self.impl.getValue(name) if not val: raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % (name, self.object_class.impl.getClassKey().getPackageName(), self.object_class.impl.getClassKey().getClassName())) return val class AgentObject(QmfObject): def __init__(self, cls, kwargs={}): QmfObject.__init__(self, cls, kwargs) self._allow_sets = True def destroy(self): self.impl.destroy() def set_object_id(self, oid): self.impl.setObjectId(oid.impl) class ConsoleObject(QmfObject): # attr_reader :current_time, :create_time, :delete_time def __init__(self, cls, kwargs={}): QmfObject.__init__(self, cls, kwargs) def update(self): if not self._broker: raise Exception("No linkage to broker") newer = self._broker.console.objects(Query({"object_id":object_id})) if newer.size != 1: raise Exception("Expected exactly one update for this object, %d present" % int(newer.size)) self.merge_update(newer[0]) def merge_update(self, newObject): self.impl.merge(new_object.impl) def is_deleted(self): return self.impl.isDeleted() def key(self): pass class ObjectId: def __init__(self, impl=None): if impl: self.impl = impl else: self.impl = qmfengine.ObjectId() self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) def object_num_high(self): return self.impl.getObjectNumHi() def object_num_low(self): return self.impl.getObjectNumLo() def agent_key(self): self.agent_key def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.impl == other.impl def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return self.impl.str() class Arguments(object): def __init__(self, map): self.map = map self._by_hash = {} key_count = self.map.keyCount() a = 0 while a < key_count: key = self.map.key(a) self._by_hash[key] = qmf_to_native(self.map.byKey(key)) a += 1 def __getitem__(self, key): return self._by_hash[key] def __setitem__(self, key, value): self._by_hash[key] = value self.set(key, value) def __iter__(self): return self._by_hash.__iter__ def __getattr__(self, name): if name in self._by_hash: return self._by_hash[name] return super.__getattr__(self, name) def __setattr__(self, name, value): # # ignore local data members # if (name[0] == '_' or name == 'map'): return super.__setattr__(self, name, value) if name in self._by_hash: self._by_hash[name] = value return self.set(name, value) return super.__setattr__(self, name, value) def set(self, key, value): val = self.map.byKey(key) native_to_qmf(val, value) class MethodResponse(object): def __init__(self, impl): self.impl = qmfengine.MethodResponse(impl) def status(self): return self.impl.getStatus() def exception(self): return self.impl.getException() def text(self): return exception().asString() def args(self): return Arguments(self.impl.getArgs()) def __getattr__(self, name): myArgs = self.args() return myArgs.__getattr__(name) def __setattr__(self, name, value): if name == 'impl': return super.__setattr__(self, name, value) myArgs = self.args() return myArgs.__setattr__(name, value) ##============================================================================== ## QUERY ##============================================================================== class Query: def __init__(self, kwargs={}): if "impl" in kwargs: self.impl = kwargs["impl"] else: package = '' if "key" in kwargs: # construct using SchemaClassKey: self.impl = qmfengine.Query(kwargs["key"]) elif "object_id" in kwargs: self.impl = qmfengine.Query(kwargs["object_id"].impl) else: if "package" in kwargs: package = kwargs["package"] if "class" in kwargs: self.impl = qmfengine.Query(kwargs["class"], package) else: raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") def package_name(self): return self.impl.getPackage() def class_name(self): return self.impl.getClass() def object_id(self): _objid = self.impl.getObjectId() if _objid: return ObjectId(_objid) else: return None ##============================================================================== ## SCHEMA ##============================================================================== class SchemaArgument: #attr_reader :impl def __init__(self, name, typecode, kwargs={}): if "impl" in kwargs: self.impl = kwargs["impl"] else: self.impl = qmfengine.SchemaArgument(name, typecode) if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"]) if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def name(self): return self.impl.getName() def direction(self): return self.impl.getDirection() def typecode(self): return self.impl.getType() def __repr__(self): return self.name() class SchemaMethod: # attr_reader :impl, arguments def __init__(self, name, kwargs={}): self.arguments = [] if "impl" in kwargs: self.impl = kwargs["impl"] for i in range(self.impl.getArgumentCount()): self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)})) else: self.impl = qmfengine.SchemaMethod(name) if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def add_argument(self, arg): self.arguments.append(arg) self.impl.addArgument(arg.impl) def name(self): return self.impl.getName() def __repr__(self): return self.name() class SchemaProperty: #attr_reader :impl def __init__(self, name, typecode, kwargs={}): if "impl" in kwargs: self.impl = kwargs["impl"] else: self.impl = qmfengine.SchemaProperty(name, typecode) if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"]) if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"]) if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"]) if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def name(self): return self.impl.getName() def __repr__(self): return self.name() class SchemaStatistic: # attr_reader :impl def __init__(self, name, typecode, kwargs={}): if "impl" in kwargs: self.impl = kwargs["impl"] else: self.impl = qmfengine.SchemaStatistic(name, typecode) if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def name(self): return self.impl.getName() def __repr__(self): return self.name() class SchemaClassKey: #attr_reader :impl def __init__(self, i): self.impl = i def package_name(self): return self.impl.getPackageName() def class_name(self): return self.impl.getClassName() def __repr__(self): return self.impl.asString() class SchemaObjectClass: # attr_reader :impl, :properties, :statistics, :methods def __init__(self, package, name, kwargs={}): self.properties = [] self.statistics = [] self.methods = [] if "impl" in kwargs: self.impl = kwargs["impl"] for i in range(self.impl.getPropertyCount()): self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)})) for i in range(self.impl.getStatisticCount()): self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)})) for i in range(self.impl.getMethodCount()): self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)})) else: self.impl = qmfengine.SchemaObjectClass(package, name) def add_property(self, prop): self.properties.append(prop) self.impl.addProperty(prop.impl) def add_statistic(self, stat): self.statistics.append(stat) self.impl.addStatistic(stat.impl) def add_method(self, meth): self.methods.append(meth) self.impl.addMethod(meth.impl) def class_key(self): return SchemaClassKey(self.impl.getClassKey()) def package_name(self): return self.impl.getClassKey().getPackageName() def class_name(self): return self.impl.getClassKey().getClassName() class SchemaEventClass: # attr_reader :impl :arguments def __init__(self, package, name, sev, kwargs={}): self.arguments = [] if "impl" in kwargs: self.impl = kwargs["impl"] for i in range(self.impl.getArgumentCount()): self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)})) else: self.impl = qmfengine.SchemaEventClass(package, name, sev) if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) def add_argument(self, arg): self.arguments.append(arg) self.impl.addArgument(arg.impl) def name(self): return self.impl.getClassKey().getClassName() def class_key(self): return SchemaClassKey(self.impl.getClassKey()) def package_name(self): return self.impl.getClassKey().getPackageName() def class_name(self): return self.impl.getClassKey().getClassName() ##============================================================================== ## CONSOLE ##============================================================================== class ConsoleHandler: def agent_added(self, agent): pass def agent_deleted(self, agent): pass def new_package(self, package): pass def new_class(self, class_key): pass def object_update(self, obj, hasProps, hasStats): pass def event_received(self, event): pass def agent_heartbeat(self, agent, timestamp): pass def method_response(self, resp): pass def broker_info(self, broker): pass class Console(Thread): # attr_reader :impl def __init__(self, handler=None, kwargs={}): Thread.__init__(self) self._handler = handler self.impl = qmfengine.Console() self._event = qmfengine.ConsoleEvent() self._broker_list = [] self._cv = Condition() self._sync_count = 0 self._sync_result = None self._select = {} self._cb_cond = Condition() self._operational = True self.start() def destroy(self, timeout=None): logging.debug("Destroying Console...") self._operational = False self.start_console_events() # wake thread up self.join(timeout) logging.debug("... Console Destroyed!") if self.isAlive(): logging.error( "Console thread '%s' is hung..." % self.getName() ) def add_connection(self, conn): broker = Broker(self, conn) self._cv.acquire() try: self._broker_list.append(broker) finally: self._cv.release() return broker def del_connection(self, broker): logging.debug("shutting down broker...") broker.shutdown() logging.debug("...broker down.") self._cv.acquire() try: self._broker_list.remove(broker) finally: self._cv.release() logging.debug("del_connection() finished") def packages(self): plist = [] for i in range(self.impl.packageCount()): plist.append(self.impl.getPackageName(i)) return plist def classes(self, package, kind=CLASS_OBJECT): clist = [] for i in range(self.impl.classCount(package)): key = self.impl.getClass(package, i) class_kind = self.impl.getClassKind(key) if class_kind == kind: if kind == CLASS_OBJECT: clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) elif kind == CLASS_EVENT: clist.append(SchemaEventClass(None, None, 0, {"impl":self.impl.getEventClass(key)})) return clist def bind_package(self, package): return self.impl.bindPackage(package) def bind_class(self, kwargs = {}): if "key" in kwargs: self.impl.bindClass(kwargs["key"]) elif "package" in kwargs: package = kwargs["package"] if "class" in kwargs: self.impl.bindClass(package, kwargs["class"]) else: self.impl.bindClass(package, "*") else: raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") def bind_event(self, kwargs = {}): if "key" in kwargs: self.impl.bindEvent(kwargs["key"]) elif "package" in kwargs: package = kwargs["package"] if "event" in kwargs: self.impl.bindEvent(package, kwargs["event"]) else: self.impl.bindEvent(package, "*") else: raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'event']") def agents(self, broker=None): blist = [] if broker: blist.append(broker) else: self._cv.acquire() try: # copy while holding lock blist = self._broker_list[:] finally: self._cv.release() agents = [] for b in blist: for idx in range(b.impl.agentCount()): agents.append(AgentProxy(b.impl.getAgent(idx), b)) return agents def objects(self, query, kwargs = {}): timeout = 30 agent = None temp_args = kwargs.copy() if type(query) == type({}): temp_args.update(query) if "_timeout" in temp_args: timeout = temp_args["_timeout"] temp_args.pop("_timeout") if "_agent" in temp_args: agent = temp_args["_agent"] temp_args.pop("_agent") if type(query) == type({}): query = Query(temp_args) self._select = {} for k in temp_args.iterkeys(): if type(k) == str: self._select[k] = temp_args[k] self._cv.acquire() try: self._sync_count = 1 self._sync_result = [] broker = self._broker_list[0] broker.send_query(query.impl, None, agent) self._cv.wait(timeout) if self._sync_count == 1: raise Exception("Timed out: waiting for query response") finally: self._cv.release() return self._sync_result def object(self, query, kwargs = {}): ''' Return one and only one object or None. ''' objs = objects(query, kwargs) if len(objs) == 1: return objs[0] else: return None def first_object(self, query, kwargs = {}): ''' Return the first of potentially many objects. ''' objs = objects(query, kwargs) if objs: return objs[0] else: return None # Check the object against select to check for a match def _select_match(self, object): schema_props = object.properties() for key in self._select.iterkeys(): for prop in schema_props: if key == p[0].name() and self._select[key] != p[1]: return False return True def _get_result(self, list, context): ''' Called by Broker proxy to return the result of a query. ''' self._cv.acquire() try: for item in list: if self._select_match(item): self._sync_result.append(item) self._sync_count -= 1 self._cv.notify() finally: self._cv.release() def start_sync(self, query): pass def touch_sync(self, sync): pass def end_sync(self, sync): pass def run(self): while self._operational: self._cb_cond.acquire() try: self._cb_cond.wait(1) while self._do_console_events(): pass finally: self._cb_cond.release() logging.debug("Shutting down Console thread") def start_console_events(self): self._cb_cond.acquire() try: self._cb_cond.notify() finally: self._cb_cond.release() def _do_console_events(self): ''' Called by the Console thread to poll for events. Passes the events onto the ConsoleHandler associated with this Console. Is called periodically, but can also be kicked by Console.start_console_events(). ''' count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 try: if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: logging.debug("Console Event AGENT_ADDED received") if self._handler: self._handler.agent_added(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: logging.debug("Console Event AGENT_DELETED received") if self._handler: self._handler.agent_deleted(AgentProxy(self._event.agent, None)) elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: logging.debug("Console Event NEW_PACKAGE received") if self._handler: self._handler.new_package(self._event.name) elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: logging.debug("Console Event NEW_CLASS received") if self._handler: self._handler.new_class(SchemaClassKey(self._event.classKey)) elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: logging.debug("Console Event OBJECT_UPDATE received") if self._handler: self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), self._event.hasProps, self._event.hasStats) elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: logging.debug("Console Event EVENT_RECEIVED received") elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: logging.debug("Console Event AGENT_HEARTBEAT received") if self._handler: self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: logging.debug("Console Event METHOD_RESPONSE received") else: logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) except e: print "Exception caught in callback thread:", e self.impl.popEvent() valid = self.impl.getEvent(self._event) return count class AgentProxy: # attr_reader :broker def __init__(self, impl, broker): self.impl = impl self.broker = broker self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) def label(self): return self.impl.getLabel() def key(self): return self.key class Broker(ConnectionHandler): # attr_reader :impl :conn, :console, :broker_bank def __init__(self, console, conn): self.broker_bank = 1 self.console = console self.conn = conn self._session = None self._cv = Condition() self._stable = None self._event = qmfengine.BrokerEvent() self._xmtMessage = qmfengine.Message() self.impl = qmfengine.BrokerProxy(self.console.impl) self.console.impl.addConnection(self.impl, self) self.conn.add_conn_handler(self) self._operational = True def shutdown(self): logging.debug("broker.shutdown() called.") self.console.impl.delConnection(self.impl) self.conn.del_conn_handler(self) if self._session: self.impl.sessionClosed() logging.debug("broker.shutdown() sessionClosed done.") self._session.destroy() logging.debug("broker.shutdown() session destroy done.") self._session = None self._operational = False logging.debug("broker.shutdown() done.") def wait_for_stable(self, timeout = None): self._cv.acquire() try: if self._stable: return if timeout: self._cv.wait(timeout) if not self._stable: raise Exception("Timed out: waiting for broker connection to become stable") else: while not self._stable: self._cv.wait() finally: self._cv.release() def send_query(self, query, ctx, agent): agent_impl = None if agent: agent_impl = agent.impl self.impl.sendQuery(query, ctx, agent_impl) self.conn.kick() def _do_broker_events(self): count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: logging.debug("Broker Event BROKER_INFO received"); elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: logging.debug("Broker Event DECLARE_QUEUE received"); self.conn.impl.declareQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: logging.debug("Broker Event DELETE_QUEUE received"); self.conn.impl.deleteQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.BrokerEvent.BIND: logging.debug("Broker Event BIND received"); self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.UNBIND: logging.debug("Broker Event UNBIND received"); self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: logging.debug("Broker Event SETUP_COMPLETE received"); self.impl.startProtocol() elif self._event.kind == qmfengine.BrokerEvent.STABLE: logging.debug("Broker Event STABLE received"); self._cv.acquire() try: self._stable = True self._cv.notify() finally: self._cv.release() elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: result = [] for idx in range(self._event.queryResponse.getObjectCount()): result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) self.console._get_result(result, self._event.context) elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: obj = self._event.context obj._method_result(MethodResponse(self._event.methodResponse())) self.impl.popEvent() valid = self.impl.getEvent(self._event) return count def _do_broker_messages(self): count = 0 valid = self.impl.getXmtMessage(self._xmtMessage) while valid: count += 1 logging.debug("Broker: sending msg on connection") self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) self.impl.popXmt() valid = self.impl.getXmtMessage(self._xmtMessage) return count def _do_events(self): while True: self.console.start_console_events() bcnt = self._do_broker_events() mcnt = self._do_broker_messages() if bcnt == 0 and mcnt == 0: break; def conn_event_connected(self): logging.debug("Broker: Connection event CONNECTED") self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) self.impl.sessionOpened(self._session.handle) self._do_events() def conn_event_disconnected(self, error): logging.debug("Broker: Connection event DISCONNECTED") pass def conn_event_visit(self): self._do_events() def sess_event_session_closed(self, context, error): logging.debug("Broker: Session event CLOSED") self.impl.sessionClosed() def sess_event_recv(self, context, message): logging.debug("Broker: Session event MSG_RECV") if not self._operational: logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) self.impl.handleRcvMessage(message) self._do_events() ##============================================================================== ## AGENT ##============================================================================== class AgentHandler: def get_query(self, context, query, userId): None def method_call(self, context, name, object_id, args, userId): None class Agent(ConnectionHandler): def __init__(self, handler, label=""): if label == "": self._agentLabel = "rb-%s.%d" % (socket.gethostname(), os.getpid()) else: self._agentLabel = label self._conn = None self._handler = handler self.impl = qmfengine.Agent(self._agentLabel) self._event = qmfengine.AgentEvent() self._xmtMessage = qmfengine.Message() def set_connection(self, conn): self._conn = conn self._conn.add_conn_handler(self) def register_class(self, cls): self.impl.registerClass(cls.impl) def alloc_object_id(self, low = 0, high = 0): return ObjectId(self.impl.allocObjectId(low, high)) def raise_event(self, event): self.impl.raiseEvent(event.impl) def query_response(self, context, obj): self.impl.queryResponse(context, obj.impl) def query_complete(self, context): self.impl.queryComplete(context) def method_response(self, context, status, text, arguments): self.impl.methodResponse(context, status, text, arguments.map) def do_agent_events(self): count = 0 valid = self.impl.getEvent(self._event) while valid: count += 1 if self._event.kind == qmfengine.AgentEvent.GET_QUERY: self._handler.get_query(self._event.sequence, Query({"impl":self._event.query}), self._event.authUserId) elif self._event.kind == qmfengine.AgentEvent.START_SYNC: pass elif self._event.kind == qmfengine.AgentEvent.END_SYNC: pass elif self._event.kind == qmfengine.AgentEvent.METHOD_CALL: args = Arguments(self._event.arguments) self._handler.method_call(self._event.sequence, self._event.name, ObjectId(self._event.objectId), args, self._event.authUserId) elif self._event.kind == qmfengine.AgentEvent.DECLARE_QUEUE: self._conn.impl.declareQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.AgentEvent.DELETE_QUEUE: self._conn.impl.deleteQueue(self._session.handle, self._event.name) elif self._event.kind == qmfengine.AgentEvent.BIND: self._conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.AgentEvent.UNBIND: self._conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) elif self._event.kind == qmfengine.AgentEvent.SETUP_COMPLETE: self.impl.startProtocol() self.impl.popEvent() valid = self.impl.getEvent(self._event) return count def do_agent_messages(self): count = 0 valid = self.impl.getXmtMessage(self._xmtMessage) while valid: count += 1 self._conn.impl.sendMessage(self._session.handle, self._xmtMessage) self.impl.popXmt() valid = self.impl.getXmtMessage(self._xmtMessage) return count def do_events(self): while True: ecnt = self.do_agent_events() mcnt = self.do_agent_messages() if ecnt == 0 and mcnt == 0: break def conn_event_connected(self): logging.debug("Agent Connection Established...") self._session = Session(self._conn, "qmfa-%s.%d" % (socket.gethostname(), os.getpid()), self) self.impl.newSession() self.do_events() def conn_event_disconnected(self, error): logging.debug("Agent Connection Lost") pass def conn_event_visit(self): self.do_events() def sess_event_session_closed(self, context, error): logging.debug("Agent Session Lost") pass def sess_event_recv(self, context, message): self.impl.handleRcvMessage(message) self.do_events()