diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/qmfconsole.py | 264 |
1 files changed, 147 insertions, 117 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index 4c7b47e38a..7bfe5d1128 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -106,9 +106,6 @@ class BrokerURL: def name(self): return self.host + ":" + str(self.port) - def match(self, host, port): - return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port - class Session: """ An instance of the Session class represents a console session running @@ -326,10 +323,12 @@ class Session: for agent in agentList: broker = agent.broker sendCodec = Codec(broker.conn.spec) - self.cv.acquire() - seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) - self.syncSequenceList.append(seq) - self.cv.release() + try: + self.cv.acquire() + seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) + self.syncSequenceList.append(seq) + finally: + self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) @@ -337,15 +336,17 @@ class Session: starttime = time() timeout = False - self.cv.acquire() - while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(self.GET_WAIT_TIME) - if time() - starttime > self.GET_WAIT_TIME: - for pendingSeq in self.syncSequenceList: - self.seqMgr._release(pendingSeq) - self.syncSequenceList = [] - timeout = True - self.cv.release() + try: + self.cv.acquire() + while len(self.syncSequenceList) > 0 and self.error == None: + self.cv.wait(self.GET_WAIT_TIME) + if time() - starttime > self.GET_WAIT_TIME: + for pendingSeq in self.syncSequenceList: + self.seqMgr._release(pendingSeq) + self.syncSequenceList = [] + timeout = True + finally: + self.cv.release() if self.error: errorText = self.error @@ -397,14 +398,16 @@ class Session: def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) - self.cv.acquire() - if pname not in self.packages: - self.packages[pname] = {} - self.cv.release() - if self.console != None: - self.console.newPackage(pname) - else: + notify = False + try: + self.cv.acquire() + if pname not in self.packages: + self.packages[pname] = {} + notify = True + finally: self.cv.release() + if notify and self.console != None: + self.console.newPackage(pname) # Send a class request broker._incOutstanding() @@ -422,30 +425,38 @@ class Session: if context == self._CONTEXT_STARTUP: broker._decOutstanding() elif context == self._CONTEXT_SYNC and seq == broker.syncSequence: - broker.cv.acquire() - broker.syncInFlight = False - broker.cv.notify() - broker.cv.release() + try: + broker.cv.acquire() + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList: - self.cv.acquire() - self.syncSequenceList.remove(seq) - if len(self.syncSequenceList) == 0: - self.cv.notify() - self.cv.release() + try: + self.cv.acquire() + self.syncSequenceList.remove(seq) + if len(self.syncSequenceList) == 0: + self.cv.notify() + finally: + self.cv.release() def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() + unknown = False - self.cv.acquire() - if pname not in self.packages: + try: + self.cv.acquire() + if pname in self.packages: + if (cname, hash) not in self.packages[pname]: + unknown = True + finally: self.cv.release() - return - if (cname, hash) not in self.packages[pname]: + + if unknown: # Send a schema request for the unknown class - self.cv.release() broker._incOutstanding() sendCodec = Codec(broker.conn.spec) seq = self.seqMgr._reserve(self._CONTEXT_STARTUP) @@ -455,8 +466,6 @@ class Session: sendCodec.write_bin128(hash) smsg = broker._message(sendCodec.encoded) broker._send(smsg) - else: - self.cv.release() def _handleMethodResp(self, broker, codec, seq): code = codec.read_uint32() @@ -469,11 +478,13 @@ class Session: outArgs[arg.name] = self._decodeValue(codec, arg.type) result = MethodResult(code, text, outArgs) if synchronous: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - broker.cv.release() + try: + broker.cv.acquire() + broker.syncResult = result + broker.syncInFlight = False + broker.cv.notify() + finally: + broker.cv.release() else: if self.console: self.console.methodResponse(broker, seq, result) @@ -495,9 +506,11 @@ class Session: hash = codec.read_bin128() classKey = (pname, cname, hash) _class = SchemaClass(kind, classKey, codec) - self.cv.acquire() - self.packages[pname][(cname, hash)] = _class - self.cv.release() + try: + self.cv.acquire() + self.packages[pname][(cname, hash)] = _class + finally: + self.cv.release() broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) @@ -507,26 +520,28 @@ class Session: cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) - self.cv.acquire() - if pname not in self.packages: - self.cv.release() - return - if (cname, hash) not in self.packages[pname]: + try: + self.cv.acquire() + if pname not in self.packages: + return + if (cname, hash) not in self.packages[pname]: + return + schema = self.packages[pname][(cname, hash)] + finally: self.cv.release() - return - self.cv.release() - schema = self.packages[pname][(cname, hash)] + object = Object(self, broker, schema, codec, prop, stat) if pname == "org.apache.qpid.broker" and cname == "agent": broker._updateAgent(object) - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) + try: + self.cv.acquire() + if seq in self.syncSequenceList: + if object.getTimestamps()[2] == 0 and self._selectMatch(object): + self.getResult.append(object) + return + finally: self.cv.release() - return - self.cv.release() if self.console != None: if prop: @@ -536,10 +551,12 @@ class Session: def _handleError(self, error): self.error = error - self.cv.acquire() - self.syncSequenceList = [] - self.cv.notify() - self.cv.release() + try: + self.cv.acquire() + self.syncSequenceList = [] + self.cv.notify() + finally: + self.cv.release() def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ @@ -993,24 +1010,27 @@ class Object(object): smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % (self._objectId.getBroker(), self._objectId.getBank())) if synchronous: - self._broker.cv.acquire() - self._broker.syncInFlight = True - self._broker.cv.release() + try: + self._broker.cv.acquire() + self._broker.syncInFlight = True + finally: + self._broker.cv.release() self._broker._send(smsg) return seq return None def _invoke(self, name, args, kwargs): if self._sendMethodRequest(name, args, kwargs, True): - self._broker.cv.acquire() - starttime = time() - while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: - self._broker.cv.release() - self._session.seqMgr._release(seq) - raise RuntimeError("Timed out waiting for method to respond") - self._broker.cv.release() + try: + self._broker.cv.acquire() + starttime = time() + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._session.seqMgr._release(seq) + raise RuntimeError("Timed out waiting for method to respond") + finally: + self._broker.cv.release() if self._broker.error != None: errorText = self._broker.error self._broker.error = None @@ -1208,36 +1228,40 @@ class Broker: raise Exception("Broker already disconnected") def _waitForStable(self): - self.cv.acquire() - if self.reqsOutstanding == 0: + try: + self.cv.acquire() + if self.reqsOutstanding == 0: + return + self.syncInFlight = True + starttime = time() + while self.reqsOutstanding != 0: + self.cv.wait(self.SYNC_TIME) + if time() - starttime > self.SYNC_TIME: + raise RuntimeError("Timed out waiting for broker to synchronize") + finally: self.cv.release() - return - self.syncInFlight = True - starttime = time() - while self.reqsOutstanding != 0: - self.cv.wait(self.SYNC_TIME) - if time() - starttime > self.SYNC_TIME: - self.cv.release() - raise RuntimeError("Timed out waiting for broker to synchronize") - self.cv.release() def _incOutstanding(self): - self.cv.acquire() - self.reqsOutstanding += 1 - self.cv.release() + try: + self.cv.acquire() + self.reqsOutstanding += 1 + finally: + self.cv.release() def _decOutstanding(self): - self.cv.acquire() - self.reqsOutstanding -= 1 - if self.reqsOutstanding == 0 and not self.topicBound: - self.topicBound = True - for key in self.session.bindingKeyList: - self.amqpSession.exchange_bind(exchange="qpid.management", - queue=self.topicName, binding_key=key) - if self.reqsOutstanding == 0 and self.syncInFlight: - self.syncInFlight = False - self.cv.notify() - self.cv.release() + try: + self.cv.acquire() + self.reqsOutstanding -= 1 + if self.reqsOutstanding == 0 and not self.topicBound: + self.topicBound = True + for key in self.session.bindingKeyList: + self.amqpSession.exchange_bind(exchange="qpid.management", + queue=self.topicName, binding_key=key) + if self.reqsOutstanding == 0 and self.syncInFlight: + self.syncInFlight = False + self.cv.notify() + finally: + self.cv.release() def _replyCb(self, msg): codec = Codec(self.conn.spec, msg.body) @@ -1259,10 +1283,12 @@ class Broker: def _exceptionCb(self, data): self.isConnected = False self.error = data - self.cv.acquire() - if self.syncInFlight: - self.cv.notify() - self.cv.release() + try: + self.cv.acquire() + if self.syncInFlight: + self.cv.notify() + finally: + self.cv.release() self.session._handleError(self.error) self.session._handleBrokerDisconnect(self) @@ -1326,7 +1352,7 @@ class Event: return self.arguments def getTimestamp(self): - return self.timerstamp + return self.timestamp def getName(self): return self.name @@ -1343,21 +1369,25 @@ class SequenceManager: def _reserve(self, data): """ Reserve a unique sequence number """ - self.lock.acquire() - result = self.sequence - self.sequence = self.sequence + 1 - self.pending[result] = data - self.lock.release() + try: + self.lock.acquire() + result = self.sequence + self.sequence = self.sequence + 1 + self.pending[result] = data + finally: + self.lock.release() return result def _release(self, seq): """ Release a reserved sequence number """ data = None - self.lock.acquire() - if seq in self.pending: - data = self.pending[seq] - del self.pending[seq] - self.lock.release() + try: + self.lock.acquire() + if seq in self.pending: + data = self.pending[seq] + del self.pending[seq] + finally: + self.lock.release() return data |
