summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
committerRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
commit88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch)
tree7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/messaging
parent195193dab20a2e7481e470ddc8226cff9102e1fb (diff)
downloadqpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/constants.py12
-rw-r--r--python/qpid/messaging/driver.py70
-rw-r--r--python/qpid/messaging/endpoints.py3
-rw-r--r--python/qpid/messaging/message.py15
4 files changed, 78 insertions, 22 deletions
diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py
index cad47bd52a..f230c4def8 100644
--- a/python/qpid/messaging/constants.py
+++ b/python/qpid/messaging/constants.py
@@ -17,11 +17,16 @@
# under the License.
#
+__SELF__ = object()
+
class Constant:
- def __init__(self, name, value=None):
+ def __init__(self, name, value=__SELF__):
self.name = name
- self.value = value
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
def __repr__(self):
return self.name
@@ -30,3 +35,6 @@ AMQP_PORT = 5672
AMQPS_PORT = 5671
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index d0f5b746f3..383845f214 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -18,7 +18,7 @@
#
import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
@@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, VersionError
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
@@ -435,6 +435,8 @@ class Driver:
self._host = (self._host + 1) % len(self._hosts)
self.close_engine(e)
+DEFAULT_DISPOSITION = Disposition(None)
+
class Engine:
def __init__(self, connection):
@@ -915,19 +917,49 @@ class Engine:
if ssn.acked:
messages = [m for m in ssn.acked if m not in sst.acked]
if messages:
- # XXX: we're ignoring acks that get lost when disconnected,
- # could we deal this via some message-id based purge?
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ ids = RangedSet()
+
+ disposed = [(DEFAULT_DISPOSITION, [])]
+ for m in messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ if m._transfer_id is None:
+ continue
+ ids.add(m._transfer_id)
+ disp = m._disposition or DEFAULT_DISPOSITION
+ last, msgs = disposed[-1]
+ if disp.type is last.type and disp.options == last.options:
+ msgs.append(m)
+ else:
+ disposed.append((disp, [m]))
+
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- def ack_ack():
- for m in messages:
- ssn.acked.remove(m)
- if not ssn.transactional:
- sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids), ack_ack)
- log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+ def ack_acker(msgs):
+ def ack_ack():
+ for m in msgs:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ return ack_ack
+
+ for disp, msgs in disposed:
+ if not msgs: continue
+ if disp.type is None:
+ op = MessageAccept
+ elif disp.type is RELEASED:
+ op = MessageRelease
+ elif disp.type is REJECTED:
+ op = MessageReject
+ sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+ **disp.options),
+ ack_acker(msgs))
+ if log.isEnabledFor(DEBUG):
+ for m in msgs:
+ log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -948,7 +980,7 @@ class Engine:
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(MessageRelease(ids, True))
sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
@@ -1055,8 +1087,11 @@ class Engine:
if mp.application_headers is None:
mp.application_headers = {}
mp.application_headers[TO] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
+ if msg.durable is not None:
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ else:
+ dp.delivery_mode = delivery_mode.non_persistent
if msg.priority is not None:
dp.priority = msg.priority
if msg.ttl is not None:
@@ -1109,7 +1144,8 @@ class Engine:
if mp.reply_to is not None:
msg.reply_to = reply_to2addr(mp.reply_to)
msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
+ if dp.delivery_mode is not None:
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.priority = dp.priority
msg.ttl = dp.ttl
msg.redelivered = dp.redelivered
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 004cee5f88..53df51dfd8 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -508,7 +508,7 @@ class Session:
raise Empty
@synchronized
- def acknowledge(self, message=None, sync=True):
+ def acknowledge(self, message=None, disposition=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@@ -530,6 +530,7 @@ class Session:
raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
self._wakeup()
self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ m._disposition = disposition
self.unacked.remove(m)
self.acked.append(m)
diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py
index 46494e428e..a9660b05b1 100644
--- a/python/qpid/messaging/message.py
+++ b/python/qpid/messaging/message.py
@@ -129,7 +129,7 @@ class Message:
"correlation_id", "priority", "ttl"]:
value = self.__dict__[name]
if value is not None: args.append("%s=%r" % (name, value))
- for name in ["durable", "properties"]:
+ for name in ["durable", "redelivered", "properties"]:
value = self.__dict__[name]
if value: args.append("%s=%r" % (name, value))
if self.content_type != get_type(self.content):
@@ -141,4 +141,15 @@ class Message:
args.append(repr(self.content))
return "Message(%s)" % ", ".join(args)
-__all__ = ["Message"]
+class Disposition:
+
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + \
+ ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+__all__ = ["Message", "Disposition"]