summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qpid/qmfconsole.py264
1 files changed, 147 insertions, 117 deletions
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 4c7b47e38a..7bfe5d1128 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -106,9 +106,6 @@ class BrokerURL:
def name(self):
return self.host + ":" + str(self.port)
- def match(self, host, port):
- return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port
-
class Session:
"""
An instance of the Session class represents a console session running
@@ -326,10 +323,12 @@ class Session:
for agent in agentList:
broker = agent.broker
sendCodec = Codec(broker.conn.spec)
- self.cv.acquire()
- seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
- self.syncSequenceList.append(seq)
- self.cv.release()
+ try:
+ self.cv.acquire()
+ seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
+ self.syncSequenceList.append(seq)
+ finally:
+ self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
@@ -337,15 +336,17 @@ class Session:
starttime = time()
timeout = False
- self.cv.acquire()
- while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(self.GET_WAIT_TIME)
- if time() - starttime > self.GET_WAIT_TIME:
- for pendingSeq in self.syncSequenceList:
- self.seqMgr._release(pendingSeq)
- self.syncSequenceList = []
- timeout = True
- self.cv.release()
+ try:
+ self.cv.acquire()
+ while len(self.syncSequenceList) > 0 and self.error == None:
+ self.cv.wait(self.GET_WAIT_TIME)
+ if time() - starttime > self.GET_WAIT_TIME:
+ for pendingSeq in self.syncSequenceList:
+ self.seqMgr._release(pendingSeq)
+ self.syncSequenceList = []
+ timeout = True
+ finally:
+ self.cv.release()
if self.error:
errorText = self.error
@@ -397,14 +398,16 @@ class Session:
def _handlePackageInd(self, broker, codec, seq):
pname = str(codec.read_str8())
- self.cv.acquire()
- if pname not in self.packages:
- self.packages[pname] = {}
- self.cv.release()
- if self.console != None:
- self.console.newPackage(pname)
- else:
+ notify = False
+ try:
+ self.cv.acquire()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+ notify = True
+ finally:
self.cv.release()
+ if notify and self.console != None:
+ self.console.newPackage(pname)
# Send a class request
broker._incOutstanding()
@@ -422,30 +425,38 @@ class Session:
if context == self._CONTEXT_STARTUP:
broker._decOutstanding()
elif context == self._CONTEXT_SYNC and seq == broker.syncSequence:
- broker.cv.acquire()
- broker.syncInFlight = False
- broker.cv.notify()
- broker.cv.release()
+ try:
+ broker.cv.acquire()
+ broker.syncInFlight = False
+ broker.cv.notify()
+ finally:
+ broker.cv.release()
elif context == self._CONTEXT_MULTIGET and seq in self.syncSequenceList:
- self.cv.acquire()
- self.syncSequenceList.remove(seq)
- if len(self.syncSequenceList) == 0:
- self.cv.notify()
- self.cv.release()
+ try:
+ self.cv.acquire()
+ self.syncSequenceList.remove(seq)
+ if len(self.syncSequenceList) == 0:
+ self.cv.notify()
+ finally:
+ self.cv.release()
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
+ unknown = False
- self.cv.acquire()
- if pname not in self.packages:
+ try:
+ self.cv.acquire()
+ if pname in self.packages:
+ if (cname, hash) not in self.packages[pname]:
+ unknown = True
+ finally:
self.cv.release()
- return
- if (cname, hash) not in self.packages[pname]:
+
+ if unknown:
# Send a schema request for the unknown class
- self.cv.release()
broker._incOutstanding()
sendCodec = Codec(broker.conn.spec)
seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
@@ -455,8 +466,6 @@ class Session:
sendCodec.write_bin128(hash)
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- else:
- self.cv.release()
def _handleMethodResp(self, broker, codec, seq):
code = codec.read_uint32()
@@ -469,11 +478,13 @@ class Session:
outArgs[arg.name] = self._decodeValue(codec, arg.type)
result = MethodResult(code, text, outArgs)
if synchronous:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
- broker.cv.release()
+ try:
+ broker.cv.acquire()
+ broker.syncResult = result
+ broker.syncInFlight = False
+ broker.cv.notify()
+ finally:
+ broker.cv.release()
else:
if self.console:
self.console.methodResponse(broker, seq, result)
@@ -495,9 +506,11 @@ class Session:
hash = codec.read_bin128()
classKey = (pname, cname, hash)
_class = SchemaClass(kind, classKey, codec)
- self.cv.acquire()
- self.packages[pname][(cname, hash)] = _class
- self.cv.release()
+ try:
+ self.cv.acquire()
+ self.packages[pname][(cname, hash)] = _class
+ finally:
+ self.cv.release()
broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
@@ -507,26 +520,28 @@ class Session:
cname = str(codec.read_str8())
hash = codec.read_bin128()
classKey = (pname, cname, hash)
- self.cv.acquire()
- if pname not in self.packages:
- self.cv.release()
- return
- if (cname, hash) not in self.packages[pname]:
+ try:
+ self.cv.acquire()
+ if pname not in self.packages:
+ return
+ if (cname, hash) not in self.packages[pname]:
+ return
+ schema = self.packages[pname][(cname, hash)]
+ finally:
self.cv.release()
- return
- self.cv.release()
- schema = self.packages[pname][(cname, hash)]
+
object = Object(self, broker, schema, codec, prop, stat)
if pname == "org.apache.qpid.broker" and cname == "agent":
broker._updateAgent(object)
- self.cv.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
+ try:
+ self.cv.acquire()
+ if seq in self.syncSequenceList:
+ if object.getTimestamps()[2] == 0 and self._selectMatch(object):
+ self.getResult.append(object)
+ return
+ finally:
self.cv.release()
- return
- self.cv.release()
if self.console != None:
if prop:
@@ -536,10 +551,12 @@ class Session:
def _handleError(self, error):
self.error = error
- self.cv.acquire()
- self.syncSequenceList = []
- self.cv.notify()
- self.cv.release()
+ try:
+ self.cv.acquire()
+ self.syncSequenceList = []
+ self.cv.notify()
+ finally:
+ self.cv.release()
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
@@ -993,24 +1010,27 @@ class Object(object):
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
(self._objectId.getBroker(), self._objectId.getBank()))
if synchronous:
- self._broker.cv.acquire()
- self._broker.syncInFlight = True
- self._broker.cv.release()
+ try:
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ finally:
+ self._broker.cv.release()
self._broker._send(smsg)
return seq
return None
def _invoke(self, name, args, kwargs):
if self._sendMethodRequest(name, args, kwargs, True):
- self._broker.cv.acquire()
- starttime = time()
- while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
- self._broker.cv.release()
- self._session.seqMgr._release(seq)
- raise RuntimeError("Timed out waiting for method to respond")
- self._broker.cv.release()
+ try:
+ self._broker.cv.acquire()
+ starttime = time()
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._session.seqMgr._release(seq)
+ raise RuntimeError("Timed out waiting for method to respond")
+ finally:
+ self._broker.cv.release()
if self._broker.error != None:
errorText = self._broker.error
self._broker.error = None
@@ -1208,36 +1228,40 @@ class Broker:
raise Exception("Broker already disconnected")
def _waitForStable(self):
- self.cv.acquire()
- if self.reqsOutstanding == 0:
+ try:
+ self.cv.acquire()
+ if self.reqsOutstanding == 0:
+ return
+ self.syncInFlight = True
+ starttime = time()
+ while self.reqsOutstanding != 0:
+ self.cv.wait(self.SYNC_TIME)
+ if time() - starttime > self.SYNC_TIME:
+ raise RuntimeError("Timed out waiting for broker to synchronize")
+ finally:
self.cv.release()
- return
- self.syncInFlight = True
- starttime = time()
- while self.reqsOutstanding != 0:
- self.cv.wait(self.SYNC_TIME)
- if time() - starttime > self.SYNC_TIME:
- self.cv.release()
- raise RuntimeError("Timed out waiting for broker to synchronize")
- self.cv.release()
def _incOutstanding(self):
- self.cv.acquire()
- self.reqsOutstanding += 1
- self.cv.release()
+ try:
+ self.cv.acquire()
+ self.reqsOutstanding += 1
+ finally:
+ self.cv.release()
def _decOutstanding(self):
- self.cv.acquire()
- self.reqsOutstanding -= 1
- if self.reqsOutstanding == 0 and not self.topicBound:
- self.topicBound = True
- for key in self.session.bindingKeyList:
- self.amqpSession.exchange_bind(exchange="qpid.management",
- queue=self.topicName, binding_key=key)
- if self.reqsOutstanding == 0 and self.syncInFlight:
- self.syncInFlight = False
- self.cv.notify()
- self.cv.release()
+ try:
+ self.cv.acquire()
+ self.reqsOutstanding -= 1
+ if self.reqsOutstanding == 0 and not self.topicBound:
+ self.topicBound = True
+ for key in self.session.bindingKeyList:
+ self.amqpSession.exchange_bind(exchange="qpid.management",
+ queue=self.topicName, binding_key=key)
+ if self.reqsOutstanding == 0 and self.syncInFlight:
+ self.syncInFlight = False
+ self.cv.notify()
+ finally:
+ self.cv.release()
def _replyCb(self, msg):
codec = Codec(self.conn.spec, msg.body)
@@ -1259,10 +1283,12 @@ class Broker:
def _exceptionCb(self, data):
self.isConnected = False
self.error = data
- self.cv.acquire()
- if self.syncInFlight:
- self.cv.notify()
- self.cv.release()
+ try:
+ self.cv.acquire()
+ if self.syncInFlight:
+ self.cv.notify()
+ finally:
+ self.cv.release()
self.session._handleError(self.error)
self.session._handleBrokerDisconnect(self)
@@ -1326,7 +1352,7 @@ class Event:
return self.arguments
def getTimestamp(self):
- return self.timerstamp
+ return self.timestamp
def getName(self):
return self.name
@@ -1343,21 +1369,25 @@ class SequenceManager:
def _reserve(self, data):
""" Reserve a unique sequence number """
- self.lock.acquire()
- result = self.sequence
- self.sequence = self.sequence + 1
- self.pending[result] = data
- self.lock.release()
+ try:
+ self.lock.acquire()
+ result = self.sequence
+ self.sequence = self.sequence + 1
+ self.pending[result] = data
+ finally:
+ self.lock.release()
return result
def _release(self, seq):
""" Release a reserved sequence number """
data = None
- self.lock.acquire()
- if seq in self.pending:
- data = self.pending[seq]
- del self.pending[seq]
- self.lock.release()
+ try:
+ self.lock.acquire()
+ if seq in self.pending:
+ data = self.pending[seq]
+ del self.pending[seq]
+ finally:
+ self.lock.release()
return data