From 1af47c409e67c090b016463798bd4bfea8d0653d Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 8 Aug 2008 12:19:41 +0000 Subject: QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683949 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/ack/UnacknowledgedMessageMap.java | 5 +++-- .../server/ack/UnacknowledgedMessageMapImpl.java | 9 +++++++-- .../qpid/server/txn/NonTransactionalContext.java | 22 +++++----------------- 3 files changed, 15 insertions(+), 21 deletions(-) (limited to 'java/broker/src/main') diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index 8e5b631f96..c80a96f967 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.StoreContext; public interface UnacknowledgedMessageMap { @@ -55,8 +56,8 @@ public interface UnacknowledgedMessageMap QueueEntry remove(long deliveryTag); - void drainTo(Collection destination, long deliveryTag) throws AMQException; - + public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException; + Collection cancelAllMessages(); void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 79208ab426..ef48b60bcd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.ack; +import org.apache.qpid.server.store.StoreContext; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; @@ -160,7 +161,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void drainTo(Collection destination, long deliveryTag) throws AMQException + public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException + { synchronized (_lock) { @@ -175,6 +177,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + unacked.getValue().discard(storeContext); + it.remove(); _unackedSize -= unacked.getValue().getMessage().getSize(); @@ -182,7 +188,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap unacked.getValue().restoreCredit(); - destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { break; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 18f1836185..03d59d3ab9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -154,28 +154,13 @@ public class NonTransactionalContext implements TransactionalContext throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } - LinkedList acked = new LinkedList(); - unacknowledgedMessageMap.drainTo(acked, deliveryTag); - for (QueueEntry msg : acked) - { - if (debug) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - if(msg.getMessage().isPersistent()) - { - beginTranIfNecessary(); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); - } + unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext); } } else { QueueEntry msg; - msg = unacknowledgedMessageMap.remove(deliveryTag); + msg = unacknowledgedMessageMap.get(deliveryTag); if (msg == null) { @@ -197,6 +182,9 @@ public class NonTransactionalContext implements TransactionalContext //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); + unacknowledgedMessageMap.remove(deliveryTag); + + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + -- cgit v1.2.1