diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-10-11 05:11:48 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-10-11 05:11:48 +0000 |
| commit | 2bd19455d4c1ca071a958e996cbb95ec9396fa63 (patch) | |
| tree | 97f519555f7ebe40a4cc9f97bb6fee0fcb4c83ad /qpid/python | |
| parent | ca9e506b1d39ad1959cacb19a64b44c5894ce5f9 (diff) | |
| download | qpid-python-2bd19455d4c1ca071a958e996cbb95ec9396fa63.tar.gz | |
resent linked variable; fixed possible drain failure during reconnect
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@824024 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/driver.py | 21 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging.py | 8 |
2 files changed, 18 insertions, 11 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index dadf43fc7f..a7a8b13819 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -151,8 +151,11 @@ class Driver: for ssn in self.connection.sessions.values(): for m in ssn.acked + ssn.unacked + ssn.incoming: m._transfer_id = None + for snd in ssn.senders: + snd.linked = False for rcv in ssn.receivers: rcv.impending = rcv.received + rcv.linked = False @synchronized def wakeup(self): @@ -663,16 +666,22 @@ class Driver: sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, UNLIMITED.value)) sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta)) rcv.impending += delta - elif delta < 0: + elif delta < 0 and not rcv.draining: _rcv.draining = True - def flush_stop_cmplt(): + def do_stop(): rcv.impending = rcv.received _rcv.draining = False self.grant(rcv) - if rcv.drain: - sst.write_cmd(MessageFlush(rcv.destination, sync=True), flush_stop_cmplt) - else: - sst.write_cmd(MessageStop(rcv.destination, sync=True), flush_stop_cmplt) + sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop) + + if rcv.draining: + def do_flush(): + rcv.impending = rcv.received + rcv.granted = rcv.impending + _rcv.draining = False + rcv.draining = False + sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush) + def process_receiver(self, rcv): if rcv.closed: return diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 10d9c78396..1d548a3d87 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -646,7 +646,7 @@ class Receiver: self.started = started self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) - self.drain = False + self.draining = False self.impending = Serial(0) self.received = Serial(0) self.returned = Serial(0) @@ -722,11 +722,9 @@ class Receiver: self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self.drain = True - self.granted = self.received + self.draining = True self._wakeup() - self._ewait(lambda: self.impending == self.received) - self.drain = False + self._ewait(lambda: not self.draining) self._grant() self._wakeup() msg = self.session._get(self._pred, timeout=0) |
