diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-06-24 17:34:34 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-24 17:34:34 +0000 |
| commit | f111ab7dca58ad68d7d9252e4c6010b4b8c2f784 (patch) | |
| tree | a45f3947bdd029fcf7d96394823e8d3e3ed6fffc /qpid/python | |
| parent | bc5441acda9f35bef338677868e46145ce7a418b (diff) | |
| download | qpid-python-f111ab7dca58ad68d7d9252e4c6010b4b8c2f784.tar.gz | |
added full support for unreliable, at-least-once, and at-most-once reliability options
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@957644 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 59 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 27 |
2 files changed, 68 insertions, 18 deletions
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index 7f0490332b..76ccd54e9f 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -95,6 +95,7 @@ CLIENT_PROPERTIES = {"product": "qpid python client", "qpid.client_ppid": ppid} def noop(): pass +def sync_noop(): pass class SessionState: @@ -136,7 +137,7 @@ class SessionState: if overrides: self.apply_overrides(cmd, overrides) - if sync or action != noop: + if action != noop: cmd.sync = sync if self.detached: raise Exception("detached") @@ -215,11 +216,14 @@ class LinkIn: def do_link(self, sst, rcv, _rcv, type, subtype, action): link_opts = _rcv.options.get("link", {}) - # XXX: default? - reliability = link_opts.get("reliability", "unreliable") + reliability = link_opts.get("reliability", "at-least-once") declare = link_opts.get("x-declare", {}) subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired + if reliability in ("unreliable", "at-most-once"): + rcv._accept_mode = accept_mode.none + else: + rcv._accept_mode = accept_mode.explicit if type == "topic": default_name = "%s.%s" % (rcv.session.name, _rcv.destination) @@ -239,9 +243,12 @@ class LinkIn: acq_mode = acquire_mode.not_acquired bindings = get_bindings(link_opts, queue=_rcv._queue) + sst.write_cmds(bindings) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, - acquire_mode = acq_mode), + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, + destination=_rcv.destination, + acquire_mode = acq_mode, + accept_mode = rcv._accept_mode), overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) @@ -263,9 +270,12 @@ class LinkOut: def init_link(self, sst, snd, _snd): _snd.closing = False + _snd.pre_ack = False def do_link(self, sst, snd, _snd, type, subtype, action): link_opts = _snd.options.get("link", {}) + reliability = link_opts.get("reliability", "at-least-once") + _snd.pre_ack = reliability in ("unreliable", "at-most-once") if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject @@ -968,32 +978,34 @@ class Engine: for snd in ssn.senders: if snd.synced >= snd.queued and sst.need_sync: - sst.write_cmd(ExecutionSync(), sync=True) + sst.write_cmd(ExecutionSync(), sync_noop) for rcv in ssn.receivers: self.process_receiver(rcv) if ssn.acked: messages = ssn.acked[sst.acked_idx:] - delta = len(messages) if messages: ids = RangedSet() disposed = [(DEFAULT_DISPOSITION, [])] + acked = [] for m in messages: # XXX: we're ignoring acks that get lost when disconnected, # could we deal this via some message-id based purge? if m._transfer_id is None: - ssn.acked.remove(m) - delta -= 1 + acked.append(m) continue ids.add(m._transfer_id) - disp = m._disposition or DEFAULT_DISPOSITION - last, msgs = disposed[-1] - if disp.type is last.type and disp.options == last.options: - msgs.append(m) + if m._receiver._accept_mode is accept_mode.explicit: + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) else: - disposed.append((disp, [m])) + acked.append(m) for range in ids: sst.executed.add_range(range) @@ -1004,6 +1016,7 @@ class Engine: for m in msgs: ssn.acked.remove(m) sst.acked_idx -= 1 + # XXX: should this check accept_mode too? if not ssn.transactional: sst.acked.remove(m) return ack_ack @@ -1023,9 +1036,9 @@ class Engine: for m in msgs: log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) - # XXX: could add messages with _transfer_id of None sst.acked.extend(messages) - sst.acked_idx += delta + sst.acked_idx += len(messages) + ack_acker(acked)() if ssn.committing and not sst.committing: def commit_ok(): @@ -1173,10 +1186,20 @@ class Engine: sst.outgoing_idx -= 1 log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m - sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body), msg_acked, sync=msg._sync) + + xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), + payload=body) + + if _snd.pre_ack: + sst.write_cmd(xfr) + else: + sst.write_cmd(xfr, msg_acked, sync=msg._sync) + log.debug("SENT[%s]: %s", sst.session.log_id, msg) + if _snd.pre_ack: + msg_acked() + def do_message_transfer(self, xfr): sst = self.get_sst(xfr) ssn = sst.session diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index 3133fe73cd..dce8d9b5ff 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -685,6 +685,33 @@ class ReceiverTests(Base): # XXX: need testUnsettled() + def unreliabilityTest(self, mode="unreliable"): + msgs = [self.message("testUnreliable", i) for i in range(3)] + snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}") + rcv = self.ssn.receiver(snd.target) + for m in msgs: + snd.send(m) + + # close without ack on reliable receiver, messages should be requeued + ssn = self.conn.session() + rrcv = ssn.receiver("test-unreliability-queue") + self.drain(rrcv, expected=msgs) + ssn.close() + + # close without ack on unreliable receiver, messages should not be requeued + ssn = self.conn.session() + urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode) + self.drain(urcv, expected=msgs, redelivered=True) + ssn.close() + + self.assertEmpty(rcv) + + def testUnreliable(self): + self.unreliabilityTest(mode="unreliable") + + def testAtMostOnce(self): + self.unreliabilityTest(mode="at-most-once") + class AddressTests(Base): def setup_connection(self): |
