diff options
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index ac59bcf0a6..06bbe610b8 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -259,6 +259,15 @@ class LinkIn: reliability = link_opts.get("reliability") cmds = [MessageCancel(_rcv.destination)] cmds.extend(_rcv.on_unlink) + msgs = [] #release back messages for the closing receiver + msg = rcv.session._pop(rcv) + while msg is not None: + msgs.append(msg) + msg = rcv.session._pop(rcv) + if len(msgs) > 0: + ids = RangedSet(*[m._transfer_id for m in msgs]) + log.debug("releasing back messages: %s, as receiver is closing", ids) + cmds.append(MessageRelease(ids, True)) sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): @@ -1283,6 +1292,11 @@ class Engine: msg = self._decode(xfr) rcv = sst.destinations[xfr.destination].target msg._receiver = rcv + if rcv.closing or rcv.closed: # release message to a closing receiver + ids = RangedSet(*[msg._transfer_id]) + log.debug("releasing back %s message: %s, as receiver is closing", ids, msg) + sst.write_cmd(MessageRelease(ids, True)) + return if rcv.impending is not UNLIMITED: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 |
