diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
| commit | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch) | |
| tree | 7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/messaging | |
| parent | 195193dab20a2e7481e470ddc8226cff9102e1fb (diff) | |
| download | qpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz | |
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
| -rw-r--r-- | python/qpid/messaging/constants.py | 12 | ||||
| -rw-r--r-- | python/qpid/messaging/driver.py | 70 | ||||
| -rw-r--r-- | python/qpid/messaging/endpoints.py | 3 | ||||
| -rw-r--r-- | python/qpid/messaging/message.py | 15 |
4 files changed, 78 insertions, 22 deletions
diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py index cad47bd52a..f230c4def8 100644 --- a/python/qpid/messaging/constants.py +++ b/python/qpid/messaging/constants.py @@ -17,11 +17,16 @@ # under the License. # +__SELF__ = object() + class Constant: - def __init__(self, name, value=None): + def __init__(self, name, value=__SELF__): self.name = name - self.value = value + if value is __SELF__: + self.value = self + else: + self.value = value def __repr__(self): return self.name @@ -30,3 +35,6 @@ AMQP_PORT = 5672 AMQPS_PORT = 5671 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + +REJECTED = Constant("REJECTED") +RELEASED = Constant("RELEASED") diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index d0f5b746f3..383845f214 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -18,7 +18,7 @@ # import socket, struct, sys, time -from logging import getLogger +from logging import getLogger, DEBUG from qpid import compat from qpid import sasl from qpid.concurrency import synchronized @@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, VersionError from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address -from qpid.messaging.constants import UNLIMITED +from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import ConnectError -from qpid.messaging.message import get_codec, Message +from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import connect @@ -435,6 +435,8 @@ class Driver: self._host = (self._host + 1) % len(self._hosts) self.close_engine(e) +DEFAULT_DISPOSITION = Disposition(None) + class Engine: def __init__(self, connection): @@ -915,19 +917,49 @@ class Engine: if ssn.acked: messages = [m for m in ssn.acked if m not in sst.acked] if messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + ids = RangedSet() + + disposed = [(DEFAULT_DISPOSITION, [])] + for m in messages: + # XXX: we're ignoring acks that get lost when disconnected, + # could we deal this via some message-id based purge? + if m._transfer_id is None: + continue + ids.add(m._transfer_id) + disp = m._disposition or DEFAULT_DISPOSITION + last, msgs = disposed[-1] + if disp.type is last.type and disp.options == last.options: + msgs.append(m) + else: + disposed.append((disp, [m])) + for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - def ack_ack(): - for m in messages: - ssn.acked.remove(m) - if not ssn.transactional: - sst.acked.remove(m) - sst.write_cmd(MessageAccept(ids), ack_ack) - log.debug("SACK[%s]: %s", ssn.log_id, m) + + def ack_acker(msgs): + def ack_ack(): + for m in msgs: + ssn.acked.remove(m) + if not ssn.transactional: + sst.acked.remove(m) + return ack_ack + + for disp, msgs in disposed: + if not msgs: continue + if disp.type is None: + op = MessageAccept + elif disp.type is RELEASED: + op = MessageRelease + elif disp.type is REJECTED: + op = MessageReject + sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), + **disp.options), + ack_acker(msgs)) + if log.isEnabledFor(DEBUG): + for m in msgs: + log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) + sst.acked.extend(messages) if ssn.committing and not sst.committing: @@ -948,7 +980,7 @@ class Engine: for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids)) + sst.write_cmd(MessageRelease(ids, True)) sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): @@ -1055,8 +1087,11 @@ class Engine: if mp.application_headers is None: mp.application_headers = {} mp.application_headers[TO] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent + if msg.durable is not None: + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + else: + dp.delivery_mode = delivery_mode.non_persistent if msg.priority is not None: dp.priority = msg.priority if msg.ttl is not None: @@ -1109,7 +1144,8 @@ class Engine: if mp.reply_to is not None: msg.reply_to = reply_to2addr(mp.reply_to) msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent + if dp.delivery_mode is not None: + msg.durable = dp.delivery_mode == delivery_mode.persistent msg.priority = dp.priority msg.ttl = dp.ttl msg.redelivered = dp.redelivered diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 004cee5f88..53df51dfd8 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -508,7 +508,7 @@ class Session: raise Empty @synchronized - def acknowledge(self, message=None, sync=True): + def acknowledge(self, message=None, disposition=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all unacknowledged messages on the session are acknowledged. @@ -530,6 +530,7 @@ class Session: raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) self._wakeup() self._ewait(lambda: len(self.acked) < self.ack_capacity) + m._disposition = disposition self.unacked.remove(m) self.acked.append(m) diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py index 46494e428e..a9660b05b1 100644 --- a/python/qpid/messaging/message.py +++ b/python/qpid/messaging/message.py @@ -129,7 +129,7 @@ class Message: "correlation_id", "priority", "ttl"]: value = self.__dict__[name] if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "properties"]: + for name in ["durable", "redelivered", "properties"]: value = self.__dict__[name] if value: args.append("%s=%r" % (name, value)) if self.content_type != get_type(self.content): @@ -141,4 +141,15 @@ class Message: args.append(repr(self.content)) return "Message(%s)" % ", ".join(args) -__all__ = ["Message"] +class Disposition: + + def __init__(self, type, **options): + self.type = type + self.options = options + + def __repr__(self): + args = [str(self.type)] + \ + ["%s=%r" % (k, v) for k, v in self.options.items()] + return "Disposition(%s)" % ", ".join(args) + +__all__ = ["Message", "Disposition"] |
