summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-10-11 05:11:48 +0000
committerRafael H. Schloming <rhs@apache.org>2009-10-11 05:11:48 +0000
commit110b9a660d4e64f507f51c7d9a94d4997f6584f8 (patch)
tree5da78876c275d9b86590f4ecd6694e9b894af353 /python/qpid/driver.py
parent13e6bd9704643993d95c81f22106dae5b59b3084 (diff)
downloadqpid-python-110b9a660d4e64f507f51c7d9a94d4997f6584f8.tar.gz
resent linked variable; fixed possible drain failure during reconnect
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@824024 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py21
1 files changed, 15 insertions, 6 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index dadf43fc7f..a7a8b13819 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -151,8 +151,11 @@ class Driver:
for ssn in self.connection.sessions.values():
for m in ssn.acked + ssn.unacked + ssn.incoming:
m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
for rcv in ssn.receivers:
rcv.impending = rcv.received
+ rcv.linked = False
@synchronized
def wakeup(self):
@@ -663,16 +666,22 @@ class Driver:
sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value))
sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
rcv.impending += delta
- elif delta < 0:
+ elif delta < 0 and not rcv.draining:
_rcv.draining = True
- def flush_stop_cmplt():
+ def do_stop():
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- if rcv.drain:
- sst.write_cmd(MessageFlush(rcv.destination, sync=True), flush_stop_cmplt)
- else:
- sst.write_cmd(MessageStop(rcv.destination, sync=True), flush_stop_cmplt)
+ sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+ if rcv.draining:
+ def do_flush():
+ rcv.impending = rcv.received
+ rcv.granted = rcv.impending
+ _rcv.draining = False
+ rcv.draining = False
+ sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
def process_receiver(self, rcv):
if rcv.closed: return