# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # """ Console API for Qpid Management Framework """ import os import qpid import struct import socket import re from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed from qpid.datatypes import uuid4 from qpid.util import connect from datatypes import Message, RangedSet from threading import Lock, Condition from codec010 import StringCodec as Codec from time import time from cStringIO import StringIO class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ def newPackage(self, name): """ Invoked when a QMF package is discovered. """ pass def newClass(self, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass def newAgent(self, agent): """ Invoked when a QMF agent is discovered. """ pass def delAgent(self, agent): """ Invoked when a QMF agent disconects. """ pass def objectProps(self, broker, id, record): """ Invoked when an object is updated. """ pass def objectStats(self, broker, id, record): """ Invoked when an object is updated. """ pass def event(self, broker, event): """ Invoked when an event is raised. """ pass def heartbeat(self, agent, timestamp): """ """ pass def brokerInfo(self, broker): """ """ pass class BrokerURL: def __init__(self, text): rex = re.compile(r""" # [ [ / ] @] [ : ] ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X) match = rex.match(text) if not match: raise ValueError("'%s' is not a valid broker url" % (text)) user, password, host, port = match.groups() self.host = socket.gethostbyname(host) if port: self.port = int(port) else: self.port = 5672 self.authName = user or "guest" self.authPass = password or "guest" self.authMech = "PLAIN" def name(self): return self.host + ":" + str(self.port) class Session: """ An instance of the Session class represents a console session running against one or more QMF brokers. A single instance of Session is needed to interact with the management framework as a console. """ _CONTEXT_SYNC = 1 _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 GET_WAIT_TIME = 10 def __init__(self, console=None): """ Initialize a session. If the console argument is provided, the more advanced asynchronous features are available. If console is defaulted, the session will operate in a simpler, synchronous manner. """ self.console = console self.brokers = [] self.packages = {} self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] self.getResult = [] self.error = None def __repr__(self): return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers) def addBroker(self, target="localhost"): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass) self.brokers.append(broker) self.getObjects(broker=broker, name="agent") return broker def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ broker._shutdown() self.brokers.remove(broker) del broker def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() list = [] for package in self.packages: list.append(package) return list def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() list = [] if packageName in self.packages: for cname, hash in self.packages[packageName]: list.append((packageName, cname, hash)) return list def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() pname, cname, hash = classKey if pname in self.packages: if (cname, hash) in self.packages[pname]: return self.packages[pname][(cname, hash)] def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] if broker == None: for b in self.brokers: brokerList.append(b) else: brokerList.append(broker) for b in brokerList: b._waitForStable() agentList = [] for b in brokerList: for a in b.getAgents(): agentList.append(a) return agentList def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. All arguments are passed by name(keyword). The class for queried objects may be specified in one of the following ways: schema = - supply a schema object returned from getSchema key = - supply a classKey from the list returned by getClasses name = - supply a class name as a string If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. agent = - supply an agent from the list returned by getAgents If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: broker = - supply a broker as returned by addBroker """ if "broker" in kwargs: brokerList = [] brokerList.append(kwargs["broker"]) else: brokerList = self.brokers for broker in brokerList: broker._waitForStable() agentList = [] if "agent" in kwargs: agent = kwargs["agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") agentList.append(agent) else: for broker in brokerList: for agent in broker.getAgents(): agentList.append(agent) cname = None if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey() elif "key" in kwargs: pname, cname, hash = kwargs["key"] elif "name" in kwargs: pname, cname, hash = None, kwargs["name"], None if cname == None: raise Exception("No class supplied, use 'schema', 'key', or 'name' argument") map = {} map["_class"] = cname if pname != None: map["_package"] = pname if hash != None: map["_hash"] = hash self.getResult = [] for agent in agentList: broker = agent.broker sendCodec = Codec(broker.conn.spec) self.cv.acquire() seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) self.syncSequenceList.append(seq) self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank) broker._send(smsg) starttime = time() timeout = False self.cv.acquire() while len(self.syncSequenceList) > 0 and self.error == None: self.cv.wait(self.GET_WAIT_TIME) if time() - starttime > self.GET_WAIT_TIME: for pendingSeq in self.syncSequenceList: self.seqMgr._release(pendingSeq) self.syncSequenceList = [] timeout = True self.cv.release() if self.error: errorText = self.error self.error = None raise Exception(errorText) if len(self.getResult) == 0 and timeout: raise RuntimeError("No agent responded within timeout period") return self.getResult def setEventFilter(self, **kwargs): """ """ pass def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: self.console.brokerInfo(broker) # Send a package request # (effectively inc and dec outstanding by not doing anything) sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'P', seq) smsg = broker._message(sendCodec.encoded) broker._send(smsg) def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) self.cv.acquire() if pname not in self.packages: self.packages[pname] = {} self.cv.release() if self.console != None: self.console.newPackage(pname) else: self.cv.release() # Send a class request broker._incOutstanding() sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'Q', seq) sendCodec.write_str8(pname) smsg = broker._message(sendCodec.encoded) broker._send(smsg) def _handleCommandComplete(self, broker, codec, seq): code = codec.read_uint32() text = str(codec.read_str8()) context = self.seqMgr._release(seq) if context == self._CONTEXT_STARTUP: broker._decOutstanding() elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: broker.cv.acquire() broker.syncInFlight = False broker.cv.notify() broker.cv.release() elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: self.cv.acquire() self.syncSequenceList.remove(seq) if len(self.syncSequenceList) == 0: self.cv.notify() self.cv.release() def _handleClassInd(self, broker, codec, seq): pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() self.cv.acquire() if pname not in self.packages: self.cv.release() return if (cname, hash) not in self.packages[pname]: # Send a schema request for the unknown class self.cv.release() broker._incOutstanding() sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) broker._setHeader(sendCodec, 'S', seq) sendCodec.write_str8(pname) sendCodec.write_str8(cname) sendCodec.write_bin128(hash) smsg = broker._message(sendCodec.encoded) broker._send(smsg) else: self.cv.release() def _handleMethodResp(self, broker, codec, seq): code = codec.read_uint32() text = str(codec.read_str8()) outArgs = {} obj, method = self.seqMgr._release(seq) if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: outArgs[arg.name] = obj._decodeValue(codec, arg.type) broker.cv.acquire() broker.syncResult = MethodResult(code, text, outArgs) broker.syncInFlight = False broker.cv.notify() broker.cv.release() def _handleHeartbeatInd(self, broker, codec, seq): timestamp = codec.read_uint64() pass def _handleEventInd(self, broker, codec, seq): pass def _handleSchemaResp(self, broker, codec, seq): pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) _class = SchemaClass(classKey, codec) self.cv.acquire() self.packages[pname][(cname, hash)] = _class self.cv.release() broker._decOutstanding() if self.console != None: self.console.newClass(classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) self.cv.acquire() if pname not in self.packages: self.cv.release() return if (cname, hash) not in self.packages[pname]: self.cv.release() return self.cv.release() schema = self.packages[pname][(cname, hash)] object = Object(self, broker, schema, codec, prop, stat) if pname == "org.apache.qpid.broker" and cname == "agent": broker._updateAgent(object) self.cv.acquire() if seq in self.syncSequenceList: self.getResult.append(object) self.cv.release() return self.cv.release() if self.console != None: if prop: self.console.objectProps(broker, object.getObjectId(), object) if stat: self.console.objectStats(broker, object.getObjectId(), object) def _handleError(self, error): self.error = error self.cv.acquire() self.syncSequenceList = [] self.cv.notify() self.cv.release() class Package: """ """ def __init__(self, name): self.name = name class ClassKey: """ """ def __init__(self, package, className, hash): self.package = package self.className = className self.hash = hash class SchemaClass: """ """ def __init__(self, key, codec): self.classKey = key self.properties = [] self.statistics = [] self.methods = [] self.events = [] propCount = codec.read_uint16() statCount = codec.read_uint16() methodCount = codec.read_uint16() eventCount = codec.read_uint16() for idx in range(propCount): self.properties.append(SchemaProperty(codec)) for idx in range(statCount): self.statistics.append(SchemaStatistic(codec)) for idx in range(methodCount): self.methods.append(SchemaMethod(codec)) for idx in range(eventCount): self.events.append(SchemaEvent(codec)) def __repr__(self): pname, cname, hash = self.classKey result = "Class: %s:%s " % (pname, cname) result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) return result def getKey(self): """ Return the class-key for this class. """ return self.classKey def getProperties(self): """ Return the list of properties for the class. """ return self.properties def getStatistics(self): """ Return the list of statistics for the class. """ return self.statistics def getMethods(self): """ Return the list of methods for the class. """ return self.methods def getEvents(self): """ Return the list of events for the class. """ return self.events class SchemaProperty: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] self.access = map["access"] self.index = map["index"] != 0 self.optional = map["optional"] != 0 self.unit = None self.min = None self.max = None self.maxlan = None self.desc = None for key, value in map.items(): if key == "unit" : self.unit = str(value) elif key == "min" : self.min = value elif key == "max" : self.max = value elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = str(value) def __repr__(self): return self.name class SchemaStatistic: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] self.unit = None self.desc = None for key, value in map.items(): if key == "unit" : self.unit = str(value) elif key == "desc" : self.desc = str(value) def __repr__(self): return self.name class SchemaMethod: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) argCount = map["argCount"] if "desc" in map: self.desc = str(map["desc"]) else: self.desc = None self.arguments = [] for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=True)) def __repr__(self): result = self.name + "(" first = True for arg in self.arguments: if arg.dir.find("I") != -1: if first: first = False else: result += ", " result += arg.name result += ")" return result class SchemaEvent: """ """ def __init__(self, codec): map = codec.read_map() self.name = str(map["name"]) argCount = map["argCount"] if "desc" in map: self.desc = str(map["desc"]) else: self.desc = None self.arguments = [] for idx in range(argCount): self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): result = self.name + "(" first = True for arg in self.arguments: if first: first = False else: result += ", " result += arg.name result += ")" return result class SchemaArgument: """ """ def __init__(self, codec, methodArg): map = codec.read_map() self.name = str(map["name"]) self.type = map["type"] if methodArg: self.dir = str(map["dir"].upper()) self.unit = None self.min = None self.max = None self.maxlen = None self.desc = None self.default = None for key, value in map.items(): if key == "unit" : self.unit = str(value) elif key == "min" : self.min = value elif key == "max" : self.max = value elif key == "maxlen" : self.maxlen = value elif key == "desc" : self.desc = str(value) elif key == "default" : self.default = str(value) class ObjectId(object): """ Object that represents QMF object identifiers """ def __init__(self, codec, first=0, second=0): if codec: self.first = codec.read_uint64() self.second = codec.read_uint64() else: self.first = first self.second = second def __cmp__(self, other): if other == None: return 1 if self.first < other.first: return -1 if self.first > other.first: return 1 if self.second < other.second: return -1 if self.second > other.second: return 1 return 0 def __repr__(self): return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(), self.getBroker(), self.getBank(), self.getObject()) def index(self): return (self.first, self.second) def getFlags(self): return (self.first & 0xF000000000000000) >> 60 def getSequence(self): return (self.first & 0x0FFF000000000000) >> 48 def getBroker(self): return (self.first & 0x0000FFFFF0000000) >> 28 def getBank(self): return self.first & 0x000000000FFFFFFF def getObject(self): return self.second def isDurable(self): return self.getSequence() == 0 def encode(self, codec): codec.write_uint64(self.first) codec.write_uint64(self.second) class Object(object): """ """ def __init__(self, session, broker, schema, codec, prop, stat): """ """ self.session = session self.broker = broker self.schema = schema self.currentTime = codec.read_uint64() self.createTime = codec.read_uint64() self.deleteTime = codec.read_uint64() self.objectId = ObjectId(codec) self.properties = [] self.statistics = [] if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: self.properties.append((property, None)) else: self.properties.append((property, self._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): self.statistics.append((statistic, self._decodeValue(codec, statistic.type))) def getObjectId(self): """ Return the object identifier for this object """ return self.objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ return self.schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ return self.schema def getMethods(self): """ Return a list of methods available for this object. """ return self.schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ return self.currentTime, self.createTime, self.deleteTime def getIndex(self): """ Return a string describing this object's primary key. """ result = "" for property, value in self.properties: if property.index: if result != "": result += ":" result += str(value) return result def __repr__(self): return self.getIndex() def __getattr__(self, name): for method in self.schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) for property, value in self.properties: if name == property.name: return value for statistic, value in self.statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): for method in self.schema.getMethods(): if name == method.name: aIdx = 0 sendCodec = Codec(self.broker.conn.spec) seq = self.session.seqMgr._reserve((self, method)) self.broker._setHeader(sendCodec, 'M', seq) self.objectId.encode(sendCodec) pname, cname, hash = self.schema.getKey() sendCodec.write_str8(pname) sendCodec.write_str8(cname) sendCodec.write_bin128(hash) sendCodec.write_str8(name) for arg in method.arguments: if arg.dir.find("I") != -1: self._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) self.broker.cv.acquire() self.broker.syncInFlight = True self.broker.cv.release() self.broker._send(smsg) self.broker.cv.acquire() starttime = time() while self.broker.syncInFlight and self.broker.error == None: self.broker.cv.wait(self.broker.SYNC_TIME) if time() - starttime > self.broker.SYNC_TIME: self.broker.cv.release() self.session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") self.broker.cv.release() if self.broker.error != None: errorText = self.broker.error self.broker.error = None raise Exception(errorText) return self.broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): excludeList = [] bit = 0 for property in schema.getProperties(): if property.optional: if bit == 0: mask = codec.read_uint8() bit = 1 if (mask & bit) == 0: excludeList.append(property.name) bit *= 2 if bit == 256: bit = 0 return excludeList def _decodeValue(self, codec, typecode): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 elif typecode == 2: data = codec.read_uint16() # U16 elif typecode == 3: data = codec.read_uint32() # U32 elif typecode == 4: data = codec.read_uint64() # U64 elif typecode == 6: data = str(codec.read_str8()) # SSTR elif typecode == 7: data = codec.read_vbin32() # LSTR elif typecode == 8: data = codec.read_int64() # ABSTIME elif typecode == 9: data = codec.read_uint64() # DELTATIME elif typecode == 10: data = ObjectId(codec) # REF elif typecode == 11: data = codec.read_uint8() # BOOL elif typecode == 12: data = codec.read_float() # FLOAT elif typecode == 13: data = codec.read_double() # DOUBLE elif typecode == 14: data = codec.read_uuid() # UUID elif typecode == 15: data = codec.read_map() # FTABLE elif typecode == 16: data = codec.read_int8() # S8 elif typecode == 17: data = codec.read_int16() # S16 elif typecode == 18: data = codec.read_int32() # S32 elif typecode == 19: data = codec.read_int64() # S63 else: raise ValueError("Invalid type code: %d" % typecode) return data def _encodeValue(self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.write_uint8 (int(value)) # U8 elif typecode == 2: codec.write_uint16 (int(value)) # U16 elif typecode == 3: codec.write_uint32 (long(value)) # U32 elif typecode == 4: codec.write_uint64 (long(value)) # U64 elif typecode == 6: codec.write_str8 (value) # SSTR elif typecode == 7: codec.write_vbin32 (value) # LSTR elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME elif typecode == 10: value.encode (codec) # REF elif typecode == 11: codec.write_uint8 (int(value)) # BOOL elif typecode == 12: codec.write_float (float(value)) # FLOAT elif typecode == 13: codec.write_double (double(value)) # DOUBLE elif typecode == 14: codec.write_uuid (value) # UUID elif typecode == 15: codec.write_map (value) # FTABLE elif typecode == 16: codec.write_int8 (int(value)) # S8 elif typecode == 17: codec.write_int16 (int(value)) # S16 elif typecode == 18: codec.write_int32 (int(value)) # S32 elif typecode == 19: codec.write_int64 (int(value)) # S64 else: raise ValueError ("Invalid type code: %d" % typecode) class MethodResult(object): """ """ def __init__(self, status, text, outArgs): """ """ self.status = status self.text = text self.outArgs = outArgs def __getattr__(self, name): if name in self.outArgs: return self.outArgs[name] def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) class Broker: """ """ SYNC_TIME = 10 def __init__(self, session, host, port, authMech, authUser, authPass): self.session = session self.host = host self.port = port self.agents = {} self.agents[0] = Agent(self, 0, "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False self.syncRequest = 0 self.syncResult = None self.reqsOutstanding = 1 self.error = None self.brokerId = None err = None try: self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid()) self.conn = Connection(connect(host, port), username=authUser, password=authPass) self.conn.start() self.replyName = "reply-%s" % self.amqpSessionId self.amqpSession = self.conn.session(self.amqpSessionId) self.amqpSession.auto_sync = True self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest") self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFF) if self.session.console != None: self.topicName = "topic-%s" % self.amqpSessionId self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest") self.amqpSession.incoming("tdest").listen(self._replyCb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF) codec = Codec(self.conn.spec) self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) except socket.error, e: err = "Socket Error %s - %s" % (e[0], e[1]) except Closed, e: err = "Connect Failed %d - %s" % (e[0], e[1]) except ConnectionFailed, e: err = "Connect Failed %d - %s" % (e[0], e[1]) self.active = True if err != None: raise Exception(err) def getBrokerId(self): """ Get broker's unique identifier (UUID) """ return self.brokerId def getSessionId(self): """ Get the identifier of the AMQP session to the broker """ return self.amqpSessionId def getAgents(self): """ Get the list of agents reachable via this broker """ return self.agents.values() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ return self.amqpSession def isConnected(self): return self.active def __repr__(self): if self.active: if self.port == 5672: port = "" else: port = ":%d" % self.port return "Broker connected at: amqp://%s%s" % (self.host, port) else: return "Disconnected Broker" def _updateAgent(self, obj): if obj.deleteTime == 0: if obj.objectIdBank not in self.agents: agent = Agent(self, obj.objectIdBank, obj.label) self.agents[obj.objectIdBank] = agent if self.session.console != None: self.session.console.newAgent(agent) else: agent = self.agents.pop(obj.objectIdBank, None) if agent != None and self.session.console != None: self.session.console.delAgent(agent) def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) codec.write_uint8(ord('1')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) def _checkHeader(self, codec): """ Check the header of a management message and extract the opcode and class. """ octet = chr(codec.read_uint8()) if octet != 'A': return None, None octet = chr(codec.read_uint8()) if octet != 'M': return None, None octet = chr(codec.read_uint8()) if octet != '1': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() return opcode, seq def _message (self, body, routing_key="broker"): dp = self.amqpSession.delivery_properties() dp.routing_key = routing_key mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) return Message(dp, mp, body) def _send(self, msg, dest="qpid.management"): self.amqpSession.message_transfer(destination=dest, message=msg) def _shutdown(self): if self.active: self.amqpSession.incoming("rdest").stop() if self.session.console != None: self.amqpSession.incoming("tdest").stop() self.amqpSession.close() self.active = False else: raise Exception("Broker already disconnected") def _waitForStable(self): self.cv.acquire() if self.reqsOutstanding == 0: self.cv.release() return self.syncInFlight = True starttime = time() while self.reqsOutstanding != 0: self.cv.wait(self.SYNC_TIME) if time() - starttime > self.SYNC_TIME: self.cv.release() raise RuntimeError("Timed out waiting for broker to synchronize") self.cv.release() def _incOutstanding(self): self.cv.acquire() self.reqsOutstanding += 1 self.cv.release() def _decOutstanding(self): self.cv.acquire() self.reqsOutstanding -= 1 if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None: self.topicBound = True self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#") if self.reqsOutstanding == 0 and self.syncInFlight: self.syncInFlight = False self.cv.notify() self.cv.release() def _replyCb(self, msg): codec = Codec(self.conn.spec, msg.body) opcode, seq = self._checkHeader(codec) if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq) elif opcode == 'e': self.session._handleEventInd (self, codec, seq) elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): self.active = False self.error = data self.cv.acquire() if self.syncInFlight: self.cv.notify() self.cv.release() self.session._handleError(self.error) class Agent: """ """ def __init__(self, broker, bank, label): self.broker = broker self.bank = bank self.label = label def __repr__(self): return "Agent at bank %d (%s)" % (self.bank, self.label) class Event: """ """ def __init__(self): pass class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): self.lock = Lock() self.sequence = 0 self.pending = {} def _reserve(self, data): """ Reserve a unique sequence number """ self.lock.acquire() result = self.sequence self.sequence = self.sequence + 1 self.pending[result] = data self.lock.release() return result def _release(self, seq): """ Release a reserved sequence number """ data = None self.lock.acquire() if seq in self.pending: data = self.pending[seq] del self.pending[seq] self.lock.release() return data class DebugConsole(Console): """ """ def newPackage(self, name): print "newPackage:", name def newClass(self, classKey): print "newClass:", classKey def newAgent(self, agent): print "newAgent:", agent def delAgent(self, agent): print "delAgent:", agent def objectProps(self, broker, id, record): print "objectProps:", record.getClassKey() def objectStats(self, broker, id, record): print "objectStats:", record.getClassKey() def event(self, broker, event): print "event:", event def heartbeat(self, agent, timestamp): print "heartbeat:", agent def brokerInfo(self, broker): print "brokerInfo:", broker