# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ Console API for Qpid Management Framework """ import os import platform import qpid import struct import socket import re from qpid.peer import Closed from qpid.session import SessionDetached from qpid.connection import Connection, ConnectionFailed from qpid.datatypes import Message, RangedSet from qpid.util import connect, ssl, URL from qpid.codec010 import StringCodec as Codec from threading import Lock, Condition, Thread from time import time, strftime, gmtime from cStringIO import StringIO class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ def brokerConnected(self, broker): """ Invoked when a connection is established to a broker """ pass def brokerDisconnected(self, broker): """ Invoked when the connection to a broker is lost """ pass def newPackage(self, name): """ Invoked when a QMF package is discovered. """ pass def newClass(self, kind, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass def newAgent(self, agent): """ Invoked when a QMF agent is discovered. """ pass def delAgent(self, agent): """ Invoked when a QMF agent disconects. """ pass def objectProps(self, broker, record): """ Invoked when an object is updated. """ pass def objectStats(self, broker, record): """ Invoked when an object is updated. """ pass def event(self, broker, event): """ Invoked when an event is raised. """ pass def heartbeat(self, agent, timestamp): """ """ pass def brokerInfo(self, broker): """ """ pass def methodResponse(self, broker, seq, response): """ """ pass class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) socket.gethostbyname(self.host) if self.port is None: if self.scheme == URL.AMQPS: self.port = 5671 else: self.port = 5672 self.authName = str(self.user or "guest") self.authPass = str(self.password or "guest") self.authMech = "PLAIN" def name(self): return self.host + ":" + str(self.port) def match(self, host, port): return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port class Session: """ An instance of the Session class represents a console session running against one or more QMF brokers. A single instance of Session is needed to interact with the management framework as a console. """ _CONTEXT_SYNC = 1 _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 GET_WAIT_TIME = 60 def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ Initialize a session. If the console argument is provided, the more advanced asynchronous features are available. If console is defaulted, the session will operate in a simpler, synchronous manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments are meaningful only if 'console' is provided. They control whether object updates, events, and agent-heartbeats are subscribed to. If the console is not interested in receiving one or more of the above, setting the argument to False will reduce tha bandwidth used by the API. If manageConnections is set to True, the Session object will manage connections to the brokers. This means that if a broker is unreachable, it will retry until a connection can be established. If a connection is lost, the Session will attempt to reconnect. If manageConnections is set to False, the user is responsible for handing failures. In this case, an unreachable broker will cause addBroker to raise an exception. If userBindings is set to False (the default) and rcvObjects is True, the console will receive data for all object classes. If userBindings is set to True, the user must select which classes the console shall receive by invoking the bindPackage or bindClass methods. This allows the console to be configured to receive only information that is relavant to a particular application. If rcvObjects id False, userBindings has no meaning. """ self.console = console self.brokers = [] self.packages = {} self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] self.getResult = [] self.getSelect = [] self.error = None self.rcvObjects = rcvObjects self.rcvEvents = rcvEvents self.rcvHeartbeats = rcvHeartbeats self.userBindings = userBindings if self.console == None: self.rcvObjects = False self.rcvEvents = False self.rcvHeartbeats = False self.bindingKeyList = self._bindingKeys() self.manageConnections = manageConnections if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) def addBroker(self, target="localhost"): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass, ssl = url.scheme == URL.AMQPS) self.brokers.append(broker) if not self.manageConnections: self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ broker._shutdown() self.brokers.remove(broker) del broker def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() list = [] for package in self.packages: list.append(package) return list def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() list = [] if packageName in self.packages: for pkey in self.packages[packageName]: list.append(self.packages[packageName][pkey].getKey()) return list def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() pname = classKey.getPackageName() pkey = classKey.getPackageKey() if pname in self.packages: if pkey in self.packages[pname]: return self.packages[pname][pkey] def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") key = "console.obj.*.*.%s.#" % packageName self.bindingKeyList.append(key) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) def bindClass(self, pname, cname): """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: raise Exception("userBindings option not set for Session") key = "console.obj.*.*.%s.%s.#" % (pname, cname) self.bindingKeyList.append(key) for broker in self.brokers: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) def bindClassKey(self, classKey): """ Request object updates for a particular table class by class key. """ pname = classKey.getPackageName() cname = classKey.getClassName() self.bindClass(pname, cname) def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] if broker == None: for b in self.brokers: brokerList.append(b) else: brokerList.append(broker) for b in brokerList: b._waitForStable() agentList = [] for b in brokerList: for a in b.getAgents(): agentList.append(a) return agentList def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. All arguments are passed by name(keyword). The class for queried objects may be specified in one of the following ways: _schema = - supply a schema object returned from getSchema. _key = - supply a classKey from the list returned by getClasses. _class = - supply a class name as a string. If the class name exists in multiple packages, a _package argument may also be supplied. _objectId = - get the object referenced by the object-id If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. _agent = - supply an agent from the list returned by getAgents. If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: _broker = - supply a broker as returned by addBroker. If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. """ if "_broker" in kwargs: brokerList = [] brokerList.append(kwargs["_broker"]) else: brokerList = self.brokers for broker in brokerList: broker._waitForStable() agentList = [] if "_agent" in kwargs: agent = kwargs["_agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") if agent.broker.isConnected(): agentList.append(agent) else: if "_objectId" in kwargs: oid = kwargs["_objectId"] for broker in brokerList: for agent in broker.getAgents(): if agent.getBrokerBank() == oid.getBrokerBank() and agent.getAgentBank() == oid.getAgentBank(): agentList.append(agent) else: for broker in brokerList: for agent in broker.getAgents(): if agent.broker.isConnected(): agentList.append(agent) if len(agentList) == 0: return [] pname = None cname = None hash = None classKey = None if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() elif "_key" in kwargs: classKey = kwargs["_key"] elif "_class" in kwargs: cname = kwargs["_class"] if "_package" in kwargs: pname = kwargs["_package"] if cname == None and classKey == None and "_objectId" not in kwargs: raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") map = {} self.getSelect = [] if "_objectId" in kwargs: map["_objectid"] = kwargs["_objectId"].__repr__() else: if cname == None: cname = classKey.getClassName() pname = classKey.getPackageName() hash = classKey.getHash() map["_class"] = cname if pname != None: map["_package"] = pname if hash != None: map["_hash"] = hash for item in kwargs: if item[0] != '_': self.getSelect.append((item, kwargs[item])) self.getResult = [] for agent in agentList: broker = agent.broker sendCodec = Codec(broker.conn.spec) try: self.cv.acquire() seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) self.syncSequenceList.append(seq) finally: self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) broker._send(smsg) starttime = time() timeout = False try: self.cv.acquire() while len(self.syncSequenceList) > 0 and self.error == None: self.cv.wait(self.GET_WAIT_TIME) if time() - starttime > self.GET_WAIT_TIME: for pendingSeq in self.syncSequenceList: self.seqMgr._release(pendingSeq) self.syncSequenceList = [] timeout = True finally: self.cv.release() if self.error: errorText = self.error self.error = None raise Exception(errorText) if len(self.getResult) == 0 and timeout: raise RuntimeError("No agent responded within timeout period") return self.getResult def setEventFilter(self, **kwargs): """ """ pass def _bindingKeys(self): keyList = [] keyList.append("schema.#") if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: keyList.append("console.#") else: if self.rcvObjects and not self.userBindings: keyList.append("console.obj.#") else: keyList.append("console.obj.*.*.org.apache.qpid.broker.agent") if self.rcvEvents: keyList.append("console.event.#") if self.rcvHeartbeats: keyList.append("console.heartbeat.#") return keyList def _handleBrokerConnect(self, broker): if self.console: self.console.brokerConnected(broker) def _handleBrokerDisconnect(self, broker): if self.console: self.console.brokerDisconnected(broker) def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: self.console.brokerInfo(broker) # Send a package request # (effectively inc and dec outstanding by not doing anything) sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'P', seq) smsg = broker._message(sendCodec.encoded) broker._send(smsg) def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) notify = False try: self.cv.acquire() if pname not in self.packages: self.packages[pname] = {} notify = True finally: self.cv.release() if notify and self.console != None: self.console.newPackage(pname) # Send a class request broker._incOutstanding() sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'Q', seq) sendCodec.write_str8(pname) smsg = broker._message(sendCodec.encoded) broker._send(smsg) def _handleCommandComplete(self, broker, codec, seq): code = codec.read_uint32() text = codec.read_str8() context = self.seqMgr._release(seq) if context == self._CONTEXT_STARTUP: broker._decOutstanding() elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: try: broker.cv.acquire() broker.syncInFlight = False broker.cv.notify() finally: broker.cv.release() elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: try: self.cv.acquire() self.syncSequenceList.remove(seq) if len(self.syncSequenceList) == 0: self.cv.notify() finally: self.cv.release() def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) unknown = False try: self.cv.acquire() if classKey.getPackageName() in self.packages: if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: unknown = True finally: self.cv.release() if unknown: # Send a schema request for the unknown class broker._incOutstanding() sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'S', seq) classKey.encode(sendCodec) smsg = broker._message(sendCodec.encoded) broker._send(smsg) def _handleMethodResp(self, broker, codec, seq): code = codec.read_uint32() text = codec.read_str16() outArgs = {} method, synchronous = self.seqMgr._release(seq) if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: outArgs[arg.name] = self._decodeValue(codec, arg.type) result = MethodResult(code, text, outArgs) if synchronous: try: broker.cv.acquire() broker.syncResult = result broker.syncInFlight = False broker.cv.notify() finally: broker.cv.release() else: if self.console: self.console.methodResponse(broker, seq, result) def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 agentBank = 0 dp = msg.get("delivery_properties") if dp: key = dp["routing_key"] keyElements = key.split(".") if len(keyElements) == 4: brokerBank = int(keyElements[2]) agentBank = int(keyElements[3]) agent = broker.getAgent(brokerBank, agentBank) timestamp = codec.read_uint64() if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) def _handleEventInd(self, broker, codec, seq): if self.console != None: event = Event(self, broker, codec) self.console.event(broker, event) def _handleSchemaResp(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec) try: self.cv.acquire() self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class finally: self.cv.release() self.seqMgr._release(seq) broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): classKey = ClassKey(codec) try: self.cv.acquire() pname = classKey.getPackageName() if pname not in self.packages: return pkey = classKey.getPackageKey() if pkey not in self.packages[pname]: return schema = self.packages[pname][pkey] finally: self.cv.release() object = Object(self, broker, schema, codec, prop, stat) if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: broker._updateAgent(object) try: self.cv.acquire() if seq in self.syncSequenceList: if object.getTimestamps()[2] == 0 and self._selectMatch(object): self.getResult.append(object) return finally: self.cv.release() if self.console and self.rcvObjects: if prop: self.console.objectProps(broker, object) if stat: self.console.objectStats(broker, object) def _handleError(self, error): self.error = error try: self.cv.acquire() self.syncSequenceList = [] self.cv.notify() finally: self.cv.release() def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: for prop, propval in object.getProperties(): if key == prop.name and value != propval: return False return True def _decodeValue(self, codec, typecode): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 elif typecode == 2: data = codec.read_uint16() # U16 elif typecode == 3: data = codec.read_uint32() # U32 elif typecode == 4: data = codec.read_uint64() # U64 elif typecode == 6: data = codec.read_str8() # SSTR elif typecode == 7: data = codec.read_str16() # LSTR elif typecode == 8: data = codec.read_int64() # ABSTIME elif typecode == 9: data = codec.read_uint64() # DELTATIME elif typecode == 10: data = ObjectId(codec) # REF elif typecode == 11: data = codec.read_uint8() != 0 # BOOL elif typecode == 12: data = codec.read_float() # FLOAT elif typecode == 13: data = codec.read_double() # DOUBLE elif typecode == 14: data = codec.read_uuid() # UUID elif typecode == 15: data = codec.read_map() # FTABLE elif typecode == 16: data = codec.read_int8() # S8 elif typecode == 17: data = codec.read_int16() # S16 elif typecode == 18: data = codec.read_int32() # S32 elif typecode == 19: data = codec.read_int64() # S63 else: raise ValueError("Invalid type code: %d" % typecode) return data def _encodeValue(self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.write_uint8 (int(value)) # U8 elif typecode == 2: codec.write_uint16 (int(value)) # U16 elif typecode == 3: codec.write_uint32 (long(value)) # U32 elif typecode == 4: codec.write_uint64 (long(value)) # U64 elif typecode == 6: codec.write_str8 (value) # SSTR elif typecode == 7: codec.write_str16 (value) # LSTR elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME elif typecode == 10: value.encode (codec) # REF elif typecode == 11: codec.write_uint8 (int(value)) # BOOL elif typecode == 12: codec.write_float (float(value)) # FLOAT elif typecode == 13: codec.write_double (float(value)) # DOUBLE elif typecode == 14: codec.write_uuid (value.bytes) # UUID elif typecode == 15: codec.write_map (value) # FTABLE elif typecode == 16: codec.write_int8 (int(value)) # S8 elif typecode == 17: codec.write_int16 (int(value)) # S16 elif typecode == 18: codec.write_int32 (int(value)) # S32 elif typecode == 19: codec.write_int64 (int(value)) # S64 else: raise ValueError ("Invalid type code: %d" % typecode) def _displayValue(self, value, typecode): """ """ if typecode == 1: return unicode(value) elif typecode == 2: return unicode(value) elif typecode == 3: return unicode(value) elif typecode == 4: return unicode(value) elif typecode == 6: return value elif typecode == 7: return value elif typecode == 8: return unicode(strftime("%c", gmtime(value / 1000000000))) elif typecode == 9: return unicode(value) elif typecode == 10: return unicode(value.__repr__()) elif typecode == 11: if value: return u"T" else: return u"F" elif typecode == 12: return unicode(value) elif typecode == 13: return unicode(value) elif typecode == 14: return unicode(value.__repr__()) elif typecode == 15: return unicode(value.__repr__()) elif typecode == 16: return unicode(value) elif typecode == 17: return unicode(value) elif typecode == 18: return unicode(value) elif typecode == 19: return unicode(value) else: raise ValueError ("Invalid type code: %d" % typecode) def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): """ This function can be used to send a method request to an object given only the broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are normally invoked on the object itself. """ schema = self.getSchema(schemaKey) for method in schema.getMethods(): if name == method.name: aIdx = 0 sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve((method, False)) broker._setHeader(sendCodec, 'M', seq) objectId.encode(sendCodec) schemaKey.encode(sendCodec) sendCodec.write_str8(name) count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: count += 1 if count != len(argList): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) for arg in method.arguments: if arg.dir.find("I") != -1: self._encodeValue(sendCodec, argList[aIdx], arg.type) aIdx += 1 smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (objectId.getBrokerBank(), objectId.getAgentBank())) broker._send(smsg) return seq return None class Package: """ """ def __init__(self, name): self.name = name class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): if type(constructor) == str: # construct from __repr__ string try: self.pname, cls = constructor.split(":") self.cname, hsh = cls.split("(") hsh = hsh.strip(")") hexValues = hsh.split("-") h0 = int(hexValues[0], 16) h1 = int(hexValues[1], 16) h2 = int(hexValues[2], 16) h3 = int(hexValues[3], 16) self.hash = struct.pack("!LLLL", h0, h1, h2, h3) except: raise Exception("Invalid ClassKey format") else: # construct from codec codec = constructor self.pname = str(codec.read_str8()) self.cname = str(codec.read_str8()) self.hash = codec.read_bin128() def encode(self, codec): codec.write_str8(self.pname) codec.write_str8(self.cname) codec.write_bin128(self.hash) def getPackageName(self): return self.pname def getClassName(self): return self.cname def getHash(self): return self.hash def getHashString(self): return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash) def getPackageKey(self): return (self.cname, self.hash) def __repr__(self): return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" class SchemaClass: """ """ CLASS_KIND_TABLE = 1 CLASS_KIND_EVENT = 2 def __init__(self, kind, key, codec): self.kind = kind self.classKey = key self.properties = [] self.statistics = [] self.methods = [] self.arguments = [] if self.kind == self.CLASS_KIND_TABLE: propCount = codec.read_uint16() statCount = codec.read_uint16() methodCount = codec.read_uint16() for idx in range(propCount): self.properties.append(SchemaProperty(codec)) for idx in range(statCount): self.statistics.append(SchemaStatistic(codec)) for idx in range(methodCount): self.methods.append(SchemaMethod(codec)) elif self.kind == self.CLASS_KIND_EVENT: argCount = codec.read_uint16() for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): if self.kind == self.CLASS_KIND_TABLE: kindStr = "Table" elif self.kind == self.CLASS_KIND_EVENT: kindStr = "Event" else: kindStr = "Unsupported" result = "%s Class: %s " % (kindStr, self.classKey.__repr__()) return result def getKey(self): """ Return the class-key for this class. """ return self.classKey def getProperties(self): """ Return the list of properties for the class. """ return self.properties def getStatistics(self): """ Return the list of statistics for the class. """ return self.statistics def getMethods(self): """ Return the list of methods for the class. """ return self.methods def getArguments(self): """ Return the list of events for the class. """ return self.arguments class SchemaProperty: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] self.access = str(map["access"]) self.index = map["index"] != 0 self.optional = map["optional"] != 0 self.unit = None self.min = None self.max = None self.maxlen = None self.desc = None for key, value in map.items(): if key == "unit" : self.unit = value elif key == "min" : self.min = value elif key == "max" : self.max = value elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = value def __repr__(self): return self.name class SchemaStatistic: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] self.unit = None self.desc = None for key, value in map.items(): if key == "unit" : self.unit = value elif key == "desc" : self.desc = value def __repr__(self): return self.name class SchemaMethod: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) argCount = map["argCount"] if "desc" in map: self.desc = map["desc"] else: self.desc = None self.arguments = [] for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=True)) def __repr__(self): result = self.name + "(" first = True for arg in self.arguments: if arg.dir.find("I") != -1: if first: first = False else: result += ", " result += arg.name result += ")" return result class SchemaArgument: """ """ def __init__(self, codec, methodArg): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] if methodArg: self.dir = str(map["dir"]).upper() self.unit = None self.min = None self.max = None self.maxlen = None self.desc = None self.default = None for key, value in map.items(): if key == "unit" : self.unit = value elif key == "min" : self.min = value elif key == "max" : self.max = value elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = value elif key == "default" : self.default = value class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, codec, first=0, second=0): if codec: self.first = codec.read_uint64() self.second = codec.read_uint64() else: self.first = first self.second = second def __cmp__(self, other): if other == None or not isinstance(other, ObjectId) : return 1 if self.first < other.first: return -1 if self.first > other.first: return 1 if self.second < other.second: return -1 if self.second > other.second: return 1 return 0 def __repr__(self): return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) def index(self): return (self.first, self.second) def getFlags(self): return (self.first & 0xF000000000000000) >> 60 def getSequence(self): return (self.first & 0x0FFF000000000000) >> 48 def getBrokerBank(self): return (self.first & 0x0000FFFFF0000000) >> 28 def getAgentBank(self): return self.first & 0x000000000FFFFFFF def getObject(self): return self.second def isDurable(self): return self.getSequence() == 0 def encode(self, codec): codec.write_uint64(self.first) codec.write_uint64(self.second) def __hash__(self): return (self.first, self.second).__hash__() def __eq__(self, other): return (self.first, self.second).__eq__(other) class Object(object): """ This class defines a 'proxy' object representing a real managed object on an agent. Actions taken on this proxy are remotely affected on the real managed object. """ def __init__(self, session, broker, schema, codec, prop, stat): self._session = session self._broker = broker self._schema = schema self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() self._objectId = ObjectId(codec) self._properties = [] self._statistics = [] if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: self._properties.append((property, None)) else: self._properties.append((property, self._session._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) def getBroker(self): """ Return the broker from which this object was sent """ return self._broker def getObjectId(self): """ Return the object identifier for this object """ return self._objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ return self._schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ return self._schema def getMethods(self): """ Return a list of methods available for this object. """ return self._schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ return self._currentTime, self._createTime, self._deleteTime def isDeleted(self): """ Return True iff this object has been deleted. """ return self._deleteTime != 0 def getIndex(self): """ Return a string describing this object's primary key. """ result = u"" for property, value in self._properties: if property.index: if result != u"": result += u":" try: valstr = unicode(self._session._displayValue(value, property.type)) except: valstr = u"" result += valstr return result def getProperties(self): """ Return a list of object properties """ return self._properties def getStatistics(self): """ Return a list of object statistics """ return self._statistics def mergeUpdate(self, newer): """ Replace properties and/or statistics with a newly received update """ if self._objectId != newer._objectId: raise Exception("Objects with different object-ids") if len(newer.getProperties()) > 0: self._properties = newer.getProperties() if len(newer.getStatistics()) > 0: self._statistics = newer.getStatistics() def update(self): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) if obj: self.mergeUpdate(obj[0]) else: raise Exception("Underlying object no longer exists") def __repr__(self): key = self.getClassKey() return key.getPackageName() + ":" + key.getClassName() +\ "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8") def __getattr__(self, name): for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) for property, value in self._properties: if name == property.name: return value if name == "_" + property.name + "_" and property.type == 10: # Dereference references deref = self._session.getObjects(_objectId=value, _broker=self._broker) if len(deref) != 1: return None else: return deref[0] for statistic, value in self._statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def _sendMethodRequest(self, name, args, kwargs, synchronous=False): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 sendCodec = Codec(self._broker.conn.spec) seq = self._session.seqMgr._reserve((method, synchronous)) self._broker._setHeader(sendCodec, 'M', seq) self._objectId.encode(sendCodec) self._schema.getKey().encode(sendCodec) sendCodec.write_str8(name) count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: count += 1 if count != len(args): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) for arg in method.arguments: if arg.dir.find("I") != -1: self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) if synchronous: try: self._broker.cv.acquire() self._broker.syncInFlight = True finally: self._broker.cv.release() self._broker._send(smsg) return seq return None def _invoke(self, name, args, kwargs): if self._sendMethodRequest(name, args, kwargs, True): try: self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: self._broker.cv.wait(self._broker.SYNC_TIME) if time() - starttime > self._broker.SYNC_TIME: self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: self._broker.cv.release() if self._broker.error != None: errorText = self._broker.error self._broker.error = None raise Exception(errorText) return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): excludeList = [] bit = 0 for property in schema.getProperties(): if property.optional: if bit == 0: mask = codec.read_uint8() bit = 1 if (mask & bit) == 0: excludeList.append(property.name) bit *= 2 if bit == 256: bit = 0 return excludeList class MethodResult(object): """ """ def __init__(self, status, text, outArgs): """ """ self.status = status self.text = text self.outArgs = outArgs def __getattr__(self, name): if name in self.outArgs: return self.outArgs[name] def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) class ManagedConnection(Thread): """ Thread class for managing a connection. """ DELAY_MIN = 1 DELAY_MAX = 128 DELAY_FACTOR = 2 def __init__(self, broker): Thread.__init__(self) self.broker = broker self.cv = Condition() self.canceled = False def stop(self): """ Tell this thread to stop running and return. """ try: self.cv.acquire() self.canceled = True self.cv.notify() finally: self.cv.release() def disconnected(self): """ Notify the thread that the connection was lost. """ try: self.cv.acquire() self.cv.notify() finally: self.cv.release() def run(self): """ Main body of the running thread. """ delay = self.DELAY_MIN while True: try: self.broker._tryToConnect() try: self.cv.acquire() while (not self.canceled) and self.broker.connected: self.cv.wait() if self.canceled: return delay = self.DELAY_MIN finally: self.cv.release() except socket.error: if delay < self.DELAY_MAX: delay *= self.DELAY_FACTOR except SessionDetached: if delay < self.DELAY_MAX: delay *= self.DELAY_FACTOR except Closed: if delay < self.DELAY_MAX: delay *= self.DELAY_FACTOR try: self.cv.acquire() self.cv.wait(delay) if self.canceled: return finally: self.cv.release() class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 nextSeq = 1 def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False): self.session = session self.host = host self.port = port self.ssl = ssl self.authUser = authUser self.authPass = authPass self.cv = Condition() self.error = None self.brokerId = None self.connected = False self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq) Broker.nextSeq += 1 if self.session.manageConnections: self.thread = ManagedConnection(self) self.thread.start() else: self.thread = None self._tryToConnect() def isConnected(self): """ Return True if there is an active connection to the broker. """ return self.connected def getError(self): """ Return the last error message seen while trying to connect to the broker. """ return self.error def getBrokerId(self): """ Get broker's unique identifier (UUID) """ return self.brokerId def getBrokerBank(self): """ Return the broker-bank value. This is the value that the broker assigns to objects within its control. This value appears as a field in the ObjectId of objects created by agents controlled by this broker. """ return 1 def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" bankKey = (brokerBank, agentBank) if bankKey in self.agents: return self.agents[bankKey] return None def getSessionId(self): """ Get the identifier of the AMQP session to the broker """ return self.amqpSessionId def getAgents(self): """ Get the list of agents reachable via this broker """ return self.agents.values() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ return self.amqpSession def getUrl(self): """ """ return "%s:%d" % (self.host, self.port) def getFullUrl(self, noAuthIfGuestDefault=True): """ """ ssl = "" if self.ssl: ssl = "s" auth = "%s/%s@" % (self.authUser, self.authPass) if self.authUser == "" or \ (noAuthIfGuestDefault and self.authUser == "guest" and self.authPass == "guest"): auth = "" return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672) def __repr__(self): if self.connected: return "Broker connected at: %s" % self.getUrl() else: return "Disconnected Broker" def _tryToConnect(self): try: self.agents = {} self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.syncInFlight = False self.syncRequest = 0 self.syncResult = None self.reqsOutstanding = 1 sock = connect(self.host, self.port) if self.ssl: sock = ssl(sock) self.conn = Connection(sock, username=self.authUser, password=self.authPass) self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) self.amqpSession.auto_sync = True self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) self.topicName = "topic-%s" % self.amqpSessionId self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("tdest").listen(self._replyCb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) self.connected = True self.session._handleBrokerConnect(self) codec = Codec(self.conn.spec) self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) except socket.error, e: self.error = "Socket Error %s - %s" % (e[0], e[1]) raise except Closed, e: self.error = "Connect Failed %d - %s" % (e[0], e[1]) raise except ConnectionFailed, e: self.error = "Connect Failed %d - %s" % (e[0], e[1]) raise def _updateAgent(self, obj): bankKey = (obj.brokerBank, obj.agentBank) if obj._deleteTime == 0: if bankKey not in self.agents: agent = Agent(self, obj.agentBank, obj.label) self.agents[bankKey] = agent if self.session.console != None: self.session.console.newAgent(agent) else: agent = self.agents.pop(bankKey, None) if agent != None and self.session.console != None: self.session.console.delAgent(agent) def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) codec.write_uint8(ord('2')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) def _checkHeader(self, codec): """ Check the header of a management message and extract the opcode and class. """ try: octet = chr(codec.read_uint8()) if octet != 'A': return None, None octet = chr(codec.read_uint8()) if octet != 'M': return None, None octet = chr(codec.read_uint8()) if octet != '2': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() return opcode, seq except: return None, None def _message (self, body, routing_key="broker"): dp = self.amqpSession.delivery_properties() dp.routing_key = routing_key mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) return Message(dp, mp, body) def _send(self, msg, dest="qpid.management"): self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): if self.thread: self.thread.stop() self.thread.join() if self.connected: self.amqpSession.incoming("rdest").stop() if self.session.console != None: self.amqpSession.incoming("tdest").stop() self.amqpSession.close() self.conn.close() self.connected = False def _waitForStable(self): try: self.cv.acquire() if not self.connected: return if self.reqsOutstanding == 0: return self.syncInFlight = True starttime = time() while self.reqsOutstanding != 0: self.cv.wait(self.SYNC_TIME) if time() - starttime > self.SYNC_TIME: raise RuntimeError("Timed out waiting for broker to synchronize") finally: self.cv.release() def _incOutstanding(self): try: self.cv.acquire() self.reqsOutstanding += 1 finally: self.cv.release() def _decOutstanding(self): try: self.cv.acquire() self.reqsOutstanding -= 1 if self.reqsOutstanding == 0 and not self.topicBound: self.topicBound = True for key in self.session.bindingKeyList: self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key=key) if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() finally: self.cv.release() def _replyCb(self, msg): codec = Codec(self.conn.spec, msg.body) while True: opcode, seq = self._checkHeader(codec) if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): self.connected = False self.error = data try: self.cv.acquire() if self.syncInFlight: self.cv.notify() finally: self.cv.release() self.session._handleError(self.error) self.session._handleBrokerDisconnect(self) if self.thread: self.thread.disconnected() class Agent: """ """ def __init__(self, broker, agentBank, label): self.broker = broker self.brokerBank = broker.getBrokerBank() self.agentBank = agentBank self.label = label def __repr__(self): return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) def getBroker(self): return self.broker def getBrokerBank(self): return self.brokerBank def getAgentBank(self): return self.agentBank class Event: """ """ def __init__(self, session, broker, codec): self.session = session self.broker = broker self.classKey = ClassKey(codec) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() self.schema = None pname = self.classKey.getPackageName() pkey = self.classKey.getPackageKey() if pname in session.packages: if pkey in session.packages[pname]: self.schema = session.packages[pname][pkey] self.arguments = {} for arg in self.schema.arguments: self.arguments[arg.name] = session._decodeValue(codec, arg.type) def __repr__(self): if self.schema == None: return "" out = strftime("%c", gmtime(self.timestamp / 1000000000)) out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName() out += " broker=" + self.broker.getUrl() for arg in self.schema.arguments: disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8") if " " in disp: disp = "\"" + disp + "\"" out += " " + arg.name + "=" + disp return out def _sevName(self): if self.severity == 0 : return "EMER " if self.severity == 1 : return "ALERT" if self.severity == 2 : return "CRIT " if self.severity == 3 : return "ERROR" if self.severity == 4 : return "WARN " if self.severity == 5 : return "NOTIC" if self.severity == 6 : return "INFO " if self.severity == 7 : return "DEBUG" return "INV-%d" % self.severity def getClassKey(self): return self.classKey def getArguments(self): return self.arguments def getTimestamp(self): return self.timestamp def getName(self): return self.name def getSchema(self): return self.schema class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): self.lock = Lock() self.sequence = 0 self.pending = {} def _reserve(self, data): """ Reserve a unique sequence number """ try: self.lock.acquire() result = self.sequence self.sequence = self.sequence + 1 self.pending[result] = data finally: self.lock.release() return result def _release(self, seq): """ Release a reserved sequence number """ data = None try: self.lock.acquire() if seq in self.pending: data = self.pending[seq] del self.pending[seq] finally: self.lock.release() return data class DebugConsole(Console): """ """ def brokerConnected(self, broker): print "brokerConnected:", broker def brokerDisconnected(self, broker): print "brokerDisconnected:", broker def newPackage(self, name): print "newPackage:", name def newClass(self, kind, classKey): print "newClass:", kind, classKey def newAgent(self, agent): print "newAgent:", agent def delAgent(self, agent): print "delAgent:", agent def objectProps(self, broker, record): print "objectProps:", record def objectStats(self, broker, record): print "objectStats:", record def event(self, broker, event): print "event:", event def heartbeat(self, agent, timestamp): print "heartbeat:", agent def brokerInfo(self, broker): print "brokerInfo:", broker