diff options
| author | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
| commit | 978185bafb2b08d89c0b675263dd2b1f715c9e69 (patch) | |
| tree | adbffd2832e5ed2cd947c9a7acad91497b12aced | |
| parent | 658c2efbccea313fc9d8ec4ee658d6e8b30ac61a (diff) | |
| download | qpid-python-978185bafb2b08d89c0b675263dd2b1f715c9e69.tar.gz | |
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704944 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/managementgen/qmf/templates/Class.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 81 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementBroker.cpp | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.h | 10 | ||||
| -rwxr-xr-x | qpid/python/commands/qpid-queue-stats | 195 | ||||
| -rw-r--r-- | qpid/python/qpid/management.py | 69 | ||||
| -rw-r--r-- | qpid/python/qpid/qmfconsole.py | 145 |
7 files changed, 297 insertions, 241 deletions
diff --git a/qpid/cpp/managementgen/qmf/templates/Class.h b/qpid/cpp/managementgen/qmf/templates/Class.h index 2a995c95a5..7796914d51 100644 --- a/qpid/cpp/managementgen/qmf/templates/Class.h +++ b/qpid/cpp/managementgen/qmf/templates/Class.h @@ -55,11 +55,11 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject struct PerThreadStats** perThreadStatsArray; inline struct PerThreadStats* getThreadStats() { - int index = getThreadIndex(); - struct PerThreadStats* threadStats = perThreadStatsArray[index]; + int idx = getThreadIndex(); + struct PerThreadStats* threadStats = perThreadStatsArray[idx]; if (threadStats == 0) { threadStats = new(PerThreadStats); - perThreadStatsArray[index] = threadStats; + perThreadStatsArray[idx] = threadStats; /*MGEN:Class.InitializePerThreadElements*/ } return threadStats; diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 2f7a524a65..36e56e48ae 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing() moveNewObjectsLH(); - if (clientWasAdded) - { + if (clientWasAdded) { clientWasAdded = false; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); - iter++) - { + iter++) { ManagementObject* object = iter->second; object->setAllChanged(); } @@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing() if (managementObjects.empty()) return; - + + // + // Clear the been-here flag on all objects in the map. + // for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) - { - ManagementObject* object = iter->second; + iter->second->setFlags(0); + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->isDeleted())) + continue; - if (object->getConfigChanged() || object->isDeleted()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); - } + if (object->getConfigChanged() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } - if (object->getInstChanged()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + if (object->getInstChanged()) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(iter->first); - contentSize = BUFSIZE - msgBuffer.available(); + if (msgBuffer.available() < (BUFSIZE / 2)) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { msgBuffer.reset(); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName(); + routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName(); connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - - if (object->isDeleted()) - deleteList.push_back(iter->first); } // Delete flagged objects @@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); msg.setData(data); - session.messageTransfer(arg::content=msg, arg::destination=exchange); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(std::exception&) {} } void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp index 4a9882d827..7ced42f69b 100644 --- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp +++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp @@ -291,26 +291,26 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::sendBuffer (Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) +void ManagementBroker::sendBuffer(Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) { if (exchange.get() == 0) return; - intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageTransferBody>( + intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>( ProtocolVersion(), exchange->getName (), 0, 0)); - AMQFrame header (in_place<AMQHeaderBody>()); + AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); content.castBody<AMQContentBody>()->decode(buf, length); - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); msg->getFrames().append(method); msg->getFrames().append(header); @@ -321,7 +321,9 @@ void ManagementBroker::sendBuffer (Buffer& buf, msg->getFrames().append(content); DeliverableMessage deliverable (msg); - exchange->route (deliverable, routingKey, 0); + try { + exchange->route(deliverable, routingKey, 0); + } catch(std::exception&) {} } void ManagementBroker::moveNewObjectsLH() @@ -385,7 +387,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -397,7 +399,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -1018,7 +1020,7 @@ void ManagementBroker::addClassLH(uint8_t kind, return; // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << + QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" << key.name); cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index fa2025112f..a34f50ab8f 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -122,6 +122,7 @@ class ManagementObject : public ManagementItem sys::Mutex accessLock; ManagementAgent* agent; int maxThreads; + uint32_t flags; static int nextThreadIndex; @@ -164,6 +165,15 @@ class ManagementObject : public ManagementItem deleted = true; } inline bool isDeleted (void) { return deleted; } + inline void setFlags(uint32_t f) { flags = f; } + inline uint32_t getFlags() { return flags; } + bool isSameClass(ManagementObject& other) { + for (int idx = 0; idx < 16; idx++) + if (other.getMd5Sum()[idx] != getMd5Sum()[idx]) + return false; + return other.getClassName() == getClassName() && + other.getPackageName() == getPackageName(); + } }; typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; diff --git a/qpid/python/commands/qpid-queue-stats b/qpid/python/commands/qpid-queue-stats index 98dfa7580a..c29cab3568 100755 --- a/qpid/python/commands/qpid-queue-stats +++ b/qpid/python/commands/qpid-queue-stats @@ -26,120 +26,96 @@ import re import socket import qpid from threading import Condition -from qpid.management import managementClient -from qpid.managementdata import Broker +from qpid.qmfconsole import Session, Console from qpid.peer import Closed from qpid.connection import Connection, ConnectionFailed from qpid.util import connect from time import sleep -class mgmtObject (object): - """ Generic object that holds the contents of a management object with its - attributes set as object attributes. """ - - def __init__ (self, classKey, timestamps, row): - self.classKey = classKey - self.timestamps = timestamps - for cell in row: - setattr (self, cell[0], cell[1]) - - - -class BrokerManager: - def __init__ (self): - self.dest = None - self.src = None - self.broker = None - self.objects = {} - self.filter = None - - def SetBroker (self, broker): - self.broker = broker - - def ConnectToBroker (self): - try: - self.sessionId = "%s.%d" % (os.uname()[1], os.getpid()) - self.conn = Connection (connect (self.broker.host, self.broker.port), - username=self.broker.username, password=self.broker.password) - self.conn.start () - self.session = self.conn.session(self.sessionId) - self.mclient = managementClient (self.conn.spec, None, self.configCb, self.instCb) - self.mchannel = self.mclient.addChannel (self.session) - except socket.error, e: - print "Socket Error %s - %s" % (e[0], e[1]) - sys.exit (1) - except Closed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit (1) - except ConnectionFailed, e: - print "Connect Failed %d - %s" % (e[0], e[1]) - sys.exit(1) - - def setFilter(self,filter): - self.filter = filter - - def Disconnect (self): - self.mclient.removeChannel (self.mchannel) - self.session.close(timeout=10) - self.conn.close(timeout=10) - - def configCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - self.objects[obj.id] = (obj.name, None, None) - - def instCb (self, context, classKey, row, timestamps): - className = classKey[1] - if className != "queue": - return - - obj = mgmtObject (classKey, timestamps, row) - if obj.id not in self.objects: - return - - (name, first, last) = self.objects[obj.id] - if first == None: - self.objects[obj.id] = (name, obj, None) - return - - if len(self.filter) > 0 : - match = False +class BrokerManager(Console): + def __init__(self, host): + self.url = host + self.objects = {} + self.filter = None + self.session = Session(self, rcvEvents=False, rcvHeartbeats=False, userBindings=True) + try: + self.broker = self.session.addBroker(self.url) + except socket.error, e: + print "Socket Error %s - %s" % (e[0], e[1]) + sys.exit (1) + except Closed, e: + print "Connect Failed %d - %s" % (e[0], e[1]) + sys.exit (1) + except ConnectionFailed, e: + print "Connect Failed %d - %s" % (e[0], e[1]) + sys.exit(1) + + def setFilter(self,filter): + self.filter = filter + + def objectProps(self, broker, record): + className = record.getClassKey()[1] + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + self.objects[id] = (record.name, None, None) + + def objectStats(self, broker, record): + className = record.getClassKey()[1] + if className != "queue": + return + + id = record.getObjectId().__repr__() + if id not in self.objects: + return + + (name, first, last) = self.objects[id] + if first == None: + self.objects[id] = (name, record, None) + return + + if len(self.filter) > 0 : + match = False - for x in self.filter: - if x.match(name): - match = True - break - if match == False: - return - - if last == None: - lastSample = first - else: - lastSample = last - - self.objects[obj.id] = (name, first, obj) - - deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0]) - enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0) - dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0) - print "%-41s%10.2f%11d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate) - - - def Display (self): - self.ConnectToBroker () - print "Queue Name Sec Depth Enq Rate Deq Rate" - print "========================================================================================" - try: - while True: - sleep (1) - except KeyboardInterrupt: - pass - self.Disconnect () + for x in self.filter: + if x.match(name): + match = True + break + if match == False: + return + + if last == None: + lastSample = first + else: + lastSample = last + + self.objects[id] = (name, first, record) + + deltaTime = float (record.getTimestamps()[0] - lastSample.getTimestamps()[0]) + enqueueRate = float (record.msgTotalEnqueues - lastSample.msgTotalEnqueues) / \ + (deltaTime / 1000000000.0) + dequeueRate = float (record.msgTotalDequeues - lastSample.msgTotalDequeues) / \ + (deltaTime / 1000000000.0) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, record.msgDepth, enqueueRate, dequeueRate) + + + def Display (self): + classes = self.session.getClasses("org.apache.qpid.broker") + for cls in classes: + if cls[1] == "queue": + queueClassKey = cls + self.session.bindClass(queueClassKey) + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" + try: + while True: + sleep (1) + except KeyboardInterrupt: + print + self.session.delBroker(self.broker) ## ## Main Program @@ -157,8 +133,7 @@ def main(): for s in options.filter.split(","): filter.append(re.compile(s)) - bm = BrokerManager () - bm.SetBroker (Broker (host)) + bm = BrokerManager(host) bm.setFilter(filter) bm.Display() diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index e6ff5852f2..485b64b99f 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -162,8 +162,12 @@ class managementChannel: ssn.exchange_bind (exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) - ssn.message_subscribe (queue=self.topicName, destination="tdest") - ssn.message_subscribe (queue=self.replyName, destination="rdest") + ssn.message_subscribe (queue=self.topicName, destination="tdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) + ssn.message_subscribe (queue=self.replyName, destination="rdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) ssn.incoming ("rdest").listen (self.replyCb) @@ -202,9 +206,6 @@ class managementChannel: if self.enabled: self.qpidChannel.message_transfer (destination=exchange, message=msg) - def accept (self, msg): - self.qpidChannel.message_accept(RangedSet(msg.id)) - def message (self, body, routing_key="broker"): dp = self.qpidChannel.delivery_properties() dp.routing_key = routing_key @@ -349,28 +350,27 @@ class managementClient: def topicCb (self, ch, msg): """ Receive messages via the topic queue of a particular channel. """ codec = Codec (self.spec, msg.body) - hdr = self.checkHeader (codec) - if hdr == None: - raise ValueError ("outer header invalid"); + while True: + hdr = self.checkHeader (codec) + if hdr == None: + return - if hdr[0] == 'p': - self.handlePackageInd (ch, codec) - elif hdr[0] == 'q': - self.handleClassInd (ch, codec) - elif hdr[0] == 'h': - self.handleHeartbeat (ch, codec) - elif hdr[0] == 'e': - self.handleEvent (ch, codec) - else: - self.parse (ch, codec, hdr[0], hdr[1]) - ch.accept(msg) + if hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + elif hdr[0] == 'h': + self.handleHeartbeat (ch, codec) + elif hdr[0] == 'e': + self.handleEvent (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) def replyCb (self, ch, msg): """ Receive messages via the reply queue of a particular channel. """ codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: - ch.accept(msg) return if hdr[0] == 'm': @@ -385,7 +385,6 @@ class managementClient: self.handleClassInd (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) - ch.accept(msg) def exceptCb (self, ch, data): if self.closeCb != None: @@ -403,20 +402,22 @@ class managementClient: codec.write_uint32 (seq) def checkHeader (self, codec): - """ Check the header of a management message and extract the opcode and - class. """ - octet = chr (codec.read_uint8 ()) - if octet != 'A': - return None - octet = chr (codec.read_uint8 ()) - if octet != 'M': - return None - octet = chr (codec.read_uint8 ()) - if octet != '2': + """ Check the header of a management message and extract the opcode and class. """ + try: + octet = chr (codec.read_uint8 ()) + if octet != 'A': + return None + octet = chr (codec.read_uint8 ()) + if octet != 'M': + return None + octet = chr (codec.read_uint8 ()) + if octet != '2': + return None + opcode = chr (codec.read_uint8 ()) + seq = codec.read_uint32 () + return (opcode, seq) + except: return None - opcode = chr (codec.read_uint8 ()) - seq = codec.read_uint32 () - return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ diff --git a/qpid/python/qpid/qmfconsole.py b/qpid/python/qpid/qmfconsole.py index 5e1df10e5b..ed4565dac4 100644 --- a/qpid/python/qpid/qmfconsole.py +++ b/qpid/python/qpid/qmfconsole.py @@ -62,11 +62,11 @@ class Console: """ Invoked when a QMF agent disconects. """ pass - def objectProps(self, broker, id, record): + def objectProps(self, broker, record): """ Invoked when an object is updated. """ pass - def objectStats(self, broker, id, record): + def objectStats(self, broker, record): """ Invoked when an object is updated. """ pass @@ -111,10 +111,10 @@ class Session: _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 10 + GET_WAIT_TIME = 60 def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, - manageConnections=False): + manageConnections=False, userBindings=False): """ Initialize a session. If the console argument is provided, the more advanced asynchronous features are available. If console is @@ -131,6 +131,12 @@ class Session: If manageConnections is set to False, the user is responsible for handing failures. In this case, an unreachable broker will cause addBroker to raise an exception. + + If userBindings is set to False (the default) and rcvObjects is True, the console will + receive data for all object classes. If userBindings is set to True, the user must select + which classes the console shall receive by invoking the bindPackage or bindClass methods. + This allows the console to be configured to receive only information that is relavant to + a particular application. If rcvObjects id False, userBindings has no meaning. """ self.console = console self.brokers = [] @@ -141,14 +147,21 @@ class Session: self.getResult = [] self.getSelect = [] self.error = None + self.rcvObjects = rcvObjects + self.rcvEvents = rcvEvents + self.rcvHeartbeats = rcvHeartbeats + self.userBindings = userBindings if self.console == None: - rcvObjects = False - rcvEvents = False - rcvHeartbeats = False - self.bindingKeyList = self._bindingKeys(rcvObjects, rcvEvents, rcvHeartbeats) + self.rcvObjects = False + self.rcvEvents = False + self.rcvHeartbeats = False + self.bindingKeyList = self._bindingKeys() self.manageConnections = manageConnections - if (manageConnections): + if self.userBindings and not self.rcvObjects: + raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + + if manageConnections: raise Exception("manageConnections - not yet implemented") def __repr__(self): @@ -200,6 +213,23 @@ class Session: if (cname, hash) in self.packages[pname]: return self.packages[pname][(cname, hash)] + def bindPackage(self, packageName): + """ """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + for broker in self.brokers: + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key="console.obj.%s" % packageName) + + def bindClass(self, classKey): + """ """ + if not self.userBindings or not self.rcvObjects: + raise Exception("userBindings option not set for Session") + pname, cname, hash = classKey + for broker in self.brokers: + broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, + binding_key="console.obj.%s.%s" % (pname, cname)) + def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] @@ -322,20 +352,19 @@ class Session: """ """ pass - def _bindingKeys(self, rcvObjects, rcvEvents, rcvHeartbeats): + def _bindingKeys(self): keyList = [] keyList.append("schema.#") - if rcvObjects and rcvEvents and rcvHeartbeats: + if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings: keyList.append("console.#") else: - if rcvObjects: - keyList.append("console.prop.#") - keyList.append("console.stat.#") + if self.rcvObjects and not self.userBindings: + keyList.append("console.obj.#") else: - keyList.append("console.prop.org.apache.qpid.broker.agent") - if rcvEvents: + keyList.append("console.obj.org.apache.qpid.broker.agent") + if self.rcvEvents: keyList.append("console.event.#") - if rcvHeartbeats: + if self.rcvHeartbeats: keyList.append("console.heartbeat") return keyList @@ -488,9 +517,9 @@ class Session: if self.console != None: if prop: - self.console.objectProps(broker, object.getObjectId(), object) + self.console.objectProps(broker, object) if stat: - self.console.objectStats(broker, object.getObjectId(), object) + self.console.objectStats(broker, object) def _handleError(self, error): self.error = error @@ -858,6 +887,15 @@ class Object(object): def getStatistics(self): return self._statistics + def mergeUpdate(self, newer): + """ Replace properties and/or statistics with a newly received update """ + if self._objectId != newer._objectId: + raise Exception("Objects with different object-ids") + if len(newer.getProperties()) > 0: + self.properties = newer.getProperties() + if len(newer.getStatistics()) > 0: + self.statistics = newer.getStatistics() + def __repr__(self): return self.getIndex() @@ -960,7 +998,7 @@ class MethodResult(object): class Broker: """ """ - SYNC_TIME = 10 + SYNC_TIME = 60 def __init__(self, session, host, port, authMech, authUser, authPass): self.session = session @@ -1024,7 +1062,9 @@ class Broker: self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True) self.amqpSession.exchange_bind(exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) - self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest") + self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFF) @@ -1032,7 +1072,9 @@ class Broker: self.topicName = "topic-%s" % self.amqpSessionId self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True) - self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest") + self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) self.amqpSession.incoming("tdest").listen(self._replyCb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF) @@ -1076,18 +1118,21 @@ class Broker: def _checkHeader(self, codec): """ Check the header of a management message and extract the opcode and class. """ - octet = chr(codec.read_uint8()) - if octet != 'A': - return None, None - octet = chr(codec.read_uint8()) - if octet != 'M': - return None, None - octet = chr(codec.read_uint8()) - if octet != '2': + try: + octet = chr(codec.read_uint8()) + if octet != 'A': + return None, None + octet = chr(codec.read_uint8()) + if octet != 'M': + return None, None + octet = chr(codec.read_uint8()) + if octet != '2': + return None, None + opcode = chr(codec.read_uint8()) + seq = codec.read_uint32() + return opcode, seq + except: return None, None - opcode = chr(codec.read_uint8()) - seq = codec.read_uint32() - return opcode, seq def _message (self, body, routing_key="broker"): dp = self.amqpSession.delivery_properties() @@ -1143,23 +1188,21 @@ class Broker: self.cv.release() def _replyCb(self, msg): - self.amqpSession.message_accept(RangedSet(msg.id)) codec = Codec(self.conn.spec, msg.body) - opcode, seq = self._checkHeader(codec) - if opcode == None: - return - - if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) - elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) - elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) + while True: + opcode, seq = self._checkHeader(codec) + if opcode == None: return + if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) + elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) + elif opcode == 'q': self.session._handleClassInd (self, codec, seq) + elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq) + elif opcode == 'e': self.session._handleEventInd (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) + elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) + elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) + elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) def _exceptionCb(self, data): self.isConnected = False @@ -1286,10 +1329,10 @@ class DebugConsole(Console): def delAgent(self, agent): print "delAgent:", agent - def objectProps(self, broker, id, record): + def objectProps(self, broker, record): print "objectProps:", record.getClassKey() - def objectStats(self, broker, id, record): + def objectStats(self, broker, record): print "objectStats:", record.getClassKey() def event(self, broker, event): |
