diff options
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 14 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 25 |
2 files changed, 31 insertions, 8 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 16f1b2902e..a6170c0a79 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -114,6 +114,7 @@ class SessionState: self.min_completion = self.sent self.max_completion = self.sent self.results = {} + self.need_sync = False # receiver state self.received = None @@ -131,12 +132,12 @@ class SessionState: for k, v in overrides.items(): cmd[k.replace('-', '_')] = v - def write_cmd(self, cmd, action=noop, overrides=None): + def write_cmd(self, cmd, action=noop, overrides=None, sync=True): if overrides: self.apply_overrides(cmd, overrides) - if action != noop: - cmd.sync = True + if sync or action != noop: + cmd.sync = sync if self.detached: raise Exception("detached") cmd.id = self.sent @@ -144,6 +145,7 @@ class SessionState: self.actions[cmd.id] = action self.max_completion = cmd.id self.write_op(cmd) + self.need_sync = not cmd.sync def write_cmds(self, cmds, action=noop): if cmds: @@ -963,6 +965,10 @@ class Engine: else: break + for snd in ssn.senders: + if snd.synced >= snd.queued and sst.need_sync: + sst.write_cmd(ExecutionSync(), sync=True) + for rcv in ssn.receivers: self.process_receiver(rcv) @@ -1167,7 +1173,7 @@ class Engine: log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body), msg_acked) + payload=body), msg_acked, sync=msg._sync) log.debug("SENT[%s]: %s", sst.session.log_id, msg) def do_message_transfer(self, xfr): diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 707aee3ed6..58a654ef2a 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -677,12 +677,20 @@ class Session: assert self.aborted @synchronized + def sync(self): + """ + Sync the session. + """ + for snd in self.senders: + snd.sync() + self._ewait(lambda: not self.outgoing and not self.acked) + + @synchronized def close(self): """ Close the session. """ - # XXX: should be able to express this condition through API calls - self._ewait(lambda: not self.outgoing and not self.acked) + self.sync() for link in self.receivers + self.senders: link.close() @@ -704,8 +712,10 @@ class Sender: self.target = target self.options = options self.capacity = options.get("capacity", UNLIMITED) + self.threshold = 0.5 self.durable = options.get("durable") self.queued = Serial(0) + self.synced = Serial(0) self.acked = Serial(0) self.error = None self.linked = False @@ -792,18 +802,25 @@ class Sender: # XXX: what if we send the same message to multiple senders? message._sender = self + if self.capacity is not UNLIMITED: + message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) + else: + message._sync = sync self.session.outgoing.append(message) self.queued += 1 - self._wakeup() - if sync: self.sync() assert message not in self.session.outgoing + else: + self._wakeup() @synchronized def sync(self): mno = self.queued + if self.synced < mno: + self.synced = mno + self._wakeup() self._ewait(lambda: self.acked >= mno) @synchronized |
