From c54daace1322bcb5428467c590645baa15da6b50 Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Fri, 27 Sep 2013 13:52:01 +0000 Subject: QPID-5183 Python client does not release acquired messages on consumer close when session persists git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526901 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid/messaging/driver.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'qpid/python') diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index ac59bcf0a6..06bbe610b8 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -259,6 +259,15 @@ class LinkIn: reliability = link_opts.get("reliability") cmds = [MessageCancel(_rcv.destination)] cmds.extend(_rcv.on_unlink) + msgs = [] #release back messages for the closing receiver + msg = rcv.session._pop(rcv) + while msg is not None: + msgs.append(msg) + msg = rcv.session._pop(rcv) + if len(msgs) > 0: + ids = RangedSet(*[m._transfer_id for m in msgs]) + log.debug("releasing back messages: %s, as receiver is closing", ids) + cmds.append(MessageRelease(ids, True)) sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): @@ -1283,6 +1292,11 @@ class Engine: msg = self._decode(xfr) rcv = sst.destinations[xfr.destination].target msg._receiver = rcv + if rcv.closing or rcv.closed: # release message to a closing receiver + ids = RangedSet(*[msg._transfer_id]) + log.debug("releasing back %s message: %s, as receiver is closing", ids, msg) + sst.write_cmd(MessageRelease(ids, True)) + return if rcv.impending is not UNLIMITED: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 -- cgit v1.2.1