From 56e72efee5eeefa3d73df1f9fbd77058d017a8ee Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 16 Apr 2008 09:40:42 +0000 Subject: QPID-926 : Perform all store operations associated with an acknowledge in a single store transaction git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648648 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/txn/NonTransactionalContext.java | 73 ++++++++++++---------- 1 file changed, 39 insertions(+), 34 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 1e4b69c935..cac3489f4c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,8 +49,6 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List _returnMessages; - private final Set _browsedAcks; - private final MessageStore _messageStore; private final StoreContext _storeContext; @@ -60,12 +58,18 @@ public class NonTransactionalContext implements TransactionalContext public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, List returnMessages, Set browsedAcks) + { + this(messageStore,storeContext,channel,returnMessages); + } + + public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, + List returnMessages) { _channel = channel; _storeContext = storeContext; _returnMessages = returnMessages; _messageStore = messageStore; - _browsedAcks = browsedAcks; + } @@ -112,6 +116,9 @@ public class NonTransactionalContext implements TransactionalContext boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { + + final boolean debug = _log.isDebugEnabled(); + if (multiple) { if (deliveryTag == 0) @@ -125,20 +132,17 @@ public class NonTransactionalContext implements TransactionalContext { public boolean callback(UnacknowledgedMessage message) throws AMQException { - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + _log.debug("Discarding message: " + message.getMessage().getMessageId()); } - else + if(message.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } + //Message has been ack so discard it. This will dequeue and decrement the reference. + message.discard(_storeContext); + return false; } @@ -159,20 +163,17 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.drainTo(acked, deliveryTag); for (UnacknowledgedMessage msg : acked) { - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } - else + if(msg.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); } } } @@ -189,27 +190,31 @@ public class NonTransactionalContext implements TransactionalContext _channel.getChannelId()); } - if (!_browsedAcks.contains(deliveryTag)) + if (debug) { - if (_log.isDebugEnabled()) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } - else + if(msg.getMessage().isPersistent()) { - _browsedAcks.remove(deliveryTag); + beginTranIfNecessary(); } - if (_log.isDebugEnabled()) + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); + + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + msg.getMessage().getMessageId()); } } + + if(_inTran) + { + _messageStore.commitTran(_storeContext); + _inTran = false; + } + } public void messageFullyReceived(boolean persistent) throws AMQException -- cgit v1.2.1