summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-09-10 20:12:43 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-09-10 20:12:43 +0000
commit62c915a43558d6a4a0dc7a78d31f328c2c27fccc (patch)
treea34e53d583ff6119db37ebf49501193a6971bd21
parenteb53863b1c27fc35181f60ee4f59330762bd4208 (diff)
downloadqpid-python-62c915a43558d6a4a0dc7a78d31f328c2c27fccc.tar.gz
QMF: bugfix - ack message even if handler throws an exception.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995963 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/qmf/src/py/qmf/console.py214
1 files changed, 112 insertions, 102 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py
index e876940211..461cc66a7d 100644
--- a/extras/qmf/src/py/qmf/console.py
+++ b/extras/qmf/src/py/qmf/console.py
@@ -2589,62 +2589,62 @@ class Broker(Thread):
"""
This is the general message handler for messages received via the QMFv1 exchanges.
"""
- agent = None
- agent_addr = None
- mp = msg.get("message_properties")
- ah = mp.application_headers
- if ah and 'qmf.agent' in ah:
- agent_addr = ah['qmf.agent']
-
- if not agent_addr:
- #
- # See if we can determine the agent identity from the routing key
- #
- dp = msg.get("delivery_properties")
- rkey = None
- if dp and dp.routing_key:
- rkey = dp.routing_key
- items = rkey.split('.')
- if len(items) >= 4:
- if items[0] == 'console' and items[3].isdigit():
- agent_addr = str(items[3]) # The QMFv1 Agent Bank
- if agent_addr != None and agent_addr in self.agents:
- agent = self.agents[agent_addr]
-
- codec = Codec(msg.body)
- alreadyTried = None
- while True:
- opcode, seq = self._checkHeader(codec)
-
- if not agent and not alreadyTried:
- alreadyTried = True
- try:
- self.cv.acquire()
- if seq in self.seqToAgentMap:
- agent = self.seqToAgentMap[seq]
- finally:
- self.cv.release()
-
- if opcode == None: break
- if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
- elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
- elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
- elif agent:
- agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
-
- # ignore failures as the session may be shutting down...
try:
- self.amqpSession.receiver._completed.add(msg.id)
- self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
- except:
- pass
+ agent = None
+ agent_addr = None
+ mp = msg.get("message_properties")
+ ah = mp.application_headers
+ if ah and 'qmf.agent' in ah:
+ agent_addr = ah['qmf.agent']
+
+ if not agent_addr:
+ #
+ # See if we can determine the agent identity from the routing key
+ #
+ dp = msg.get("delivery_properties")
+ rkey = None
+ if dp and dp.routing_key:
+ rkey = dp.routing_key
+ items = rkey.split('.')
+ if len(items) >= 4:
+ if items[0] == 'console' and items[3].isdigit():
+ agent_addr = str(items[3]) # The QMFv1 Agent Bank
+ if agent_addr != None and agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+
+ codec = Codec(msg.body)
+ alreadyTried = None
+ while True:
+ opcode, seq = self._checkHeader(codec)
+
+ if not agent and not alreadyTried:
+ alreadyTried = True
+ try:
+ self.cv.acquire()
+ if seq in self.seqToAgentMap:
+ agent = self.seqToAgentMap[seq]
+ finally:
+ self.cv.release()
+
+ if opcode == None: break
+ if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
+ elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
+ elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
+ elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
+ elif agent:
+ agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
+ agent.touch() # mark agent as being alive
+
+ finally: # always ack the message!
+ try:
+ # ignore failures as the session may be shutting down...
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
+ except:
+ pass
- # mark agent as being alive
- if agent:
- agent.touch()
def _v2Cb(self, msg):
""" Callback from session receive thread for V2 messages
@@ -2652,60 +2652,70 @@ class Broker(Thread):
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg))
def _v2Dispatch(self, msg):
+ try:
+ self._v2DispatchProtected(msg)
+ except Exception, e:
+ print "EXCEPTION in Broker._v2Cb:", e
+ import traceback
+ traceback.print_exc()
+
+ def _v2DispatchProtected(self, msg):
"""
This is the general message handler for messages received via QMFv2 exchanges.
"""
- mp = msg.get("message_properties")
- ah = mp["application_headers"]
- codec = Codec(msg.body)
-
- if 'qmf.opcode' in ah:
- opcode = ah['qmf.opcode']
- if mp.content_type == "amqp/list":
- try:
- content = codec.read_list()
- if not content:
- content = []
- except:
- # malformed list - ignore
- content = None
- elif mp.content_type == "amqp/map":
- try:
- content = codec.read_map()
- if not content:
- content = {}
- except:
- # malformed map - ignore
- content = None
- else:
- content = None
-
- if content != None:
- ##
- ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
- ## used to maintain the broker's list of agent proxies.
- ##
- if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
- elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ try:
+ mp = msg.get("message_properties")
+ ah = mp["application_headers"]
+ codec = Codec(msg.body)
+
+ if 'qmf.opcode' in ah:
+ opcode = ah['qmf.opcode']
+ if mp.content_type == "amqp/list":
+ try:
+ content = codec.read_list()
+ if not content:
+ content = []
+ except:
+ # malformed list - ignore
+ content = None
+ elif mp.content_type == "amqp/map":
+ try:
+ content = codec.read_map()
+ if not content:
+ content = {}
+ except:
+ # malformed map - ignore
+ content = None
else:
+ content = None
+
+ if content != None:
##
- ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
- ## of the message.
+ ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+ ## used to maintain the broker's list of agent proxies.
##
- agent_addr = ah['qmf.agent']
- if agent_addr == 'broker':
- agent_addr = '0'
- if agent_addr in self.agents:
- agent = self.agents[agent_addr]
- agent._handleQmfV2Message(opcode, mp, ah, content)
- agent.touch()
-
- # ignore failures as the session may be shutting down...
- try:
- self.amqpSession.receiver._completed.add(msg.id)
- self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
- except:
- pass
+ if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+ elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ else:
+ ##
+ ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+ ## of the message.
+ ##
+ agent_addr = ah['qmf.agent']
+ if agent_addr == 'broker':
+ agent_addr = '0'
+ if agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+ agent._handleQmfV2Message(opcode, mp, ah, content)
+ agent.touch()
+
+ finally: # always ack the message!
+ try:
+ # ignore failures as the session may be shutting down...
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
+ except:
+ pass
def _exceptionCb(self, data):
""" Exception notification callback from session receive thread.