diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-10 20:12:43 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-10 20:12:43 +0000 |
| commit | 62c915a43558d6a4a0dc7a78d31f328c2c27fccc (patch) | |
| tree | a34e53d583ff6119db37ebf49501193a6971bd21 | |
| parent | eb53863b1c27fc35181f60ee4f59330762bd4208 (diff) | |
| download | qpid-python-62c915a43558d6a4a0dc7a78d31f328c2c27fccc.tar.gz | |
QMF: bugfix - ack message even if handler throws an exception.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995963 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | extras/qmf/src/py/qmf/console.py | 214 |
1 files changed, 112 insertions, 102 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index e876940211..461cc66a7d 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -2589,62 +2589,62 @@ class Broker(Thread): """ This is the general message handler for messages received via the QMFv1 exchanges. """ - agent = None - agent_addr = None - mp = msg.get("message_properties") - ah = mp.application_headers - if ah and 'qmf.agent' in ah: - agent_addr = ah['qmf.agent'] - - if not agent_addr: - # - # See if we can determine the agent identity from the routing key - # - dp = msg.get("delivery_properties") - rkey = None - if dp and dp.routing_key: - rkey = dp.routing_key - items = rkey.split('.') - if len(items) >= 4: - if items[0] == 'console' and items[3].isdigit(): - agent_addr = str(items[3]) # The QMFv1 Agent Bank - if agent_addr != None and agent_addr in self.agents: - agent = self.agents[agent_addr] - - codec = Codec(msg.body) - alreadyTried = None - while True: - opcode, seq = self._checkHeader(codec) - - if not agent and not alreadyTried: - alreadyTried = True - try: - self.cv.acquire() - if seq in self.seqToAgentMap: - agent = self.seqToAgentMap[seq] - finally: - self.cv.release() - - if opcode == None: break - if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) - elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) - elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) - elif agent: - agent._handleQmfV1Message(opcode, seq, mp, ah, codec) - - # ignore failures as the session may be shutting down... try: - self.amqpSession.receiver._completed.add(msg.id) - self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) - except: - pass + agent = None + agent_addr = None + mp = msg.get("message_properties") + ah = mp.application_headers + if ah and 'qmf.agent' in ah: + agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp and dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = str(items[3]) # The QMFv1 Agent Bank + if agent_addr != None and agent_addr in self.agents: + agent = self.agents[agent_addr] + + codec = Codec(msg.body) + alreadyTried = None + while True: + opcode, seq = self._checkHeader(codec) + + if not agent and not alreadyTried: + alreadyTried = True + try: + self.cv.acquire() + if seq in self.seqToAgentMap: + agent = self.seqToAgentMap[seq] + finally: + self.cv.release() + + if opcode == None: break + if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) + elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) + elif opcode == 'q': self.session._handleClassInd (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) + elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) + elif agent: + agent._handleQmfV1Message(opcode, seq, mp, ah, codec) + agent.touch() # mark agent as being alive + + finally: # always ack the message! + try: + # ignore failures as the session may be shutting down... + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + except: + pass - # mark agent as being alive - if agent: - agent.touch() def _v2Cb(self, msg): """ Callback from session receive thread for V2 messages @@ -2652,60 +2652,70 @@ class Broker(Thread): self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg)) def _v2Dispatch(self, msg): + try: + self._v2DispatchProtected(msg) + except Exception, e: + print "EXCEPTION in Broker._v2Cb:", e + import traceback + traceback.print_exc() + + def _v2DispatchProtected(self, msg): """ This is the general message handler for messages received via QMFv2 exchanges. """ - mp = msg.get("message_properties") - ah = mp["application_headers"] - codec = Codec(msg.body) - - if 'qmf.opcode' in ah: - opcode = ah['qmf.opcode'] - if mp.content_type == "amqp/list": - try: - content = codec.read_list() - if not content: - content = [] - except: - # malformed list - ignore - content = None - elif mp.content_type == "amqp/map": - try: - content = codec.read_map() - if not content: - content = {} - except: - # malformed map - ignore - content = None - else: - content = None - - if content != None: - ## - ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are - ## used to maintain the broker's list of agent proxies. - ## - if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) - elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + try: + mp = msg.get("message_properties") + ah = mp["application_headers"] + codec = Codec(msg.body) + + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + try: + content = codec.read_list() + if not content: + content = [] + except: + # malformed list - ignore + content = None + elif mp.content_type == "amqp/map": + try: + content = codec.read_map() + if not content: + content = {} + except: + # malformed map - ignore + content = None else: + content = None + + if content != None: ## - ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender - ## of the message. + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. ## - agent_addr = ah['qmf.agent'] - if agent_addr == 'broker': - agent_addr = '0' - if agent_addr in self.agents: - agent = self.agents[agent_addr] - agent._handleQmfV2Message(opcode, mp, ah, content) - agent.touch() - - # ignore failures as the session may be shutting down... - try: - self.amqpSession.receiver._completed.add(msg.id) - self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) - except: - pass + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr == 'broker': + agent_addr = '0' + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + agent.touch() + + finally: # always ack the message! + try: + # ignore failures as the session may be shutting down... + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + except: + pass def _exceptionCb(self, data): """ Exception notification callback from session receive thread. |
