summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/messaging/driver.py14
1 files changed, 14 insertions, 0 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index ac59bcf0a6..06bbe610b8 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -259,6 +259,15 @@ class LinkIn:
reliability = link_opts.get("reliability")
cmds = [MessageCancel(_rcv.destination)]
cmds.extend(_rcv.on_unlink)
+ msgs = [] #release back messages for the closing receiver
+ msg = rcv.session._pop(rcv)
+ while msg is not None:
+ msgs.append(msg)
+ msg = rcv.session._pop(rcv)
+ if len(msgs) > 0:
+ ids = RangedSet(*[m._transfer_id for m in msgs])
+ log.debug("releasing back messages: %s, as receiver is closing", ids)
+ cmds.append(MessageRelease(ids, True))
sst.write_cmds(cmds, action)
def del_link(self, sst, rcv, _rcv):
@@ -1283,6 +1292,11 @@ class Engine:
msg = self._decode(xfr)
rcv = sst.destinations[xfr.destination].target
msg._receiver = rcv
+ if rcv.closing or rcv.closed: # release message to a closing receiver
+ ids = RangedSet(*[msg._transfer_id])
+ log.debug("releasing back %s message: %s, as receiver is closing", ids, msg)
+ sst.write_cmd(MessageRelease(ids, True))
+ return
if rcv.impending is not UNLIMITED:
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1