summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-16 22:15:14 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-16 22:15:14 +0000
commit941af76031ac971ebdce7d052bf32d4ab00a7ef1 (patch)
tree33d4b2d3d2e5f0122b6d73a6592b34722aa0b5d7 /qpid/python
parent5d356715fd93ee1c072b482cf13b5ed62a1dc493 (diff)
downloadqpid-python-941af76031ac971ebdce7d052bf32d4ab00a7ef1.tar.gz
don't always set the sync bit on send
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@955414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/messaging/driver.py14
-rw-r--r--qpid/python/qpid/messaging/endpoints.py25
2 files changed, 31 insertions, 8 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index 16f1b2902e..a6170c0a79 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -114,6 +114,7 @@ class SessionState:
self.min_completion = self.sent
self.max_completion = self.sent
self.results = {}
+ self.need_sync = False
# receiver state
self.received = None
@@ -131,12 +132,12 @@ class SessionState:
for k, v in overrides.items():
cmd[k.replace('-', '_')] = v
- def write_cmd(self, cmd, action=noop, overrides=None):
+ def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
if overrides:
self.apply_overrides(cmd, overrides)
- if action != noop:
- cmd.sync = True
+ if sync or action != noop:
+ cmd.sync = sync
if self.detached:
raise Exception("detached")
cmd.id = self.sent
@@ -144,6 +145,7 @@ class SessionState:
self.actions[cmd.id] = action
self.max_completion = cmd.id
self.write_op(cmd)
+ self.need_sync = not cmd.sync
def write_cmds(self, cmds, action=noop):
if cmds:
@@ -963,6 +965,10 @@ class Engine:
else:
break
+ for snd in ssn.senders:
+ if snd.synced >= snd.queued and sst.need_sync:
+ sst.write_cmd(ExecutionSync(), sync=True)
+
for rcv in ssn.receivers:
self.process_receiver(rcv)
@@ -1167,7 +1173,7 @@ class Engine:
log.debug("RACK[%s]: %s", sst.session.log_id, msg)
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body), msg_acked)
+ payload=body), msg_acked, sync=msg._sync)
log.debug("SENT[%s]: %s", sst.session.log_id, msg)
def do_message_transfer(self, xfr):
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 707aee3ed6..58a654ef2a 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -677,12 +677,20 @@ class Session:
assert self.aborted
@synchronized
+ def sync(self):
+ """
+ Sync the session.
+ """
+ for snd in self.senders:
+ snd.sync()
+ self._ewait(lambda: not self.outgoing and not self.acked)
+
+ @synchronized
def close(self):
"""
Close the session.
"""
- # XXX: should be able to express this condition through API calls
- self._ewait(lambda: not self.outgoing and not self.acked)
+ self.sync()
for link in self.receivers + self.senders:
link.close()
@@ -704,8 +712,10 @@ class Sender:
self.target = target
self.options = options
self.capacity = options.get("capacity", UNLIMITED)
+ self.threshold = 0.5
self.durable = options.get("durable")
self.queued = Serial(0)
+ self.synced = Serial(0)
self.acked = Serial(0)
self.error = None
self.linked = False
@@ -792,18 +802,25 @@ class Sender:
# XXX: what if we send the same message to multiple senders?
message._sender = self
+ if self.capacity is not UNLIMITED:
+ message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
+ else:
+ message._sync = sync
self.session.outgoing.append(message)
self.queued += 1
- self._wakeup()
-
if sync:
self.sync()
assert message not in self.session.outgoing
+ else:
+ self._wakeup()
@synchronized
def sync(self):
mno = self.queued
+ if self.synced < mno:
+ self.synced = mno
+ self._wakeup()
self._ewait(lambda: self.acked >= mno)
@synchronized